From 077d47f8b2b08ac7cf7c650a26a9a0dfdb63a412 Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Thu, 12 Mar 2026 19:00:51 +0100 Subject: [PATCH] feat: clean up local_actors/outbound tables when senders exit The endpoint now links to each local actor when it is first assigned an outbound wire ID, trapping the exit to remove stale entries from the outbound and local_actors tables. To preserve spawn_link semantics, the endpoint stores the spawner's instance_id() at init time. Trapped exits from the spawner are always re-propagated regardless of whether the spawner also appears as an outbound sender. --- src/remote/endpoint.zig | 27 ++++++++++++++++++++++++++- test/remote_endpoint_id_test.zig | 1 + test/remote_endpoint_test.zig | 1 + test/remote_lifetime_test.zig | 1 + 4 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/remote/endpoint.zig b/src/remote/endpoint.zig index 5fd6dea..8ea4077 100644 --- a/src/remote/endpoint.zig +++ b/src/remote/endpoint.zig @@ -23,6 +23,10 @@ pub const Args = struct { /// CBOR-encoded argv array for the child binary. /// Must be heap-allocated; endpoint.init will free it. argv: []const u8, + /// The actor that spawned this endpoint. Exits from this handle are + /// always propagated (re-exited) even if the handle is also in the + /// outbound table, so that spawn_link semantics are preserved. + spawner: tp.pid_ref, }; const Endpoint = struct { @@ -36,6 +40,10 @@ const Endpoint = struct { /// Local actor table: wire ID → local actor pid (reverse of outbound). local_actors: std.AutoHashMap(u64, tp.pid), next_id: u64, + /// Stable identity of the spawning actor. Exits from this actor are + /// always propagated so that spawn_link semantics work correctly even + /// when the spawner is also an outbound sender. + spawner_id: usize, receiver: tp.Receiver(*@This()), fn start(args: Args) tp.result { @@ -54,9 +62,13 @@ const Endpoint = struct { .outbound = std.AutoHashMap(usize, u64).init(args.allocator), .local_actors = std.AutoHashMap(u64, tp.pid).init(args.allocator), .next_id = 1, + .spawner_id = args.spawner.instance_id(), .receiver = .init(receive_fn, deinit, self), }; errdefer self.deinit(); + // Trap exits so we can distinguish monitored-local-actor exits (handle + // in outbound table) from propagated spawner/parent exits. + _ = tp.set_trap(true); tp.receive(&self.receiver); } @@ -83,7 +95,7 @@ const Endpoint = struct { return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); } - fn receive(self: *@This(), _: tp.pid_ref, m: tp.message) !void { + fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { var bytes: []const u8 = ""; var from_id: u64 = 0; var to_id: u64 = 0; @@ -119,6 +131,16 @@ const Endpoint = struct { } else if (try m.match(.{ "proxy_exit", tp.extract(&remote_id), tp.any })) { // A local proxy has exited; remove it from the proxy table. if (self.proxies.fetchRemove(remote_id)) |entry| entry.value.deinit(); + } else if (try m.match(.{ "exit", tp.extract(&reason) })) { + // Trapped exit. Always propagate exits from the spawner (spawn_link + // semantics must be preserved even if the spawner also sent messages + // through us and has an outbound entry). For all other exits, clean + // up the outbound/local_actors entries for the exiting actor. + if (from.instance_id() == self.spawner_id) { + return tp.exit(reason); + } else if (self.outbound.fetchRemove(@intFromPtr(from.h))) |entry| { + if (self.local_actors.fetchRemove(entry.value)) |a| a.value.deinit(); + } } else if (try m.match(.{ proc_tag, "term", tp.any, tp.any })) { return tp.exit("transport_closed"); } else { @@ -137,6 +159,9 @@ const Endpoint = struct { const pid_ref: tp.pid_ref = .{ .h = @ptrFromInt(key) }; try self.local_actors.put(id, pid_ref.clone()); try self.outbound.put(key, id); + // Link so we receive a trapped exit when this actor exits, allowing us + // to clean up the outbound and local_actors tables. + try pid_ref.link(); return id; } diff --git a/test/remote_endpoint_id_test.zig b/test/remote_endpoint_id_test.zig index 267dee6..b39c265 100644 --- a/test/remote_endpoint_id_test.zig +++ b/test/remote_endpoint_id_test.zig @@ -53,6 +53,7 @@ const TestActor = struct { endpoint.Args{ .allocator = args.allocator, .argv = argv, + .spawner = thespian.self_pid().ref(), }, endpoint.start, "endpoint", diff --git a/test/remote_endpoint_test.zig b/test/remote_endpoint_test.zig index bab076f..c4ebe01 100644 --- a/test/remote_endpoint_test.zig +++ b/test/remote_endpoint_test.zig @@ -37,6 +37,7 @@ const TestActor = struct { endpoint.Args{ .allocator = args.allocator, .argv = argv, + .spawner = thespian.self_pid().ref(), }, endpoint.start, "endpoint", diff --git a/test/remote_lifetime_test.zig b/test/remote_lifetime_test.zig index 43000ef..c500d98 100644 --- a/test/remote_lifetime_test.zig +++ b/test/remote_lifetime_test.zig @@ -66,6 +66,7 @@ const TestActor = struct { endpoint.Args{ .allocator = args.allocator, .argv = argv, + .spawner = thespian.self_pid().ref(), }, endpoint.start, "endpoint",