diff --git a/build.zig b/build.zig index 96fb563..5df6da1 100644 --- a/build.zig +++ b/build.zig @@ -86,6 +86,30 @@ pub fn build(b: *std.Build) void { }); const cbor_mod = cbor_dep.module("cbor"); + const framing_mod = b.createModule(.{ + .root_source_file = b.path("src/remote/framing.zig"), + }); + + const protocol_mod = b.createModule(.{ + .root_source_file = b.path("src/remote/protocol.zig"), + .imports = &.{ + .{ .name = "cbor", .module = cbor_mod }, + }, + }); + + const remote_child = b.addExecutable(.{ + .name = "remote_child_send", + .root_module = b.createModule(.{ + .root_source_file = b.path("test/remote_child_send.zig"), + .target = target, + .optimize = optimize, + .imports = &.{ + .{ .name = "cbor", .module = cbor_mod }, + .{ .name = "framing", .module = framing_mod }, + }, + }), + }); + const thespian_mod = b.addModule("thespian", .{ .root_source_file = b.path("src/thespian.zig"), .imports = &.{ @@ -103,9 +127,13 @@ pub fn build(b: *std.Build) void { }), }); + options.addOptionPath("remote_child_path", remote_child.getEmittedBin()); + tests.root_module.addImport("build_options", options_mod); tests.root_module.addImport("cbor", cbor_mod); tests.root_module.addImport("thespian", thespian_mod); + tests.root_module.addImport("framing", framing_mod); + tests.root_module.addImport("protocol", protocol_mod); tests.addIncludePath(b.path("test")); tests.addIncludePath(b.path("src")); tests.addIncludePath(b.path("include")); diff --git a/src/remote/framing.zig b/src/remote/framing.zig new file mode 100644 index 0000000..674db38 --- /dev/null +++ b/src/remote/framing.zig @@ -0,0 +1,36 @@ +const std = @import("std"); + +pub const max_frame_size = 8 * 4096; // matches thespian.max_message_size + +/// Write a length-prefixed frame. writer must support writeAll. +pub fn write_frame(writer: anytype, payload: []const u8) !void { + var len_buf: [4]u8 = undefined; + std.mem.writeInt(u32, &len_buf, @intCast(payload.len), .big); + try writer.writeAll(&len_buf); + try writer.writeAll(payload); +} + +/// Accumulates bytes until a complete length-prefixed frame is available. +/// Returns a slice into the internal buffer on completion; null if more bytes +/// are needed. The returned slice is only valid until the next call to feed(). +pub const Accumulator = struct { + buf: [max_frame_size + 4]u8 = undefined, + pos: usize = 0, + + pub fn feed(self: *Accumulator, bytes: []const u8) ?[]const u8 { + const to_copy = @min(bytes.len, self.buf.len - self.pos); + @memcpy(self.buf[self.pos..][0..to_copy], bytes[0..to_copy]); + self.pos += to_copy; + + if (self.pos < 4) return null; + + const payload_len = std.mem.readInt(u32, self.buf[0..4], .big); + if (payload_len > max_frame_size) return null; + + const total = 4 + payload_len; + if (self.pos < total) return null; + + self.pos = 0; + return self.buf[4..total]; + } +}; diff --git a/src/remote/protocol.zig b/src/remote/protocol.zig new file mode 100644 index 0000000..27f5844 --- /dev/null +++ b/src/remote/protocol.zig @@ -0,0 +1,24 @@ +const cbor = @import("cbor"); + +pub const WireMessage = union(enum) { + send_named: SendNamed, + + pub const SendNamed = struct { + from_id: u64, + to_name: []const u8, + payload: []const u8, // raw CBOR bytes + }; +}; + +/// Decode a raw CBOR frame into a WireMessage. +/// Returned slices point into the frame buffer and are only valid while it lives. +pub fn decode(frame: []const u8) !WireMessage { + var from_id: u64 = 0; + var to_name: []const u8 = ""; + var payload: []const u8 = ""; + + if (try cbor.match(frame, .{ "send_named", cbor.extract(&from_id), cbor.extract(&to_name), cbor.extract_cbor(&payload) })) + return .{ .send_named = .{ .from_id = from_id, .to_name = to_name, .payload = payload } }; + + return error.UnknownMessageType; +} diff --git a/test/remote_child_send.zig b/test/remote_child_send.zig new file mode 100644 index 0000000..450c8bb --- /dev/null +++ b/test/remote_child_send.zig @@ -0,0 +1,13 @@ +/// Child process for the remoting POC test. +/// Writes one framed CBOR send_named message to stdout and exits. +const std = @import("std"); +const cbor = @import("cbor"); +const framing = @import("framing"); + +pub fn main() !void { + var msg_buf: [256]u8 = undefined; + var stream: std.Io.Writer = .fixed(&msg_buf); + try cbor.writeValue(&stream, .{ "send_named", @as(u64, 1), "test_actor", .{ "hello", "from_child" } }); + const payload = stream.buffered(); + try framing.write_frame(std.fs.File.stdout(), payload); +} diff --git a/test/remote_poc_test.zig b/test/remote_poc_test.zig new file mode 100644 index 0000000..a49d880 --- /dev/null +++ b/test/remote_poc_test.zig @@ -0,0 +1,110 @@ +const std = @import("std"); +const thespian = @import("thespian"); +const cbor = @import("cbor"); +const framing = @import("framing"); +const protocol = @import("protocol"); +const build_options = @import("build_options"); + +const Allocator = std.mem.Allocator; +const result = thespian.result; +const unexpected = thespian.unexpected; +const pid_ref = thespian.pid_ref; +const Receiver = thespian.Receiver; +const message = thespian.message; +const extract = thespian.extract; +const subprocess = thespian.subprocess; + +const tag = "subprocess"; + +const Parent = struct { + allocator: Allocator, + proc: subprocess, + accumulator: framing.Accumulator, + receiver: Receiver(*@This()), + frame_received: bool = false, + + const Args = struct { allocator: Allocator }; + + fn start(args: Args) result { + return init(args) catch |e| return thespian.exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + const proc = try subprocess.init( + args.allocator, + message.fmt(.{build_options.remote_child_path}), + tag, + .Ignore, + ); + const self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .proc = proc, + .accumulator = .{}, + .receiver = .init(receive_fn, deinit, self), + }; + errdefer self.deinit(); + thespian.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.proc.deinit(); + self.allocator.destroy(self); + } + + fn receive_fn(self: *@This(), from: pid_ref, m: message) result { + return self.receive(from, m) catch |e| return thespian.exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), _: pid_ref, m: message) !void { + var bytes: []const u8 = ""; + var exit_code: i64 = 0; + + if (try m.match(.{ tag, "stdout", extract(&bytes) })) { + if (self.accumulator.feed(bytes)) |frame| { + const msg = try protocol.decode(frame); + switch (msg) { + .send_named => |s| { + try std.testing.expectEqual(@as(u64, 1), s.from_id); + try std.testing.expectEqualStrings("test_actor", s.to_name); + try std.testing.expect(try cbor.match(s.payload, .{ "hello", "from_child" })); + }, + } + self.frame_received = true; + } + } else if (try m.match(.{ tag, "term", "exited", extract(&exit_code) })) { + try std.testing.expect(self.frame_received); + try std.testing.expectEqual(@as(i64, 0), exit_code); + return thespian.exit("success"); + } else if (try m.match(.{ tag, "stderr", extract(&bytes) })) { + // ignore stderr from child + } else { + return unexpected(m); + } + } +}; + +test "remote: child process sends a message to parent" { + const allocator = std.testing.allocator; + var ctx = try thespian.context.init(allocator); + defer ctx.deinit(); + + var success = false; + var exit_handler = thespian.make_exit_handler(&success, struct { + fn handle(ok: *bool, status: []const u8) void { + ok.* = std.mem.eql(u8, status, "success"); + } + }.handle); + + _ = try ctx.spawn_link( + Parent.Args{ .allocator = allocator }, + Parent.start, + "remote_poc", + &exit_handler, + null, + ); + + ctx.run(); + + if (!success) return error.TestFailed; +} diff --git a/test/tests.zig b/test/tests.zig index 81af45b..6331354 100644 --- a/test/tests.zig +++ b/test/tests.zig @@ -2,6 +2,7 @@ const std = @import("std"); pub const cpp = @import("tests_cpp.zig"); pub const thespian = @import("tests_thespian.zig"); pub const ip_tcp_client_server = @import("ip_tcp_client_server.zig"); +pub const remote_poc = @import("remote_poc_test.zig"); test { std.testing.refAllDecls(@This());