thespian/src/remote/endpoint.zig
CJ van den Berg 446667892c
feat: clean up local_actors/outbound tables when senders exit
The endpoint now links to each local actor when it is first assigned an
outbound wire ID, trapping the exit to remove stale entries from the
outbound and local_actors tables.

To preserve spawn_link semantics, the endpoint stores the spawner's
instance_id() at init time. Trapped exits from the spawner are always
re-propagated regardless of whether the spawner also appears as an outbound
sender.
2026-04-17 22:47:23 +02:00

269 lines
13 KiB
Zig

/// Parent-side endpoint actor.
///
/// Wraps a subprocess, accumulates framed CBOR from its stdout, and dispatches
/// decoded messages to local actors or proxies.
///
/// Outbound messages (local → endpoint):
/// {"send", from_id: u64, to_id: u64, payload} — via proxy, by remote ID
/// {"send", from_id: u64, to_name: text, payload} — direct by name
/// {"proxy_exit", remote_id: u64, reason: text} — proxy notifying of exit
const std = @import("std");
const tp = @import("thespian");
const cbor = @import("cbor");
const framing = @import("framing");
const protocol = @import("protocol");
const proxy = @import("proxy");
const subprocess = tp.subprocess;
const proc_tag = "endpoint_proc";
pub const Args = struct {
allocator: std.mem.Allocator,
/// CBOR-encoded argv array for the child binary.
/// Must be heap-allocated; endpoint.init will free it.
argv: []const u8,
/// The actor that spawned this endpoint. Exits from this handle are
/// always propagated (re-exited) even if the handle is also in the
/// outbound table, so that spawn_link semantics are preserved.
spawner: tp.pid_ref,
};
const Endpoint = struct {
allocator: std.mem.Allocator,
proc: subprocess,
accumulator: framing.Accumulator,
/// Inbound proxy table: remote actor ID → local proxy pid.
proxies: std.AutoHashMap(u64, tp.pid),
/// Outbound table: local actor handle pointer → assigned wire ID.
outbound: std.AutoHashMap(usize, u64),
/// Local actor table: wire ID → local actor pid (reverse of outbound).
local_actors: std.AutoHashMap(u64, tp.pid),
next_id: u64,
/// Stable identity of the spawning actor. Exits from this actor are
/// always propagated so that spawn_link semantics work correctly even
/// when the spawner is also an outbound sender.
spawner_id: usize,
receiver: tp.Receiver(*@This()),
fn start(args: Args) tp.result {
return init(args) catch |e| return tp.exit_error(e, @errorReturnTrace());
}
fn init(args: Args) !void {
defer args.allocator.free(args.argv);
const proc = try subprocess.init(args.allocator, tp.message{ .buf = args.argv }, proc_tag, .Pipe);
const self = try args.allocator.create(@This());
self.* = .{
.allocator = args.allocator,
.proc = proc,
.accumulator = .{},
.proxies = std.AutoHashMap(u64, tp.pid).init(args.allocator),
.outbound = std.AutoHashMap(usize, u64).init(args.allocator),
.local_actors = std.AutoHashMap(u64, tp.pid).init(args.allocator),
.next_id = 1,
.spawner_id = args.spawner.instance_id(),
.receiver = .init(receive_fn, deinit, self),
};
errdefer self.deinit();
// Trap exits so we can distinguish monitored-local-actor exits (handle
// in outbound table) from propagated spawner/parent exits.
_ = tp.set_trap(true);
tp.receive(&self.receiver);
}
fn deinit(self: *@This()) void {
// Shut down all live proxies. Use "endpoint_exit" rather than "exit"
// so the proxy can distinguish this deinit-time message (which has no
// live sender context, i.e. from.instance_id()==0) from a Thespian
// trapped-exit sent by a linked local actor.
var proxy_it = self.proxies.valueIterator();
while (proxy_it.next()) |p| {
p.send(.{ "endpoint_exit", "transport_closed" }) catch {};
p.deinit();
}
self.proxies.deinit();
var actor_it = self.local_actors.valueIterator();
while (actor_it.next()) |p| p.deinit();
self.local_actors.deinit();
self.outbound.deinit();
self.proc.deinit();
self.allocator.destroy(self);
}
fn receive_fn(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result {
return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace());
}
fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void {
var bytes: []const u8 = "";
var from_id: u64 = 0;
var to_id: u64 = 0;
var to_name: []const u8 = "";
var payload: []const u8 = "";
var remote_id: u64 = 0;
var reason: []const u8 = "";
if (try m.match(.{ proc_tag, "stdout", tp.extract(&bytes) })) {
if (self.accumulator.feed(bytes)) |frame| try self.dispatch_inbound(frame);
} else if (try m.match(.{ proc_tag, "stderr", tp.any })) {
} else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_id), cbor.extract_cbor(&payload) })) {
// Outbound send by remote ID — from a local proxy.
// from_id carries the sender's handle pointer; resolve to wire ID.
const wire_from_id = try self.get_or_assign_outbound_id(from_id);
try self.send_wire_by_id(wire_from_id, to_id, payload);
} else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_name), cbor.extract_cbor(&payload) })) {
// Outbound send_named — from a local actor addressing a well-known remote actor by name.
try self.send_wire_named(from_id, to_name, payload);
} else if (try m.match(.{ "link_wire", tp.extract(&from_id), tp.extract(&to_id) })) {
// A local proxy is establishing a remote link: from_id=local handle pointer,
// to_id=remote_id of the target on the remote system.
const wire_from_id = try self.get_or_assign_outbound_id(from_id);
try self.send_wire_link(wire_from_id, to_id);
} else if (try m.match(.{ "local_link_exit", tp.extract(&from_id), tp.extract(&reason) })) {
// A local actor that a proxy was Thespian-linked to has exited.
// from_id is the actor's handle pointer; look up its wire ID and
// forward an exit wire message so the remote side can propagate it.
const key: usize = @intCast(from_id);
if (self.outbound.get(key)) |wire_id|
try self.send_wire_exit(wire_id, reason);
// If not in the outbound table the actor never sent outbound; ignore.
} else if (try m.match(.{ "proxy_exit", tp.extract(&remote_id), tp.any })) {
// A local proxy has exited; remove it from the proxy table.
if (self.proxies.fetchRemove(remote_id)) |entry| entry.value.deinit();
} else if (try m.match(.{ "exit", tp.extract(&reason) })) {
// Trapped exit. Always propagate exits from the spawner (spawn_link
// semantics must be preserved even if the spawner also sent messages
// through us and has an outbound entry). For all other exits, clean
// up the outbound/local_actors entries for the exiting actor.
if (from.instance_id() == self.spawner_id) {
return tp.exit(reason);
} else if (self.outbound.fetchRemove(@intFromPtr(from.h))) |entry| {
if (self.local_actors.fetchRemove(entry.value)) |a| a.value.deinit();
}
} else if (try m.match(.{ proc_tag, "term", tp.any, tp.any })) {
return tp.exit("transport_closed");
} else {
return tp.unexpected(m);
}
}
/// Look up or assign a stable wire ID for a local actor identified by its
/// handle pointer. Also records the pid so inbound sends can be routed back.
fn get_or_assign_outbound_id(self: *@This(), handle_int: u64) !u64 {
const key: usize = @intCast(handle_int);
if (self.outbound.get(key)) |id| return id;
const id = self.next_id;
self.next_id += 1;
// Reconstruct a pid_ref from the handle pointer and clone it for storage.
const pid_ref: tp.pid_ref = .{ .h = @ptrFromInt(key) };
try self.local_actors.put(id, pid_ref.clone());
try self.outbound.put(key, id);
// Link so we receive a trapped exit when this actor exits, allowing us
// to clean up the outbound and local_actors tables.
try pid_ref.link();
return id;
}
/// Get the local proxy for remote_id, creating one if it doesn't exist yet.
fn get_or_create_proxy(self: *@This(), remote_id: u64) !tp.pid_ref {
if (self.proxies.getPtr(remote_id)) |p| return p.ref();
const p = try tp.spawn(self.allocator, proxy.Args{
.allocator = self.allocator,
.endpoint = tp.self_pid().clone(),
.remote_id = remote_id,
}, proxy.start, "proxy");
try self.proxies.put(remote_id, p);
return self.proxies.getPtr(remote_id).?.ref();
}
fn dispatch_inbound(self: *@This(), frame: []const u8) !void {
const msg = try protocol.decode(frame);
switch (msg) {
.send => |s| {
// Ensure a local proxy exists for the sender.
const prx = try self.get_or_create_proxy(s.from_id);
// Route to the local actor that owns this wire ID, delivering
// FROM the proxy so the recipient can reply naturally.
if (self.local_actors.getPtr(s.to_id)) |actor_ptr|
try prx.send(.{ "deliver_pid", @as(u64, @intFromPtr(actor_ptr.h)), protocol.RawCbor{ .bytes = s.payload } })
else
return tp.exit_error(error.UnknownLocalActor, null);
},
.send_named => |s| {
if (s.from_id != 0) {
// Deliver FROM the proxy so the recipient sees the proxy as
// `from` and can reply by sending back through it.
const prx = try self.get_or_create_proxy(s.from_id);
try prx.send(.{ "deliver_named", s.to_name, protocol.RawCbor{ .bytes = s.payload } });
} else {
// No sender identity; deliver directly without from substitution.
const actor = tp.env.get().proc(s.to_name);
try actor.send_raw(tp.message{ .buf = s.payload });
}
},
.exit => |e| {
// Remote actor has exited; notify its local proxy.
if (self.proxies.get(e.id)) |p|
p.send(.{ "exit", e.reason }) catch {};
},
.link => |lnk| {
// Remote side is establishing a link: lnk.local_id is the remote
// actor's wire ID (= our proxy key), lnk.remote_id is the local
// actor's wire ID in our local_actors table.
const prx = try self.get_or_create_proxy(lnk.local_id);
if (self.local_actors.getPtr(lnk.remote_id)) |actor_ptr|
try prx.send(.{ "set_notify", @as(u64, @intFromPtr(actor_ptr.h)) })
else
return tp.exit_error(error.UnknownLocalActor, null);
},
.transport_error => |te| return tp.exit(te.reason),
}
}
fn send_wire_by_id(self: *@This(), from_id: u64, to_id: u64, payload: []const u8) !void {
var msg_buf: [framing.max_frame_size]u8 = undefined;
var msg_stream: std.Io.Writer = .fixed(&msg_buf);
try protocol.encode_send(&msg_stream, from_id, to_id, payload);
var frame_buf: [framing.max_frame_size + 4]u8 = undefined;
var frame_stream: std.Io.Writer = .fixed(&frame_buf);
try framing.write_frame(&frame_stream, msg_stream.buffered());
try self.proc.send(frame_stream.buffered());
}
fn send_wire_link(self: *@This(), local_id: u64, remote_id: u64) !void {
var msg_buf: [framing.max_frame_size]u8 = undefined;
var msg_stream: std.Io.Writer = .fixed(&msg_buf);
try protocol.encode_link(&msg_stream, local_id, remote_id);
var frame_buf: [framing.max_frame_size + 4]u8 = undefined;
var frame_stream: std.Io.Writer = .fixed(&frame_buf);
try framing.write_frame(&frame_stream, msg_stream.buffered());
try self.proc.send(frame_stream.buffered());
}
fn send_wire_exit(self: *@This(), id: u64, reason: []const u8) !void {
var msg_buf: [framing.max_frame_size]u8 = undefined;
var msg_stream: std.Io.Writer = .fixed(&msg_buf);
try protocol.encode_exit(&msg_stream, id, reason);
var frame_buf: [framing.max_frame_size + 4]u8 = undefined;
var frame_stream: std.Io.Writer = .fixed(&frame_buf);
try framing.write_frame(&frame_stream, msg_stream.buffered());
try self.proc.send(frame_stream.buffered());
}
fn send_wire_named(self: *@This(), from_id: u64, to_name: []const u8, payload: []const u8) !void {
var msg_buf: [framing.max_frame_size]u8 = undefined;
var msg_stream: std.Io.Writer = .fixed(&msg_buf);
try protocol.encode_send_named(&msg_stream, from_id, to_name, payload);
var frame_buf: [framing.max_frame_size + 4]u8 = undefined;
var frame_stream: std.Io.Writer = .fixed(&frame_buf);
try framing.write_frame(&frame_stream, msg_stream.buffered());
try self.proc.send(frame_stream.buffered());
}
};
pub const start = Endpoint.start;