From 0e804fc61f215c848dd5284e0d9b3f4bc121c5a1 Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Sat, 7 Mar 2026 14:46:18 +0100 Subject: [PATCH] WIP: add remote_endpoint_test --- build.zig | 51 +++++++-- src/remote/endpoint.zig | 99 +++++++++++++++++ src/remote/protocol.zig | 20 ++++ test/remote_child_endpoint.zig | 198 +++++++++++++++++++++++++++++++++ test/remote_endpoint_test.zig | 100 +++++++++++++++++ test/tests.zig | 1 + 6 files changed, 462 insertions(+), 7 deletions(-) create mode 100644 src/remote/endpoint.zig create mode 100644 test/remote_child_endpoint.zig create mode 100644 test/remote_endpoint_test.zig diff --git a/build.zig b/build.zig index bba029c..8128418 100644 --- a/build.zig +++ b/build.zig @@ -86,6 +86,16 @@ pub fn build(b: *std.Build) void { }); const cbor_mod = cbor_dep.module("cbor"); + // thespian_mod must come before anything that depends on it. + const thespian_mod = b.addModule("thespian", .{ + .root_source_file = b.path("src/thespian.zig"), + .imports = &.{ + .{ .name = "cbor", .module = cbor_mod }, + }, + }); + thespian_mod.addIncludePath(b.path("include")); + thespian_mod.linkLibrary(lib); + const framing_mod = b.createModule(.{ .root_source_file = b.path("src/remote/framing.zig"), }); @@ -97,6 +107,18 @@ pub fn build(b: *std.Build) void { }, }); + const endpoint_mod = b.createModule(.{ + .root_source_file = b.path("src/remote/endpoint.zig"), + .imports = &.{ + .{ .name = "thespian", .module = thespian_mod }, + .{ .name = "cbor", .module = cbor_mod }, + .{ .name = "framing", .module = framing_mod }, + .{ .name = "protocol", .module = protocol_mod }, + }, + }); + + // --- Child binaries (no Thespian context) --- + const remote_child = b.addExecutable(.{ .name = "remote_child_send", .root_module = b.createModule(.{ @@ -123,14 +145,27 @@ pub fn build(b: *std.Build) void { }), }); - const thespian_mod = b.addModule("thespian", .{ - .root_source_file = b.path("src/thespian.zig"), - .imports = &.{ - .{ .name = "cbor", .module = cbor_mod }, - }, + // --- Child endpoint binary (full Thespian context) --- + + const remote_child_endpoint = b.addExecutable(.{ + .name = "remote_child_endpoint", + .root_module = b.createModule(.{ + .root_source_file = b.path("test/remote_child_endpoint.zig"), + .target = target, + .optimize = optimize, + .imports = &.{ + .{ .name = "thespian", .module = thespian_mod }, + .{ .name = "cbor", .module = cbor_mod }, + .{ .name = "framing", .module = framing_mod }, + .{ .name = "protocol", .module = protocol_mod }, + }, + }), }); - thespian_mod.addIncludePath(b.path("include")); - thespian_mod.linkLibrary(lib); + remote_child_endpoint.linkLibrary(lib); + remote_child_endpoint.linkLibrary(asio_dep.artifact("asio")); + remote_child_endpoint.linkLibCpp(); + + // --- Test suite --- const tests = b.addTest(.{ .root_module = b.createModule(.{ @@ -142,12 +177,14 @@ pub fn build(b: *std.Build) void { options.addOptionPath("remote_child_path", remote_child.getEmittedBin()); options.addOptionPath("remote_child_roundtrip_path", remote_child_roundtrip.getEmittedBin()); + options.addOptionPath("remote_child_endpoint_path", remote_child_endpoint.getEmittedBin()); tests.root_module.addImport("build_options", options_mod); tests.root_module.addImport("cbor", cbor_mod); tests.root_module.addImport("thespian", thespian_mod); tests.root_module.addImport("framing", framing_mod); tests.root_module.addImport("protocol", protocol_mod); + tests.root_module.addImport("endpoint", endpoint_mod); tests.addIncludePath(b.path("test")); tests.addIncludePath(b.path("src")); tests.addIncludePath(b.path("include")); diff --git a/src/remote/endpoint.zig b/src/remote/endpoint.zig new file mode 100644 index 0000000..3f6c6ec --- /dev/null +++ b/src/remote/endpoint.zig @@ -0,0 +1,99 @@ +/// Parent-side endpoint actor. +/// +/// Wraps a subprocess, accumulates framed CBOR from its stdout, and dispatches +/// decoded send_named messages to named local actors via the env proc table. +/// +/// Local actors send outbound messages to the endpoint with: +/// {"send", from_id: u64, to_name: text, payload: raw_cbor} +const std = @import("std"); +const tp = @import("thespian"); +const cbor = @import("cbor"); +const framing = @import("framing"); +const protocol = @import("protocol"); + +const subprocess = tp.subprocess; + +const proc_tag = "endpoint_proc"; + +pub const Args = struct { + allocator: std.mem.Allocator, + /// CBOR-encoded argv array for the child binary. + /// Must be heap-allocated; endpoint.init will free it. + argv: []const u8, +}; + +const Endpoint = struct { + allocator: std.mem.Allocator, + proc: subprocess, + accumulator: framing.Accumulator, + receiver: tp.Receiver(*@This()), + + fn start(args: Args) tp.result { + return init(args) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + defer args.allocator.free(args.argv); + const proc = try subprocess.init(args.allocator, tp.message{ .buf = args.argv }, proc_tag, .Pipe); + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .proc = proc, + .accumulator = .{}, + .receiver = .init(receive_fn, deinit, self), + }; + errdefer self.deinit(); + tp.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.proc.deinit(); + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result { + return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), _: tp.pid_ref, m: tp.message) !void { + var bytes: []const u8 = ""; + var from_id: u64 = 0; + var to_name: []const u8 = ""; + var payload: []const u8 = ""; + if (try m.match(.{ proc_tag, "stdout", tp.extract(&bytes) })) { + if (self.accumulator.feed(bytes)) |frame| try self.dispatch_inbound(frame); + } else if (try m.match(.{ proc_tag, "stderr", tp.any })) { + } else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_name), cbor.extract_cbor(&payload) })) { + try self.send_wire(from_id, to_name, payload); + } else if (try m.match(.{ proc_tag, "term", "exited", tp.any })) { + return tp.exit("transport_closed"); + } else if (try m.match(.{ proc_tag, "term", tp.any, tp.any })) { + return tp.exit("transport_closed"); + } else { + return tp.unexpected(m); + } + } + + fn dispatch_inbound(_: *@This(), frame: []const u8) !void { + const msg = try protocol.decode(frame); + switch (msg) { + .send_named => |s| { + const actor = tp.env.get().proc(s.to_name); + try actor.send_raw(tp.message{ .buf = s.payload }); + }, + } + } + + fn send_wire(self: *@This(), from_id: u64, to_name: []const u8, payload: []const u8) !void { + var msg_buf: [framing.max_frame_size]u8 = undefined; + var msg_stream: std.Io.Writer = .fixed(&msg_buf); + try protocol.encode_send_named(&msg_stream, from_id, to_name, payload); + + var frame_buf: [framing.max_frame_size + 4]u8 = undefined; + var frame_stream: std.Io.Writer = .fixed(&frame_buf); + try framing.write_frame(&frame_stream, msg_stream.buffered()); + try self.proc.send(frame_stream.buffered()); + } +}; + +pub const start = Endpoint.start; diff --git a/src/remote/protocol.zig b/src/remote/protocol.zig index 27f5844..80b92a7 100644 --- a/src/remote/protocol.zig +++ b/src/remote/protocol.zig @@ -1,5 +1,25 @@ +const std = @import("std"); const cbor = @import("cbor"); +/// Wraps pre-encoded CBOR bytes so they can be embedded in a message.fmt() tuple +/// without being re-encoded as a string. +pub const RawCbor = struct { + bytes: []const u8, + pub fn cborEncode(self: @This(), writer: *std.Io.Writer) std.Io.Writer.Error!void { + _ = try writer.write(self.bytes); + } +}; + +/// Encode a send_named wire message into writer. +/// payload must already be CBOR-encoded bytes. +pub fn encode_send_named(writer: *std.Io.Writer, from_id: u64, to_name: []const u8, payload: []const u8) !void { + try cbor.writeArrayHeader(writer, 4); + try cbor.writeValue(writer, "send_named"); + try cbor.writeValue(writer, from_id); + try cbor.writeValue(writer, to_name); + _ = try writer.write(payload); +} + pub const WireMessage = union(enum) { send_named: SendNamed, diff --git a/test/remote_child_endpoint.zig b/test/remote_child_endpoint.zig new file mode 100644 index 0000000..d51f573 --- /dev/null +++ b/test/remote_child_endpoint.zig @@ -0,0 +1,198 @@ +/// Child binary for the endpoint test. +/// +/// Runs a full Thespian context with two actors: +/// - EchoActor: registered as "echo" in env; forwards any received message +/// back through the endpoint to "test_receiver" on the parent side. +/// - StdioEndpoint: reads framed CBOR from stdin, dispatches send_named +/// messages to local actors via env, and writes outbound frames to stdout. +const std = @import("std"); +const tp = @import("thespian"); +const cbor = @import("cbor"); +const framing = @import("framing"); +const protocol = @import("protocol"); + +// --------------------------------------------------------------------------- +// EchoActor +// --------------------------------------------------------------------------- + +const EchoActor = struct { + allocator: std.mem.Allocator, + receiver: tp.Receiver(*@This()), + + const Args = struct { + allocator: std.mem.Allocator, + /// Owned pid of the StdioEndpoint to notify when ready. + parent: tp.pid, + }; + + fn start(args: Args) tp.result { + return init(args) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + defer args.parent.deinit(); + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .receiver = .init(receive_fn, deinit, self), + }; + errdefer self.deinit(); + // Notify StdioEndpoint so it can register us (via `from`) in its env. + try args.parent.send(.{"echo_ready"}); + tp.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result { + return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { + // Reply to sender (the StdioEndpoint) with {"send", 2, "test_receiver", } + try from.send(.{ "send", @as(u64, 2), "test_receiver", protocol.RawCbor{ .bytes = m.buf } }); + _ = self; + } +}; + +// --------------------------------------------------------------------------- +// StdioEndpoint +// --------------------------------------------------------------------------- + +const StdioEndpoint = struct { + allocator: std.mem.Allocator, + fd_stdin: tp.file_descriptor, + accumulator: framing.Accumulator, + read_buf: [4096]u8, + receiver: tp.Receiver(*@This()), + + const Args = struct { allocator: std.mem.Allocator }; + + fn start(args: Args) tp.result { + return init(args) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + const fd_stdin = try tp.file_descriptor.init("stdin", 0); + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .fd_stdin = fd_stdin, + .accumulator = .{}, + .read_buf = undefined, + .receiver = .init(receive_fn, deinit, self), + }; + errdefer self.deinit(); + + // Spawn echo actor linked to this endpoint. Pass our pid so EchoActor + // can notify us when "echo" is registered in the env. We arm wait_read + // only after that notification to avoid a race where read_ready fires + // before EchoActor has finished registering. + _ = try tp.spawn_link(args.allocator, EchoActor.Args{ + .allocator = args.allocator, + .parent = tp.self_pid().clone(), + }, EchoActor.start, "echo"); + + tp.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.fd_stdin.deinit(); + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result { + return self.receive(from, m) catch |e| return tp.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) !void { + var from_id: u64 = 0; + var to_name: []const u8 = ""; + var payload: []const u8 = ""; + + if (try m.match(.{"echo_ready"})) { + // EchoActor is ready; register its pid in our env so dispatch_frame + // can route to it, then arm stdin reads. + tp.env.get().proc_set("echo", from); + try self.fd_stdin.wait_read(); + } else if (try m.match(.{ "fd", "stdin", "read_ready" })) { + try self.dispatch_stdin(); + try self.fd_stdin.wait_read(); + } else if (try m.match(.{ "fd", "stdin", "read_error", tp.any, tp.any })) { + return tp.exit("stdin_closed"); + } else if (try m.match(.{ "send", tp.extract(&from_id), tp.extract(&to_name), cbor.extract_cbor(&payload) })) { + try self.send_wire(from_id, to_name, payload); + } else { + return tp.unexpected(m); + } + } + + fn dispatch_stdin(self: *@This()) !void { + const n = std.fs.File.stdin().read(&self.read_buf) catch |e| switch (e) { + error.WouldBlock => return, + else => return tp.exit_error(e, @errorReturnTrace()), + }; + if (n == 0) return tp.exit("stdin_closed"); + if (self.accumulator.feed(self.read_buf[0..n])) |frame| { + try self.dispatch_frame(frame); + } + } + + fn dispatch_frame(self: *@This(), frame: []const u8) !void { + const msg = try protocol.decode(frame); + switch (msg) { + .send_named => |s| { + const actor = tp.env.get().proc(s.to_name); + try actor.send_raw(tp.message{ .buf = s.payload }); + _ = self; + }, + } + } + + fn send_wire(_: *@This(), from_id: u64, to_name: []const u8, payload: []const u8) !void { + var msg_buf: [framing.max_frame_size]u8 = undefined; + var msg_stream: std.Io.Writer = .fixed(&msg_buf); + try protocol.encode_send_named(&msg_stream, from_id, to_name, payload); + + var frame_buf: [framing.max_frame_size + 4]u8 = undefined; + var frame_stream: std.Io.Writer = .fixed(&frame_buf); + try framing.write_frame(&frame_stream, msg_stream.buffered()); + try std.fs.File.stdout().writeAll(frame_stream.buffered()); + } +}; + +// --------------------------------------------------------------------------- +// main +// --------------------------------------------------------------------------- + +pub fn main() !void { + var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + + var ctx = try tp.context.init(allocator); + defer ctx.deinit(); + + var exit_ok = true; + var exit_handler = tp.make_exit_handler(&exit_ok, struct { + fn handle(ok: *bool, status: []const u8) void { + if (!std.mem.eql(u8, status, "stdin_closed") and + !std.mem.eql(u8, status, "normal")) + ok.* = false; + } + }.handle); + + _ = try ctx.spawn_link( + StdioEndpoint.Args{ .allocator = allocator }, + StdioEndpoint.start, + "stdio_endpoint", + &exit_handler, + null, + ); + + ctx.run(); + + std.process.exit(if (exit_ok) 0 else 1); +} diff --git a/test/remote_endpoint_test.zig b/test/remote_endpoint_test.zig new file mode 100644 index 0000000..bab076f --- /dev/null +++ b/test/remote_endpoint_test.zig @@ -0,0 +1,100 @@ +const std = @import("std"); +const thespian = @import("thespian"); +const cbor = @import("cbor"); +const framing = @import("framing"); +const protocol = @import("protocol"); +const endpoint = @import("endpoint"); +const build_options = @import("build_options"); + +const Allocator = std.mem.Allocator; +const result = thespian.result; +const unexpected = thespian.unexpected; +const pid_ref = thespian.pid_ref; +const Receiver = thespian.Receiver; +const message = thespian.message; +const extract = thespian.extract; + +/// Top-level test actor. Registers as "test_receiver", spawns the endpoint, +/// sends ["hello"] to the child's echo actor, and verifies the reply. +const TestActor = struct { + allocator: Allocator, + ep: thespian.pid, + receiver: Receiver(*@This()), + + const Args = struct { allocator: Allocator }; + + fn start(args: Args) result { + return init(args) catch |e| return thespian.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + // Register self so the endpoint can deliver the echo reply here. + thespian.env.get().proc_set("test_receiver", thespian.self_pid().ref()); + + const argv = try args.allocator.dupe(u8, message.fmt(.{build_options.remote_child_endpoint_path}).buf); + const ep = try thespian.spawn_link( + args.allocator, + endpoint.Args{ + .allocator = args.allocator, + .argv = argv, + }, + endpoint.start, + "endpoint", + ); + + // Send ["hello"] to the child's "echo" actor through the endpoint. + try ep.send(.{ "send", @as(u64, 1), "echo", .{"hello"} }); + + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .ep = ep, + .receiver = .init(receive_fn, deinit, self), + }; + errdefer self.deinit(); + thespian.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.ep.deinit(); + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: pid_ref, m: message) result { + return self.receive(from, m) catch |e| return thespian.exit_error(e, @errorReturnTrace()); + } + + fn receive(_: *@This(), _: pid_ref, m: message) !void { + // The endpoint delivers the echo reply as the raw payload: ["hello"] + if (try m.match(.{"hello"})) { + return thespian.exit("success"); + } else { + return unexpected(m); + } + } +}; + +test "remote: endpoint delivers message cross-process and receives reply" { + const allocator = std.testing.allocator; + var ctx = try thespian.context.init(allocator); + defer ctx.deinit(); + + var success = false; + var exit_handler = thespian.make_exit_handler(&success, struct { + fn handle(ok: *bool, status: []const u8) void { + ok.* = std.mem.eql(u8, status, "success"); + } + }.handle); + + _ = try ctx.spawn_link( + TestActor.Args{ .allocator = allocator }, + TestActor.start, + "test_actor", + &exit_handler, + null, + ); + + ctx.run(); + + if (!success) return error.TestFailed; +} diff --git a/test/tests.zig b/test/tests.zig index 2c179f3..ceb3273 100644 --- a/test/tests.zig +++ b/test/tests.zig @@ -4,6 +4,7 @@ pub const thespian = @import("tests_thespian.zig"); pub const ip_tcp_client_server = @import("ip_tcp_client_server.zig"); pub const remote_poc = @import("remote_poc_test.zig"); pub const remote_roundtrip = @import("remote_roundtrip_test.zig"); +pub const remote_endpoint = @import("remote_endpoint_test.zig"); test { std.testing.refAllDecls(@This());