From 685710c45be26c312574b828419b5baa3588b446 Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Sat, 7 Mar 2026 17:53:07 +0100 Subject: [PATCH] feat(remote): implement outbound ID table for local actor identity on the wire MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Proxy passes sender's handle pointer as from_id instead of 0 - Endpoint assigns stable wire IDs to local actors on first send, keyed by handle pointer - Reverse table (wire_id → pid) enables inbound send routing to local actors by ID - Full bidirectional send-by-ID is now functional end-to-end --- src/remote/endpoint.zig | 44 +++++++++++++++++++++++++++++++++-------- src/remote/proxy.zig | 7 ++++--- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/src/remote/endpoint.zig b/src/remote/endpoint.zig index 9bb882e..0aa2e66 100644 --- a/src/remote/endpoint.zig +++ b/src/remote/endpoint.zig @@ -31,6 +31,11 @@ const Endpoint = struct { accumulator: framing.Accumulator, /// Inbound proxy table: remote actor ID → local proxy pid. proxies: std.AutoHashMap(u64, tp.pid), + /// Outbound table: local actor handle pointer → assigned wire ID. + outbound: std.AutoHashMap(usize, u64), + /// Local actor table: wire ID → local actor pid (reverse of outbound). + local_actors: std.AutoHashMap(u64, tp.pid), + next_id: u64, receiver: tp.Receiver(*@This()), fn start(args: Args) tp.result { @@ -46,6 +51,9 @@ const Endpoint = struct { .proc = proc, .accumulator = .{}, .proxies = std.AutoHashMap(u64, tp.pid).init(args.allocator), + .outbound = std.AutoHashMap(usize, u64).init(args.allocator), + .local_actors = std.AutoHashMap(u64, tp.pid).init(args.allocator), + .next_id = 1, .receiver = .init(receive_fn, deinit, self), }; errdefer self.deinit(); @@ -54,12 +62,16 @@ const Endpoint = struct { fn deinit(self: *@This()) void { // Exit all live proxies (best-effort; endpoint may already be shutting down). - var it = self.proxies.valueIterator(); - while (it.next()) |p| { + var proxy_it = self.proxies.valueIterator(); + while (proxy_it.next()) |p| { p.send(.{ "exit", "transport_closed" }) catch {}; p.deinit(); } self.proxies.deinit(); + var actor_it = self.local_actors.valueIterator(); + while (actor_it.next()) |p| p.deinit(); + self.local_actors.deinit(); + self.outbound.deinit(); self.proc.deinit(); self.allocator.destroy(self); } @@ -81,7 +93,9 @@ const Endpoint = struct { } else if (try m.match(.{ proc_tag, "stderr", tp.any })) { } else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_id), cbor.extract_cbor(&payload) })) { // Outbound send by remote ID — from a local proxy. - try self.send_wire_by_id(from_id, to_id, payload); + // from_id carries the sender's handle pointer; resolve to wire ID. + const wire_from_id = try self.get_or_assign_outbound_id(from_id); + try self.send_wire_by_id(wire_from_id, to_id, payload); } else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_name), cbor.extract_cbor(&payload) })) { // Outbound send_named — from a local actor addressing a well-known remote actor by name. try self.send_wire_named(from_id, to_name, payload); @@ -95,6 +109,20 @@ const Endpoint = struct { } } + /// Look up or assign a stable wire ID for a local actor identified by its + /// handle pointer. Also records the pid so inbound sends can be routed back. + fn get_or_assign_outbound_id(self: *@This(), handle_int: u64) !u64 { + const key: usize = @intCast(handle_int); + if (self.outbound.get(key)) |id| return id; + const id = self.next_id; + self.next_id += 1; + // Reconstruct a pid_ref from the handle pointer and clone it for storage. + const pid_ref: tp.pid_ref = .{ .h = @ptrFromInt(key) }; + try self.local_actors.put(id, pid_ref.clone()); + try self.outbound.put(key, id); + return id; + } + /// Get the local proxy for remote_id, creating one if it doesn't exist yet. fn get_or_create_proxy(self: *@This(), remote_id: u64) !tp.pid_ref { if (self.proxies.getPtr(remote_id)) |p| return p.ref(); @@ -113,11 +141,11 @@ const Endpoint = struct { .send => |s| { // Ensure a local proxy exists for the sender. _ = try self.get_or_create_proxy(s.from_id); - // Deliver payload to the local actor identified by to_id. - // Requires the outbound ID table (not yet implemented); error for now. - _ = s.to_id; - _ = s.payload; - return tp.exit_error(error.OutboundTableNotImplemented, null); + // Route to the local actor that owns this wire ID. + if (self.local_actors.get(s.to_id)) |actor| + try actor.send_raw(tp.message{ .buf = s.payload }) + else + return tp.exit_error(error.UnknownLocalActor, null); }, .send_named => |s| { // Ensure a local proxy exists for the sender so it can receive replies. diff --git a/src/remote/proxy.zig b/src/remote/proxy.zig index b0b1435..bb4c4e4 100644 --- a/src/remote/proxy.zig +++ b/src/remote/proxy.zig @@ -54,7 +54,7 @@ const Proxy = struct { return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); } - fn receive(self: *@This(), _: tp.pid_ref, m: tp.message) !void { + fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { var reason: []const u8 = ""; if (try m.match(.{ "exit", tp.extract(&reason) })) { @@ -66,8 +66,9 @@ const Proxy = struct { } // Forward all other messages to the endpoint for wire transmission. - // from_id = 0 until outbound ID tracking is implemented. - try self.endpoint.send(.{ "send", @as(u64, 0), self.remote_id, protocol.RawCbor{ .bytes = m.buf } }); + // Pass the sender's handle pointer as an opaque u64 so the endpoint + // can assign (or look up) a stable outbound wire ID for this actor. + try self.endpoint.send(.{ "send", @as(u64, @intFromPtr(from.h)), self.remote_id, protocol.RawCbor{ .bytes = m.buf } }); } };