feat(remote): implement outbound ID table for local actor identity on the wire

- Proxy passes sender's handle pointer as from_id instead of 0
- Endpoint assigns stable wire IDs to local actors on first send, keyed by handle pointer
- Reverse table (wire_id → pid) enables inbound send routing to local actors by ID
- Full bidirectional send-by-ID is now functional end-to-end
This commit is contained in:
CJ van den Berg 2026-03-07 17:53:07 +01:00
parent dfb377ab6e
commit 685710c45b
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9
2 changed files with 40 additions and 11 deletions

View file

@ -31,6 +31,11 @@ const Endpoint = struct {
accumulator: framing.Accumulator,
/// Inbound proxy table: remote actor ID local proxy pid.
proxies: std.AutoHashMap(u64, tp.pid),
/// Outbound table: local actor handle pointer assigned wire ID.
outbound: std.AutoHashMap(usize, u64),
/// Local actor table: wire ID local actor pid (reverse of outbound).
local_actors: std.AutoHashMap(u64, tp.pid),
next_id: u64,
receiver: tp.Receiver(*@This()),
fn start(args: Args) tp.result {
@ -46,6 +51,9 @@ const Endpoint = struct {
.proc = proc,
.accumulator = .{},
.proxies = std.AutoHashMap(u64, tp.pid).init(args.allocator),
.outbound = std.AutoHashMap(usize, u64).init(args.allocator),
.local_actors = std.AutoHashMap(u64, tp.pid).init(args.allocator),
.next_id = 1,
.receiver = .init(receive_fn, deinit, self),
};
errdefer self.deinit();
@ -54,12 +62,16 @@ const Endpoint = struct {
fn deinit(self: *@This()) void {
// Exit all live proxies (best-effort; endpoint may already be shutting down).
var it = self.proxies.valueIterator();
while (it.next()) |p| {
var proxy_it = self.proxies.valueIterator();
while (proxy_it.next()) |p| {
p.send(.{ "exit", "transport_closed" }) catch {};
p.deinit();
}
self.proxies.deinit();
var actor_it = self.local_actors.valueIterator();
while (actor_it.next()) |p| p.deinit();
self.local_actors.deinit();
self.outbound.deinit();
self.proc.deinit();
self.allocator.destroy(self);
}
@ -81,7 +93,9 @@ const Endpoint = struct {
} else if (try m.match(.{ proc_tag, "stderr", tp.any })) {
} else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_id), cbor.extract_cbor(&payload) })) {
// Outbound send by remote ID from a local proxy.
try self.send_wire_by_id(from_id, to_id, payload);
// from_id carries the sender's handle pointer; resolve to wire ID.
const wire_from_id = try self.get_or_assign_outbound_id(from_id);
try self.send_wire_by_id(wire_from_id, to_id, payload);
} else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_name), cbor.extract_cbor(&payload) })) {
// Outbound send_named from a local actor addressing a well-known remote actor by name.
try self.send_wire_named(from_id, to_name, payload);
@ -95,6 +109,20 @@ const Endpoint = struct {
}
}
/// Look up or assign a stable wire ID for a local actor identified by its
/// handle pointer. Also records the pid so inbound sends can be routed back.
fn get_or_assign_outbound_id(self: *@This(), handle_int: u64) !u64 {
const key: usize = @intCast(handle_int);
if (self.outbound.get(key)) |id| return id;
const id = self.next_id;
self.next_id += 1;
// Reconstruct a pid_ref from the handle pointer and clone it for storage.
const pid_ref: tp.pid_ref = .{ .h = @ptrFromInt(key) };
try self.local_actors.put(id, pid_ref.clone());
try self.outbound.put(key, id);
return id;
}
/// Get the local proxy for remote_id, creating one if it doesn't exist yet.
fn get_or_create_proxy(self: *@This(), remote_id: u64) !tp.pid_ref {
if (self.proxies.getPtr(remote_id)) |p| return p.ref();
@ -113,11 +141,11 @@ const Endpoint = struct {
.send => |s| {
// Ensure a local proxy exists for the sender.
_ = try self.get_or_create_proxy(s.from_id);
// Deliver payload to the local actor identified by to_id.
// Requires the outbound ID table (not yet implemented); error for now.
_ = s.to_id;
_ = s.payload;
return tp.exit_error(error.OutboundTableNotImplemented, null);
// Route to the local actor that owns this wire ID.
if (self.local_actors.get(s.to_id)) |actor|
try actor.send_raw(tp.message{ .buf = s.payload })
else
return tp.exit_error(error.UnknownLocalActor, null);
},
.send_named => |s| {
// Ensure a local proxy exists for the sender so it can receive replies.

View file

@ -54,7 +54,7 @@ const Proxy = struct {
return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace());
}
fn receive(self: *@This(), _: tp.pid_ref, m: tp.message) !void {
fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void {
var reason: []const u8 = "";
if (try m.match(.{ "exit", tp.extract(&reason) })) {
@ -66,8 +66,9 @@ const Proxy = struct {
}
// Forward all other messages to the endpoint for wire transmission.
// from_id = 0 until outbound ID tracking is implemented.
try self.endpoint.send(.{ "send", @as(u64, 0), self.remote_id, protocol.RawCbor{ .bytes = m.buf } });
// Pass the sender's handle pointer as an opaque u64 so the endpoint
// can assign (or look up) a stable outbound wire ID for this actor.
try self.endpoint.send(.{ "send", @as(u64, @intFromPtr(from.h)), self.remote_id, protocol.RawCbor{ .bytes = m.buf } });
}
};