diff --git a/test/remote_child_endpoint.zig b/test/remote_child_endpoint.zig index 77ecfae..d6308db 100644 --- a/test/remote_child_endpoint.zig +++ b/test/remote_child_endpoint.zig @@ -10,8 +10,17 @@ /// so the parent can establish a proxy and reply by ID. /// - If from_id != 0: reply mode — sends wire send to from_id with ["done"]. /// +/// "die_test" (wire ID 4): receives {from_id, payload}. +/// - If from_id == 0: trigger mode — echoes payload back to "test_receiver" +/// so the parent can establish a proxy for this actor. +/// - If from_id != 0: die mode — exits with "die_test", triggering the +/// cross-process link/exit propagation path. +/// /// StdioEndpoint dispatches inbound wire messages to actors, passing /// {from_id, payload} so actors can reply by ID when needed. +/// +/// StdioEndpoint also supports the wire "link" message: it monitors the +/// referenced local actor and sends ["exit", wire_id, reason] when it exits. const std = @import("std"); const tp = @import("thespian"); const cbor = @import("cbor"); @@ -138,6 +147,61 @@ const EchoIdActor = struct { } }; +// --------------------------------------------------------------------------- +// DieTestActor (wire ID 4) +// --------------------------------------------------------------------------- + +const DieTestActor = struct { + allocator: std.mem.Allocator, + receiver: tp.Receiver(*@This()), + + const Args = struct { + allocator: std.mem.Allocator, + parent: tp.pid, + }; + + fn start(args: Args) tp.result { + return init(args) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + defer args.parent.deinit(); + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .receiver = .init(receive_fn, deinit, self), + }; + errdefer self.deinit(); + try args.parent.send(.{"die_test_ready"}); + tp.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result { + return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { + var from_id: u64 = 0; + var payload: []const u8 = ""; + if (try m.match(.{ tp.extract(&from_id), cbor.extract_cbor(&payload) })) { + if (from_id == 0) { + // Trigger mode: echo payload back so the parent can establish a proxy. + try from.send(.{ "send", @as(u64, 4), "test_receiver", protocol.RawCbor{ .bytes = payload } }); + } else { + // Die mode: any message from a non-zero sender causes a clean exit. + return tp.exit("die_test"); + } + } else { + return tp.unexpected(m); + } + _ = self; + } +}; + // --------------------------------------------------------------------------- // StdioEndpoint // --------------------------------------------------------------------------- @@ -149,6 +213,8 @@ const StdioEndpoint = struct { read_buf: [4096]u8, /// Maps wire ID → local actor pid for routing inbound send-by-ID. wire_ids: std.AutoHashMap(u64, tp.pid), + /// Maps actor instance_id → wire ID for exit notification via wire "link". + link_notify: std.AutoHashMap(usize, u64), /// Number of child actors that have reported ready; arm stdin when all ready. ready_count: u8, receiver: tp.Receiver(*@This()), @@ -168,11 +234,16 @@ const StdioEndpoint = struct { .accumulator = .{}, .read_buf = undefined, .wire_ids = std.AutoHashMap(u64, tp.pid).init(args.allocator), + .link_notify = std.AutoHashMap(usize, u64).init(args.allocator), .ready_count = 0, .receiver = .init(receive_fn, deinit, self), }; errdefer self.deinit(); + // Trap exit signals so linked actor exits arrive as messages, + // allowing us to send wire "exit" notifications to the parent. + _ = tp.set_trap(true); + _ = try tp.spawn_link(args.allocator, EchoActor.Args{ .allocator = args.allocator, .parent = tp.self_pid().clone(), @@ -183,6 +254,11 @@ const StdioEndpoint = struct { .parent = tp.self_pid().clone(), }, EchoIdActor.start, "echo_id"); + _ = try tp.spawn_link(args.allocator, DieTestActor.Args{ + .allocator = args.allocator, + .parent = tp.self_pid().clone(), + }, DieTestActor.start, "die_test"); + tp.receive(&self.receiver); } @@ -190,6 +266,7 @@ const StdioEndpoint = struct { var it = self.wire_ids.valueIterator(); while (it.next()) |p| p.deinit(); self.wire_ids.deinit(); + self.link_notify.deinit(); self.fd_stdin.deinit(); self.allocator.destroy(self); } @@ -203,17 +280,34 @@ const StdioEndpoint = struct { var to_id_wire: u64 = 0; var to_name: []const u8 = ""; var payload: []const u8 = ""; + var reason: []const u8 = ""; if (try m.match(.{"echo_ready"})) { tp.env.get().proc_set("echo", from); try self.wire_ids.put(2, from.clone()); self.ready_count += 1; - if (self.ready_count == 2) try self.fd_stdin.wait_read(); + if (self.ready_count == 3) try self.fd_stdin.wait_read(); } else if (try m.match(.{"echo_id_ready"})) { tp.env.get().proc_set("echo_id", from); try self.wire_ids.put(3, from.clone()); self.ready_count += 1; - if (self.ready_count == 2) try self.fd_stdin.wait_read(); + if (self.ready_count == 3) try self.fd_stdin.wait_read(); + } else if (try m.match(.{"die_test_ready"})) { + tp.env.get().proc_set("die_test", from); + try self.wire_ids.put(4, from.clone()); + self.ready_count += 1; + if (self.ready_count == 3) try self.fd_stdin.wait_read(); + } else if (try m.match(.{ "exit", tp.extract(&reason) })) { + // A linked actor has exited. If it was set up via a wire "link", + // send ["exit", wire_id, reason] to the parent. Otherwise the exit + // is from a sibling actor dying unexpectedly — propagate cleanly. + const actor_id = from.instance_id(); + if (self.link_notify.fetchRemove(actor_id)) |entry| { + self.send_wire_exit(entry.value, reason) catch {}; + } else { + self.send_transport_error(reason); + return tp.exit(reason); + } } else if (try m.match(.{ "fd", "stdin", "read_ready" })) { try self.dispatch_stdin(); try self.fd_stdin.wait_read(); @@ -265,7 +359,17 @@ const StdioEndpoint = struct { if (self.wire_ids.get(e.id)) |actor| actor.send(.{ "exit", e.reason }) catch {}; }, - .link => return tp.exit_error(error.LinkNotSupported, null), + .link => |lnk| { + // Remote side is requesting a link to one of our local actors. + // lnk.remote_id is the wire ID of our local actor to monitor. + // When that actor exits, we send ["exit", wire_id, reason]. + if (self.wire_ids.get(lnk.remote_id)) |actor| { + try actor.link(); + try self.link_notify.put(actor.instance_id(), lnk.remote_id); + } else { + return tp.exit_error(error.UnknownWireId, null); + } + }, } } @@ -291,6 +395,17 @@ const StdioEndpoint = struct { try std.fs.File.stdout().writeAll(frame_stream.buffered()); } + fn send_wire_exit(_: *@This(), wire_id: u64, reason: []const u8) !void { + var msg_buf: [framing.max_frame_size]u8 = undefined; + var msg_stream: std.Io.Writer = .fixed(&msg_buf); + try protocol.encode_exit(&msg_stream, wire_id, reason); + + var frame_buf: [framing.max_frame_size + 4]u8 = undefined; + var frame_stream: std.Io.Writer = .fixed(&frame_buf); + try framing.write_frame(&frame_stream, msg_stream.buffered()); + try std.fs.File.stdout().writeAll(frame_stream.buffered()); + } + fn send_transport_error(_: *@This(), reason: []const u8) void { var msg_buf: [framing.max_frame_size]u8 = undefined; var msg_stream: std.Io.Writer = .fixed(&msg_buf); diff --git a/test/remote_lifetime_test.zig b/test/remote_lifetime_test.zig new file mode 100644 index 0000000..43000ef --- /dev/null +++ b/test/remote_lifetime_test.zig @@ -0,0 +1,176 @@ +/// Tests cross-endpoint actor lifetime and exit propagation via the wire +/// "link" protocol. +/// +/// Scenario (full round-trip): +/// +/// 1. Parent sends send_named to "die_test" (from_id=0, trigger mode). +/// 2. Child die_test echoes payload back — proxy_4 is created on the parent, +/// message arrives FROM proxy_4 at TestActor. +/// 3. TestActor sends {"link"} to proxy_4, establishing a Thespian link +/// between proxy_4 and TestActor AND requesting a wire "link" message. +/// 4. Wire ["link", TA_wire_id, 4] reaches child; child's StdioEndpoint +/// calls actor.link() on die_test and records the mapping in link_notify. +/// 5. TestActor sends {"die"} to proxy_4, which forwards it over the wire +/// to die_test (to_id=4). +/// 6. die_test receives from_id != 0 and exits "die_test". +/// 7. Child's StdioEndpoint receives the trapped exit from die_test, sends +/// wire ["exit", 4, "die_test"] to the parent. +/// 8. Parent endpoint dispatches to proxy_4 via proxies[4], sends +/// {"exit", "die_test"} to proxy_4. +/// 9. proxy_4 exits "die_test"; TestActor (Thespian-linked to proxy_4) +/// receives {"exit", "die_test"} and exits "success". +const std = @import("std"); +const thespian = @import("thespian"); +const cbor = @import("cbor"); +const endpoint = @import("endpoint"); +const build_options = @import("build_options"); + +var trace_file: ?std.fs.File = null; +var trace_buf: [4096]u8 = undefined; +var trace_file_writer: std.fs.File.Writer = undefined; + +fn trace_handler(buf: thespian.message.c_buffer_type) callconv(.c) void { + if (trace_file == null) return; + cbor.toJsonWriter(buf.base[0..buf.len], &trace_file_writer.interface, .{}) catch return; + trace_file_writer.interface.writeByte('\n') catch return; +} + +const Allocator = std.mem.Allocator; +const result = thespian.result; +const pid_ref = thespian.pid_ref; +const Receiver = thespian.Receiver; +const message = thespian.message; + +const TestActor = struct { + allocator: Allocator, + ep: thespian.pid, + receiver: Receiver(*@This()), + state: enum { waiting_for_trigger_reply, waiting_for_proxy_exit }, + + const Args = struct { allocator: Allocator }; + + fn start(args: Args) result { + return init(args) catch |e| return thespian.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + // Trap exits so {"exit", reason} arrives as a message rather than + // killing the actor; we need to inspect the exit reason. + _ = thespian.set_trap(true); + + thespian.env.get().proc_set("test_receiver", thespian.self_pid().ref()); + + const argv = try args.allocator.dupe(u8, message.fmt(.{build_options.remote_child_endpoint_path}).buf); + const ep = try thespian.spawn_link( + args.allocator, + endpoint.Args{ + .allocator = args.allocator, + .argv = argv, + }, + endpoint.start, + "endpoint", + ); + + // Trigger die_test in trigger mode (from_id=0): child echoes payload + // back so the parent can establish proxy_4. + try ep.send(.{ "send", @as(u64, 0), "die_test", .{"trigger"} }); + + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .ep = ep, + .receiver = .init(receive_fn, deinit, self), + .state = .waiting_for_trigger_reply, + }; + errdefer self.deinit(); + thespian.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.ep.deinit(); + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: pid_ref, m: message) result { + return self.receive(from, m) catch |e| return thespian.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), from: pid_ref, m: message) !void { + var reason: []const u8 = ""; + switch (self.state) { + .waiting_for_trigger_reply => { + if (try m.match(.{"trigger"})) { + // `from` is proxy_4 (the proxy for die_test, remote_id=4). + // + // 1. Send {"link"} so proxy_4 establishes a Thespian link + // with this actor AND sends the wire "link" message to + // the child, which will monitor die_test. + try from.send(.{"link"}); + // + // 2. Send {"die"} to proxy_4; it is forwarded over the wire + // to die_test (to_id=4), causing it to exit "die_test". + // Ordering is guaranteed: "link" is processed by the + // child before the die trigger because both travel through + // the same in-order pipe. + try from.send(.{"die"}); + self.state = .waiting_for_proxy_exit; + } else { + return thespian.unexpected(m); + } + }, + .waiting_for_proxy_exit => { + if (try m.match(.{ "exit", thespian.extract(&reason) })) { + if (std.mem.eql(u8, reason, "die_test")) + return thespian.exit("success"); + // Any other exit (e.g. endpoint crash) is a failure. + return thespian.unexpected(m); + } else { + return thespian.unexpected(m); + } + }, + } + } +}; + +test "remote: cross-process link/exit propagation via wire link protocol" { + const allocator = std.testing.allocator; + + var initial_env: ?thespian.env = null; + if (std.posix.getenv("TRACE") != null) { + const f = try std.fs.cwd().createFile("remote_lifetime_trace.json", .{}); + trace_file = f; + trace_file_writer = f.writer(&trace_buf); + var e = thespian.env.init(); + e.on_trace(&trace_handler); + e.enable_all_channels(); + initial_env = e; + } + defer if (initial_env) |e| { + trace_file_writer.interface.flush() catch {}; + trace_file.?.close(); + trace_file = null; + e.deinit(); + }; + + var ctx = try thespian.context.init(allocator); + defer ctx.deinit(); + + var success = false; + var exit_handler = thespian.make_exit_handler(&success, struct { + fn handle(ok: *bool, status: []const u8) void { + ok.* = std.mem.eql(u8, status, "success"); + } + }.handle); + + _ = try ctx.spawn_link( + TestActor.Args{ .allocator = allocator }, + TestActor.start, + "test_actor", + &exit_handler, + if (initial_env) |*e| e else null, + ); + + ctx.run(); + + if (!success) return error.TestFailed; +} diff --git a/test/tests.zig b/test/tests.zig index e7543a8..023c7f7 100644 --- a/test/tests.zig +++ b/test/tests.zig @@ -6,6 +6,7 @@ pub const remote_poc = @import("remote_poc_test.zig"); pub const remote_roundtrip = @import("remote_roundtrip_test.zig"); pub const remote_endpoint = @import("remote_endpoint_test.zig"); pub const remote_endpoint_id = @import("remote_endpoint_id_test.zig"); +pub const remote_lifetime = @import("remote_lifetime_test.zig"); test { std.testing.refAllDecls(@This());