diff --git a/src/remote/endpoint.zig b/src/remote/endpoint.zig index 5fd6dea..8ea4077 100644 --- a/src/remote/endpoint.zig +++ b/src/remote/endpoint.zig @@ -23,6 +23,10 @@ pub const Args = struct { /// CBOR-encoded argv array for the child binary. /// Must be heap-allocated; endpoint.init will free it. argv: []const u8, + /// The actor that spawned this endpoint. Exits from this handle are + /// always propagated (re-exited) even if the handle is also in the + /// outbound table, so that spawn_link semantics are preserved. + spawner: tp.pid_ref, }; const Endpoint = struct { @@ -36,6 +40,10 @@ const Endpoint = struct { /// Local actor table: wire ID → local actor pid (reverse of outbound). local_actors: std.AutoHashMap(u64, tp.pid), next_id: u64, + /// Stable identity of the spawning actor. Exits from this actor are + /// always propagated so that spawn_link semantics work correctly even + /// when the spawner is also an outbound sender. + spawner_id: usize, receiver: tp.Receiver(*@This()), fn start(args: Args) tp.result { @@ -54,9 +62,13 @@ const Endpoint = struct { .outbound = std.AutoHashMap(usize, u64).init(args.allocator), .local_actors = std.AutoHashMap(u64, tp.pid).init(args.allocator), .next_id = 1, + .spawner_id = args.spawner.instance_id(), .receiver = .init(receive_fn, deinit, self), }; errdefer self.deinit(); + // Trap exits so we can distinguish monitored-local-actor exits (handle + // in outbound table) from propagated spawner/parent exits. + _ = tp.set_trap(true); tp.receive(&self.receiver); } @@ -83,7 +95,7 @@ const Endpoint = struct { return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); } - fn receive(self: *@This(), _: tp.pid_ref, m: tp.message) !void { + fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { var bytes: []const u8 = ""; var from_id: u64 = 0; var to_id: u64 = 0; @@ -119,6 +131,16 @@ const Endpoint = struct { } else if (try m.match(.{ "proxy_exit", tp.extract(&remote_id), tp.any })) { // A local proxy has exited; remove it from the proxy table. if (self.proxies.fetchRemove(remote_id)) |entry| entry.value.deinit(); + } else if (try m.match(.{ "exit", tp.extract(&reason) })) { + // Trapped exit. Always propagate exits from the spawner (spawn_link + // semantics must be preserved even if the spawner also sent messages + // through us and has an outbound entry). For all other exits, clean + // up the outbound/local_actors entries for the exiting actor. + if (from.instance_id() == self.spawner_id) { + return tp.exit(reason); + } else if (self.outbound.fetchRemove(@intFromPtr(from.h))) |entry| { + if (self.local_actors.fetchRemove(entry.value)) |a| a.value.deinit(); + } } else if (try m.match(.{ proc_tag, "term", tp.any, tp.any })) { return tp.exit("transport_closed"); } else { @@ -137,6 +159,9 @@ const Endpoint = struct { const pid_ref: tp.pid_ref = .{ .h = @ptrFromInt(key) }; try self.local_actors.put(id, pid_ref.clone()); try self.outbound.put(key, id); + // Link so we receive a trapped exit when this actor exits, allowing us + // to clean up the outbound and local_actors tables. + try pid_ref.link(); return id; } diff --git a/test/remote_endpoint_id_test.zig b/test/remote_endpoint_id_test.zig index 267dee6..b39c265 100644 --- a/test/remote_endpoint_id_test.zig +++ b/test/remote_endpoint_id_test.zig @@ -53,6 +53,7 @@ const TestActor = struct { endpoint.Args{ .allocator = args.allocator, .argv = argv, + .spawner = thespian.self_pid().ref(), }, endpoint.start, "endpoint", diff --git a/test/remote_endpoint_test.zig b/test/remote_endpoint_test.zig index bab076f..c4ebe01 100644 --- a/test/remote_endpoint_test.zig +++ b/test/remote_endpoint_test.zig @@ -37,6 +37,7 @@ const TestActor = struct { endpoint.Args{ .allocator = args.allocator, .argv = argv, + .spawner = thespian.self_pid().ref(), }, endpoint.start, "endpoint", diff --git a/test/remote_lifetime_test.zig b/test/remote_lifetime_test.zig index 43000ef..c500d98 100644 --- a/test/remote_lifetime_test.zig +++ b/test/remote_lifetime_test.zig @@ -66,6 +66,7 @@ const TestActor = struct { endpoint.Args{ .allocator = args.allocator, .argv = argv, + .spawner = thespian.self_pid().ref(), }, endpoint.start, "endpoint",