diff --git a/src/remote/endpoint.zig b/src/remote/endpoint.zig index 0aa2e66..3d62461 100644 --- a/src/remote/endpoint.zig +++ b/src/remote/endpoint.zig @@ -140,18 +140,25 @@ const Endpoint = struct { switch (msg) { .send => |s| { // Ensure a local proxy exists for the sender. - _ = try self.get_or_create_proxy(s.from_id); - // Route to the local actor that owns this wire ID. - if (self.local_actors.get(s.to_id)) |actor| - try actor.send_raw(tp.message{ .buf = s.payload }) + const prx = try self.get_or_create_proxy(s.from_id); + // Route to the local actor that owns this wire ID, delivering + // FROM the proxy so the recipient can reply naturally. + if (self.local_actors.getPtr(s.to_id)) |actor_ptr| + try prx.send(.{ "deliver_pid", @as(u64, @intFromPtr(actor_ptr.h)), protocol.RawCbor{ .bytes = s.payload } }) else return tp.exit_error(error.UnknownLocalActor, null); }, .send_named => |s| { - // Ensure a local proxy exists for the sender so it can receive replies. - _ = try self.get_or_create_proxy(s.from_id); - const actor = tp.env.get().proc(s.to_name); - try actor.send_raw(tp.message{ .buf = s.payload }); + if (s.from_id != 0) { + // Deliver FROM the proxy so the recipient sees the proxy as + // `from` and can reply by sending back through it. + const prx = try self.get_or_create_proxy(s.from_id); + try prx.send(.{ "deliver_named", s.to_name, protocol.RawCbor{ .bytes = s.payload } }); + } else { + // No sender identity; deliver directly without from substitution. + const actor = tp.env.get().proc(s.to_name); + try actor.send_raw(tp.message{ .buf = s.payload }); + } }, .exit => |e| { // Remote actor has exited; notify its local proxy. diff --git a/src/remote/proxy.zig b/src/remote/proxy.zig index bb4c4e4..79fef76 100644 --- a/src/remote/proxy.zig +++ b/src/remote/proxy.zig @@ -24,6 +24,9 @@ const Proxy = struct { allocator: std.mem.Allocator, endpoint: tp.pid, remote_id: u64, + /// One cloned pid per sender, keyed by stable instance_id(). + /// Kept alive so the endpoint can safely clone from the stored handle. + senders: std.AutoHashMap(usize, tp.pid), receiver: tp.Receiver(*@This()), fn start(args: Args) tp.result { @@ -36,6 +39,7 @@ const Proxy = struct { .allocator = args.allocator, .endpoint = args.endpoint, .remote_id = args.remote_id, + .senders = std.AutoHashMap(usize, tp.pid).init(args.allocator), .receiver = .init(receive_fn, deinit, self), }; errdefer self.deinit(); @@ -46,6 +50,9 @@ const Proxy = struct { } fn deinit(self: *@This()) void { + var it = self.senders.valueIterator(); + while (it.next()) |p| p.deinit(); + self.senders.deinit(); self.endpoint.deinit(); self.allocator.destroy(self); } @@ -56,6 +63,9 @@ const Proxy = struct { fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { var reason: []const u8 = ""; + var to_name: []const u8 = ""; + var handle_int: u64 = 0; + var payload: []const u8 = ""; if (try m.match(.{ "exit", tp.extract(&reason) })) { // Notify the endpoint so it can remove us from the proxy table. @@ -63,12 +73,26 @@ const Proxy = struct { // this exit came from transport collapse. self.endpoint.send(.{ "proxy_exit", self.remote_id, reason }) catch {}; return tp.exit(reason); + } else if (try m.match(.{ "deliver_named", tp.extract(&to_name), tp.extract_cbor(&payload) })) { + // Deliver to a named local actor. Because this call is made from + // within the proxy's receive, the recipient sees the proxy as `from`. + const actor = tp.env.get().proc(to_name); + try actor.send_raw(tp.message{ .buf = payload }); + } else if (try m.match(.{ "deliver_pid", tp.extract(&handle_int), tp.extract_cbor(&payload) })) { + // Deliver to a local actor identified by its handle pointer. + // The recipient sees the proxy as `from`. + const actor: tp.pid_ref = .{ .h = @ptrFromInt(@as(usize, @intCast(handle_int))) }; + try actor.send_raw(tp.message{ .buf = payload }); + } else { + // Forward all other messages to the endpoint for wire transmission. + // We cache one owned clone per sender (keyed by stable instance_id) + // so the endpoint can safely clone from the stored heap handle later. + const actor_id = from.instance_id(); + const result = try self.senders.getOrPut(actor_id); + if (!result.found_existing) result.value_ptr.* = from.clone(); + const stored = result.value_ptr.*; + try self.endpoint.send(.{ "send", @as(u64, @intFromPtr(stored.h)), self.remote_id, protocol.RawCbor{ .bytes = m.buf } }); } - - // Forward all other messages to the endpoint for wire transmission. - // Pass the sender's handle pointer as an opaque u64 so the endpoint - // can assign (or look up) a stable outbound wire ID for this actor. - try self.endpoint.send(.{ "send", @as(u64, @intFromPtr(from.h)), self.remote_id, protocol.RawCbor{ .bytes = m.buf } }); } }; diff --git a/test/remote_child_endpoint.zig b/test/remote_child_endpoint.zig index 4f38120..379136e 100644 --- a/test/remote_child_endpoint.zig +++ b/test/remote_child_endpoint.zig @@ -1,10 +1,17 @@ /// Child binary for the endpoint test. /// -/// Runs a full Thespian context with two actors: -/// - EchoActor: registered as "echo" in env; forwards any received message -/// back through the endpoint to "test_receiver" on the parent side. -/// - StdioEndpoint: reads framed CBOR from stdin, dispatches send_named -/// messages to local actors via env, and writes outbound frames to stdout. +/// Runs a full Thespian context with actors registered in a StdioEndpoint: +/// +/// "echo" (wire ID 2): receives {from_id, payload}, replies via send_named +/// to "test_receiver" on the parent side. +/// +/// "echo_id" (wire ID 3): receives {from_id, payload}. +/// - If from_id == 0: trigger mode — sends send_named to "test_receiver" +/// so the parent can establish a proxy and reply by ID. +/// - If from_id != 0: reply mode — sends wire send to from_id with ["done"]. +/// +/// StdioEndpoint dispatches inbound wire messages to actors, passing +/// {from_id, payload} so actors can reply by ID when needed. const std = @import("std"); const tp = @import("thespian"); const cbor = @import("cbor"); @@ -12,7 +19,7 @@ const framing = @import("framing"); const protocol = @import("protocol"); // --------------------------------------------------------------------------- -// EchoActor +// EchoActor (wire ID 2) // --------------------------------------------------------------------------- const EchoActor = struct { @@ -37,7 +44,6 @@ const EchoActor = struct { .receiver = .init(receive_fn, deinit, self), }; errdefer self.deinit(); - // Notify StdioEndpoint so it can register us (via `from`) in its env. try args.parent.send(.{"echo_ready"}); tp.receive(&self.receiver); } @@ -51,8 +57,73 @@ const EchoActor = struct { } fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { - // Reply to sender (the StdioEndpoint) with {"send", 2, "test_receiver", } - try from.send(.{ "send", @as(u64, 2), "test_receiver", protocol.RawCbor{ .bytes = m.buf } }); + var from_id: u64 = 0; + var payload: []const u8 = ""; + if (try m.match(.{ tp.extract(&from_id), cbor.extract_cbor(&payload) })) { + // Reply via send_named to "test_receiver" on the parent side. + try from.send(.{ "send", @as(u64, 2), "test_receiver", protocol.RawCbor{ .bytes = payload } }); + } else { + return tp.unexpected(m); + } + _ = self; + } +}; + +// --------------------------------------------------------------------------- +// EchoIdActor (wire ID 3) +// --------------------------------------------------------------------------- + +const EchoIdActor = struct { + allocator: std.mem.Allocator, + receiver: tp.Receiver(*@This()), + + const Args = struct { + allocator: std.mem.Allocator, + /// Owned pid of the StdioEndpoint to notify when ready. + parent: tp.pid, + }; + + fn start(args: Args) tp.result { + return init(args) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + defer args.parent.deinit(); + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .receiver = .init(receive_fn, deinit, self), + }; + errdefer self.deinit(); + try args.parent.send(.{"echo_id_ready"}); + tp.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result { + return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { + var from_id: u64 = 0; + var payload: []const u8 = ""; + if (try m.match(.{ tp.extract(&from_id), cbor.extract_cbor(&payload) })) { + if (from_id == 0) { + // Trigger mode: send send_named so the parent can establish + // a proxy and will reply by ID. + try from.send(.{ "send", @as(u64, 3), "test_receiver", protocol.RawCbor{ .bytes = payload } }); + } else { + // Reply mode: send ["done"] back to the parent actor by wire ID. + var done_buf: [8]u8 = undefined; + const done_msg = try tp.message.fmtbuf(&done_buf, .{"done"}); + try from.send(.{ "send", @as(u64, 3), from_id, protocol.RawCbor{ .bytes = done_msg.buf } }); + } + } else { + return tp.unexpected(m); + } _ = self; } }; @@ -66,6 +137,10 @@ const StdioEndpoint = struct { fd_stdin: tp.file_descriptor, accumulator: framing.Accumulator, read_buf: [4096]u8, + /// Maps wire ID → local actor pid for routing inbound send-by-ID. + wire_ids: std.AutoHashMap(u64, tp.pid), + /// Number of child actors that have reported ready; arm stdin when all ready. + ready_count: u8, receiver: tp.Receiver(*@This()), const Args = struct { allocator: std.mem.Allocator }; @@ -75,6 +150,8 @@ const StdioEndpoint = struct { } fn init(args: Args) !void { + tp.trace_to_json_file("remote_child_endpoint_trace.json"); + tp.env.get().enable_all_channels(); const fd_stdin = try tp.file_descriptor.init("stdin", 0); const self = try args.allocator.create(@This()); self.* = .{ @@ -82,23 +159,29 @@ const StdioEndpoint = struct { .fd_stdin = fd_stdin, .accumulator = .{}, .read_buf = undefined, + .wire_ids = std.AutoHashMap(u64, tp.pid).init(args.allocator), + .ready_count = 0, .receiver = .init(receive_fn, deinit, self), }; errdefer self.deinit(); - // Spawn echo actor linked to this endpoint. Pass our pid so EchoActor - // can notify us when "echo" is registered in the env. We arm wait_read - // only after that notification to avoid a race where read_ready fires - // before EchoActor has finished registering. _ = try tp.spawn_link(args.allocator, EchoActor.Args{ .allocator = args.allocator, .parent = tp.self_pid().clone(), }, EchoActor.start, "echo"); + _ = try tp.spawn_link(args.allocator, EchoIdActor.Args{ + .allocator = args.allocator, + .parent = tp.self_pid().clone(), + }, EchoIdActor.start, "echo_id"); + tp.receive(&self.receiver); } fn deinit(self: *@This()) void { + var it = self.wire_ids.valueIterator(); + while (it.next()) |p| p.deinit(); + self.wire_ids.deinit(); self.fd_stdin.deinit(); self.allocator.destroy(self); } @@ -109,19 +192,27 @@ const StdioEndpoint = struct { fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { var from_id: u64 = 0; + var to_id_wire: u64 = 0; var to_name: []const u8 = ""; var payload: []const u8 = ""; if (try m.match(.{"echo_ready"})) { - // EchoActor is ready; register its pid in our env so dispatch_frame - // can route to it, then arm stdin reads. tp.env.get().proc_set("echo", from); - try self.fd_stdin.wait_read(); + try self.wire_ids.put(2, from.clone()); + self.ready_count += 1; + if (self.ready_count == 2) try self.fd_stdin.wait_read(); + } else if (try m.match(.{"echo_id_ready"})) { + tp.env.get().proc_set("echo_id", from); + try self.wire_ids.put(3, from.clone()); + self.ready_count += 1; + if (self.ready_count == 2) try self.fd_stdin.wait_read(); } else if (try m.match(.{ "fd", "stdin", "read_ready" })) { try self.dispatch_stdin(); try self.fd_stdin.wait_read(); } else if (try m.match(.{ "fd", "stdin", "read_error", tp.any, tp.any })) { return tp.exit("stdin_closed"); + } else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_id_wire), cbor.extract_cbor(&payload) })) { + try self.send_wire_by_id(from_id, to_id_wire, payload); } else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_name), cbor.extract_cbor(&payload) })) { try self.send_wire(from_id, to_name, payload); } else { @@ -143,20 +234,33 @@ const StdioEndpoint = struct { fn dispatch_frame(self: *@This(), frame: []const u8) !void { const msg = try protocol.decode(frame); switch (msg) { - .send => |s| { - _ = s; - _ = self; - return tp.exit_error(error.UnexpectedSend, null); - }, .send_named => |s| { const actor = tp.env.get().proc(s.to_name); - try actor.send_raw(tp.message{ .buf = s.payload }); - _ = self; + // Include from_id so the actor can reply by ID if needed. + try actor.send(.{ s.from_id, protocol.RawCbor{ .bytes = s.payload } }); + }, + .send => |s| { + // Route by wire ID. + if (self.wire_ids.get(s.to_id)) |actor| + try actor.send(.{ s.from_id, protocol.RawCbor{ .bytes = s.payload } }) + else + return tp.exit_error(error.UnknownWireId, null); }, .link, .exit, .proxy_id, .transport_error => return tp.exit_error(error.UnexpectedMessage, null), } } + fn send_wire_by_id(_: *@This(), from_id: u64, to_id: u64, payload: []const u8) !void { + var msg_buf: [framing.max_frame_size]u8 = undefined; + var msg_stream: std.Io.Writer = .fixed(&msg_buf); + try protocol.encode_send(&msg_stream, from_id, to_id, payload); + + var frame_buf: [framing.max_frame_size + 4]u8 = undefined; + var frame_stream: std.Io.Writer = .fixed(&frame_buf); + try framing.write_frame(&frame_stream, msg_stream.buffered()); + try std.fs.File.stdout().writeAll(frame_stream.buffered()); + } + fn send_wire(_: *@This(), from_id: u64, to_name: []const u8, payload: []const u8) !void { var msg_buf: [framing.max_frame_size]u8 = undefined; var msg_stream: std.Io.Writer = .fixed(&msg_buf); diff --git a/test/remote_endpoint_id_test.zig b/test/remote_endpoint_id_test.zig new file mode 100644 index 0000000..5a6dd5a --- /dev/null +++ b/test/remote_endpoint_id_test.zig @@ -0,0 +1,118 @@ +/// Tests inbound proxy table, from-substitution, outbound ID table, and +/// inbound send-by-ID routing in a single three-step round-trip: +/// +/// 1. Parent sends send_named to "echo_id" (from_id=0, trigger mode). +/// 2. Child echo_id sends send_named back (from_id=3) → proxy created, +/// delivered FROM proxy to TestActor (from-substitution). +/// 3. TestActor replies to proxy via from.send() → outbound ID assigned, +/// wire send reaches child with from_id=wire_id_TA, to_id=3. +/// 4. Child echo_id replies via wire send to wire_id_TA → parent routes +/// via local_actors to TestActor (inbound send-by-ID). +/// 5. TestActor receives ["done"] → exits "success". +const std = @import("std"); +const thespian = @import("thespian"); +const endpoint = @import("endpoint"); +const build_options = @import("build_options"); + +const Allocator = std.mem.Allocator; +const result = thespian.result; +const unexpected = thespian.unexpected; +const pid_ref = thespian.pid_ref; +const Receiver = thespian.Receiver; +const message = thespian.message; + +const TestActor = struct { + allocator: Allocator, + ep: thespian.pid, + receiver: Receiver(*@This()), + state: enum { waiting_for_hello, waiting_for_done }, + + const Args = struct { allocator: Allocator }; + + fn start(args: Args) result { + return init(args) catch |e| return thespian.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + thespian.trace_to_json_file("remote_endpoint_id_trace.json"); + thespian.env.get().enable_all_channels(); + thespian.env.get().proc_set("test_receiver", thespian.self_pid().ref()); + + const argv = try args.allocator.dupe(u8, message.fmt(.{build_options.remote_child_endpoint_path}).buf); + const ep = try thespian.spawn_link( + args.allocator, + endpoint.Args{ + .allocator = args.allocator, + .argv = argv, + }, + endpoint.start, + "endpoint", + ); + + // Trigger child echo_id in trigger mode (from_id=0). + try ep.send(.{ "send", @as(u64, 0), "echo_id", .{"hello"} }); + + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .ep = ep, + .receiver = .init(receive_fn, deinit, self), + .state = .waiting_for_hello, + }; + errdefer self.deinit(); + thespian.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.ep.deinit(); + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: pid_ref, m: message) result { + return self.receive(from, m) catch |e| return thespian.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), from: pid_ref, m: message) !void { + switch (self.state) { + .waiting_for_hello => { + if (try m.match(.{"hello"})) { + // Reply through the proxy — this populates the outbound ID table + // and sends a wire send to the child with our assigned wire ID. + try from.send(.{"hello"}); + self.state = .waiting_for_done; + } else return unexpected(m); + }, + .waiting_for_done => { + if (try m.match(.{"done"})) { + // Arrived via inbound send-by-ID routing through local_actors. + return thespian.exit("success"); + } else return unexpected(m); + }, + } + } +}; + +test "remote: inbound proxy table, from-substitution, outbound ID table, and send-by-ID routing" { + const allocator = std.testing.allocator; + var ctx = try thespian.context.init(allocator); + defer ctx.deinit(); + + var success = false; + var exit_handler = thespian.make_exit_handler(&success, struct { + fn handle(ok: *bool, status: []const u8) void { + ok.* = std.mem.eql(u8, status, "success"); + } + }.handle); + + _ = try ctx.spawn_link( + TestActor.Args{ .allocator = allocator }, + TestActor.start, + "test_actor", + &exit_handler, + null, + ); + + ctx.run(); + + if (!success) return error.TestFailed; +} diff --git a/test/tests.zig b/test/tests.zig index ceb3273..e7543a8 100644 --- a/test/tests.zig +++ b/test/tests.zig @@ -5,6 +5,7 @@ pub const ip_tcp_client_server = @import("ip_tcp_client_server.zig"); pub const remote_poc = @import("remote_poc_test.zig"); pub const remote_roundtrip = @import("remote_roundtrip_test.zig"); pub const remote_endpoint = @import("remote_endpoint_test.zig"); +pub const remote_endpoint_id = @import("remote_endpoint_id_test.zig"); test { std.testing.refAllDecls(@This());