diff --git a/build.zig b/build.zig index 8128418..eed5335 100644 --- a/build.zig +++ b/build.zig @@ -107,6 +107,14 @@ pub fn build(b: *std.Build) void { }, }); + const proxy_mod = b.createModule(.{ + .root_source_file = b.path("src/remote/proxy.zig"), + .imports = &.{ + .{ .name = "thespian", .module = thespian_mod }, + .{ .name = "protocol", .module = protocol_mod }, + }, + }); + const endpoint_mod = b.createModule(.{ .root_source_file = b.path("src/remote/endpoint.zig"), .imports = &.{ @@ -114,6 +122,7 @@ pub fn build(b: *std.Build) void { .{ .name = "cbor", .module = cbor_mod }, .{ .name = "framing", .module = framing_mod }, .{ .name = "protocol", .module = protocol_mod }, + .{ .name = "proxy", .module = proxy_mod }, }, }); diff --git a/src/remote/endpoint.zig b/src/remote/endpoint.zig index 7bb36e8..af5dc60 100644 --- a/src/remote/endpoint.zig +++ b/src/remote/endpoint.zig @@ -10,6 +10,7 @@ const tp = @import("thespian"); const cbor = @import("cbor"); const framing = @import("framing"); const protocol = @import("protocol"); +const proxy = @import("proxy"); const subprocess = tp.subprocess; @@ -58,13 +59,20 @@ const Endpoint = struct { fn receive(self: *@This(), _: tp.pid_ref, m: tp.message) !void { var bytes: []const u8 = ""; var from_id: u64 = 0; + var to_id: u64 = 0; var to_name: []const u8 = ""; var payload: []const u8 = ""; if (try m.match(.{ proc_tag, "stdout", tp.extract(&bytes) })) { if (self.accumulator.feed(bytes)) |frame| try self.dispatch_inbound(frame); } else if (try m.match(.{ proc_tag, "stderr", tp.any })) { + } else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_id), cbor.extract_cbor(&payload) })) { + // Outbound send by remote ID — from a local proxy. + try self.send_wire_by_id(from_id, to_id, payload); } else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_name), cbor.extract_cbor(&payload) })) { - try self.send_wire(from_id, to_name, payload); + // Outbound send_named — from a local actor addressing a well-known remote actor by name. + try self.send_wire_named(from_id, to_name, payload); + } else if (try m.match(.{ "proxy_exit", tp.any, tp.any })) { + // A local proxy has exited; remove it from the proxy table (stub until table is implemented). } else if (try m.match(.{ proc_tag, "term", "exited", tp.any })) { return tp.exit("transport_closed"); } else if (try m.match(.{ proc_tag, "term", tp.any, tp.any })) { @@ -89,7 +97,18 @@ const Endpoint = struct { } } - fn send_wire(self: *@This(), from_id: u64, to_name: []const u8, payload: []const u8) !void { + fn send_wire_by_id(self: *@This(), from_id: u64, to_id: u64, payload: []const u8) !void { + var msg_buf: [framing.max_frame_size]u8 = undefined; + var msg_stream: std.Io.Writer = .fixed(&msg_buf); + try protocol.encode_send(&msg_stream, from_id, to_id, payload); + + var frame_buf: [framing.max_frame_size + 4]u8 = undefined; + var frame_stream: std.Io.Writer = .fixed(&frame_buf); + try framing.write_frame(&frame_stream, msg_stream.buffered()); + try self.proc.send(frame_stream.buffered()); + } + + fn send_wire_named(self: *@This(), from_id: u64, to_name: []const u8, payload: []const u8) !void { var msg_buf: [framing.max_frame_size]u8 = undefined; var msg_stream: std.Io.Writer = .fixed(&msg_buf); try protocol.encode_send_named(&msg_stream, from_id, to_name, payload); diff --git a/src/remote/proxy.zig b/src/remote/proxy.zig new file mode 100644 index 0000000..b0b1435 --- /dev/null +++ b/src/remote/proxy.zig @@ -0,0 +1,74 @@ +/// Remote actor proxy. +/// +/// Represents a single remote actor within the local system. Local actors +/// hold a pid to a proxy and send messages to it; the proxy forwards them +/// to the endpoint for wire transmission. +/// +/// When the transport collapses or the remote actor exits, the endpoint +/// sends {"exit", reason} to the proxy. Because the proxy traps exits, this +/// arrives as a normal message, and the proxy propagates it by exiting with +/// the same reason, which in turn signals any local actors linked to it. +const std = @import("std"); +const tp = @import("thespian"); +const protocol = @import("protocol"); + +pub const Args = struct { + allocator: std.mem.Allocator, + /// Owned pid of the local endpoint actor. + endpoint: tp.pid, + /// Opaque remote actor ID assigned by the remote system. + remote_id: u64, +}; + +const Proxy = struct { + allocator: std.mem.Allocator, + endpoint: tp.pid, + remote_id: u64, + receiver: tp.Receiver(*@This()), + + fn start(args: Args) tp.result { + return init(args) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .endpoint = args.endpoint, + .remote_id = args.remote_id, + .receiver = .init(receive_fn, deinit, self), + }; + errdefer self.deinit(); + // Trap exit signals so they arrive as ["exit", reason] messages, + // allowing us to propagate them to linked local actors. + _ = tp.set_trap(true); + tp.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.endpoint.deinit(); + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result { + return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), _: tp.pid_ref, m: tp.message) !void { + var reason: []const u8 = ""; + + if (try m.match(.{ "exit", tp.extract(&reason) })) { + // Notify the endpoint so it can remove us from the proxy table. + // Silently ignore failure — the endpoint may already be dead if + // this exit came from transport collapse. + self.endpoint.send(.{ "proxy_exit", self.remote_id, reason }) catch {}; + return tp.exit(reason); + } + + // Forward all other messages to the endpoint for wire transmission. + // from_id = 0 until outbound ID tracking is implemented. + try self.endpoint.send(.{ "send", @as(u64, 0), self.remote_id, protocol.RawCbor{ .bytes = m.buf } }); + } +}; + +pub const start = Proxy.start;