feat(remote): implement proxy from-substitution and outbound ID routing
Inbound wire messages are now delivered FROM the proxy representing the remote sender, so local actors see a replyable `from`. This enables full two-way communication across the wire: - endpoint: deliver send_named/send via proxy (deliver_named/deliver_pid) instead of sending raw; from_id=0 bypasses proxy for anonymous sends - proxy: handle deliver_named and deliver_pid to send from within actor scope (providing from-substitution); cache one owned pid clone per sender keyed by stable instance_id() to avoid use-after-free when forwarding reply handles to the endpoint asynchronously - test: add remote_endpoint_id_test covering the full inbound proxy table / from-substitution / outbound ID table / send-by-ID round-trip - test: extend remote_child_endpoint with echo_id actor and send_wire_by_id to support the new test
This commit is contained in:
parent
639999f37e
commit
3dcb9f0e2f
5 changed files with 290 additions and 36 deletions
|
|
@ -140,18 +140,25 @@ const Endpoint = struct {
|
|||
switch (msg) {
|
||||
.send => |s| {
|
||||
// Ensure a local proxy exists for the sender.
|
||||
_ = try self.get_or_create_proxy(s.from_id);
|
||||
// Route to the local actor that owns this wire ID.
|
||||
if (self.local_actors.get(s.to_id)) |actor|
|
||||
try actor.send_raw(tp.message{ .buf = s.payload })
|
||||
const prx = try self.get_or_create_proxy(s.from_id);
|
||||
// Route to the local actor that owns this wire ID, delivering
|
||||
// FROM the proxy so the recipient can reply naturally.
|
||||
if (self.local_actors.getPtr(s.to_id)) |actor_ptr|
|
||||
try prx.send(.{ "deliver_pid", @as(u64, @intFromPtr(actor_ptr.h)), protocol.RawCbor{ .bytes = s.payload } })
|
||||
else
|
||||
return tp.exit_error(error.UnknownLocalActor, null);
|
||||
},
|
||||
.send_named => |s| {
|
||||
// Ensure a local proxy exists for the sender so it can receive replies.
|
||||
_ = try self.get_or_create_proxy(s.from_id);
|
||||
const actor = tp.env.get().proc(s.to_name);
|
||||
try actor.send_raw(tp.message{ .buf = s.payload });
|
||||
if (s.from_id != 0) {
|
||||
// Deliver FROM the proxy so the recipient sees the proxy as
|
||||
// `from` and can reply by sending back through it.
|
||||
const prx = try self.get_or_create_proxy(s.from_id);
|
||||
try prx.send(.{ "deliver_named", s.to_name, protocol.RawCbor{ .bytes = s.payload } });
|
||||
} else {
|
||||
// No sender identity; deliver directly without from substitution.
|
||||
const actor = tp.env.get().proc(s.to_name);
|
||||
try actor.send_raw(tp.message{ .buf = s.payload });
|
||||
}
|
||||
},
|
||||
.exit => |e| {
|
||||
// Remote actor has exited; notify its local proxy.
|
||||
|
|
|
|||
|
|
@ -24,6 +24,9 @@ const Proxy = struct {
|
|||
allocator: std.mem.Allocator,
|
||||
endpoint: tp.pid,
|
||||
remote_id: u64,
|
||||
/// One cloned pid per sender, keyed by stable instance_id().
|
||||
/// Kept alive so the endpoint can safely clone from the stored handle.
|
||||
senders: std.AutoHashMap(usize, tp.pid),
|
||||
receiver: tp.Receiver(*@This()),
|
||||
|
||||
fn start(args: Args) tp.result {
|
||||
|
|
@ -36,6 +39,7 @@ const Proxy = struct {
|
|||
.allocator = args.allocator,
|
||||
.endpoint = args.endpoint,
|
||||
.remote_id = args.remote_id,
|
||||
.senders = std.AutoHashMap(usize, tp.pid).init(args.allocator),
|
||||
.receiver = .init(receive_fn, deinit, self),
|
||||
};
|
||||
errdefer self.deinit();
|
||||
|
|
@ -46,6 +50,9 @@ const Proxy = struct {
|
|||
}
|
||||
|
||||
fn deinit(self: *@This()) void {
|
||||
var it = self.senders.valueIterator();
|
||||
while (it.next()) |p| p.deinit();
|
||||
self.senders.deinit();
|
||||
self.endpoint.deinit();
|
||||
self.allocator.destroy(self);
|
||||
}
|
||||
|
|
@ -56,6 +63,9 @@ const Proxy = struct {
|
|||
|
||||
fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void {
|
||||
var reason: []const u8 = "";
|
||||
var to_name: []const u8 = "";
|
||||
var handle_int: u64 = 0;
|
||||
var payload: []const u8 = "";
|
||||
|
||||
if (try m.match(.{ "exit", tp.extract(&reason) })) {
|
||||
// Notify the endpoint so it can remove us from the proxy table.
|
||||
|
|
@ -63,12 +73,26 @@ const Proxy = struct {
|
|||
// this exit came from transport collapse.
|
||||
self.endpoint.send(.{ "proxy_exit", self.remote_id, reason }) catch {};
|
||||
return tp.exit(reason);
|
||||
} else if (try m.match(.{ "deliver_named", tp.extract(&to_name), tp.extract_cbor(&payload) })) {
|
||||
// Deliver to a named local actor. Because this call is made from
|
||||
// within the proxy's receive, the recipient sees the proxy as `from`.
|
||||
const actor = tp.env.get().proc(to_name);
|
||||
try actor.send_raw(tp.message{ .buf = payload });
|
||||
} else if (try m.match(.{ "deliver_pid", tp.extract(&handle_int), tp.extract_cbor(&payload) })) {
|
||||
// Deliver to a local actor identified by its handle pointer.
|
||||
// The recipient sees the proxy as `from`.
|
||||
const actor: tp.pid_ref = .{ .h = @ptrFromInt(@as(usize, @intCast(handle_int))) };
|
||||
try actor.send_raw(tp.message{ .buf = payload });
|
||||
} else {
|
||||
// Forward all other messages to the endpoint for wire transmission.
|
||||
// We cache one owned clone per sender (keyed by stable instance_id)
|
||||
// so the endpoint can safely clone from the stored heap handle later.
|
||||
const actor_id = from.instance_id();
|
||||
const result = try self.senders.getOrPut(actor_id);
|
||||
if (!result.found_existing) result.value_ptr.* = from.clone();
|
||||
const stored = result.value_ptr.*;
|
||||
try self.endpoint.send(.{ "send", @as(u64, @intFromPtr(stored.h)), self.remote_id, protocol.RawCbor{ .bytes = m.buf } });
|
||||
}
|
||||
|
||||
// Forward all other messages to the endpoint for wire transmission.
|
||||
// Pass the sender's handle pointer as an opaque u64 so the endpoint
|
||||
// can assign (or look up) a stable outbound wire ID for this actor.
|
||||
try self.endpoint.send(.{ "send", @as(u64, @intFromPtr(from.h)), self.remote_id, protocol.RawCbor{ .bytes = m.buf } });
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue