From cb37b9b82b06a6b5f54e4c98a06a7adb4226ba7a Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Sat, 7 Mar 2026 14:51:25 +0100 Subject: [PATCH] WIP: add remaining message types --- src/remote/endpoint.zig | 5 ++ src/remote/protocol.zig | 94 +++++++++++++++++++++++++++++++++- test/remote_child_endpoint.zig | 6 +++ test/remote_poc_test.zig | 1 + 4 files changed, 105 insertions(+), 1 deletion(-) diff --git a/src/remote/endpoint.zig b/src/remote/endpoint.zig index 3f6c6ec..7bb36e8 100644 --- a/src/remote/endpoint.zig +++ b/src/remote/endpoint.zig @@ -77,10 +77,15 @@ const Endpoint = struct { fn dispatch_inbound(_: *@This(), frame: []const u8) !void { const msg = try protocol.decode(frame); switch (msg) { + .send => |s| { + _ = s; + return tp.exit_error(error.UnexpectedSend, null); + }, .send_named => |s| { const actor = tp.env.get().proc(s.to_name); try actor.send_raw(tp.message{ .buf = s.payload }); }, + .link, .exit, .proxy_id, .transport_error => return tp.exit_error(error.UnexpectedMessage, null), } } diff --git a/src/remote/protocol.zig b/src/remote/protocol.zig index 80b92a7..bea8415 100644 --- a/src/remote/protocol.zig +++ b/src/remote/protocol.zig @@ -10,7 +10,17 @@ pub const RawCbor = struct { } }; -/// Encode a send_named wire message into writer. +/// Encode a `send` wire message into writer. +/// payload must already be CBOR-encoded bytes. +pub fn encode_send(writer: *std.Io.Writer, from_id: u64, to_id: u64, payload: []const u8) !void { + try cbor.writeArrayHeader(writer, 4); + try cbor.writeValue(writer, "send"); + try cbor.writeValue(writer, from_id); + try cbor.writeValue(writer, to_id); + _ = try writer.write(payload); +} + +/// Encode a `send_named` wire message into writer. /// payload must already be CBOR-encoded bytes. pub fn encode_send_named(writer: *std.Io.Writer, from_id: u64, to_name: []const u8, payload: []const u8) !void { try cbor.writeArrayHeader(writer, 4); @@ -20,25 +30,107 @@ pub fn encode_send_named(writer: *std.Io.Writer, from_id: u64, to_name: []const _ = try writer.write(payload); } +/// Encode a `link` wire message into writer. +pub fn encode_link(writer: *std.Io.Writer, local_id: u64, remote_id: u64) !void { + try cbor.writeArrayHeader(writer, 3); + try cbor.writeValue(writer, "link"); + try cbor.writeValue(writer, local_id); + try cbor.writeValue(writer, remote_id); +} + +/// Encode an `exit` wire message into writer. +pub fn encode_exit(writer: *std.Io.Writer, id: u64, reason: []const u8) !void { + try cbor.writeArrayHeader(writer, 3); + try cbor.writeValue(writer, "exit"); + try cbor.writeValue(writer, id); + try cbor.writeValue(writer, reason); +} + +/// Encode a `proxy_id` wire message into writer. +pub fn encode_proxy_id(writer: *std.Io.Writer, name: []const u8, id: u64) !void { + try cbor.writeArrayHeader(writer, 3); + try cbor.writeValue(writer, "proxy_id"); + try cbor.writeValue(writer, name); + try cbor.writeValue(writer, id); +} + +/// Encode a `transport_error` wire message into writer. +pub fn encode_transport_error(writer: *std.Io.Writer, reason: []const u8) !void { + try cbor.writeArrayHeader(writer, 2); + try cbor.writeValue(writer, "transport_error"); + try cbor.writeValue(writer, reason); +} + pub const WireMessage = union(enum) { + send: Send, send_named: SendNamed, + link: Link, + exit: Exit, + proxy_id: ProxyId, + transport_error: TransportError, + + pub const Send = struct { + from_id: u64, + to_id: u64, + payload: []const u8, // raw CBOR bytes + }; pub const SendNamed = struct { from_id: u64, to_name: []const u8, payload: []const u8, // raw CBOR bytes }; + + pub const Link = struct { + local_id: u64, + remote_id: u64, + }; + + pub const Exit = struct { + id: u64, + reason: []const u8, + }; + + pub const ProxyId = struct { + name: []const u8, + id: u64, + }; + + pub const TransportError = struct { + reason: []const u8, + }; }; /// Decode a raw CBOR frame into a WireMessage. /// Returned slices point into the frame buffer and are only valid while it lives. pub fn decode(frame: []const u8) !WireMessage { var from_id: u64 = 0; + var to_id: u64 = 0; var to_name: []const u8 = ""; var payload: []const u8 = ""; + var local_id: u64 = 0; + var remote_id: u64 = 0; + var id: u64 = 0; + var name: []const u8 = ""; + var reason: []const u8 = ""; + + if (try cbor.match(frame, .{ "send", cbor.extract(&from_id), cbor.extract(&to_id), cbor.extract_cbor(&payload) })) + return .{ .send = .{ .from_id = from_id, .to_id = to_id, .payload = payload } }; if (try cbor.match(frame, .{ "send_named", cbor.extract(&from_id), cbor.extract(&to_name), cbor.extract_cbor(&payload) })) return .{ .send_named = .{ .from_id = from_id, .to_name = to_name, .payload = payload } }; + if (try cbor.match(frame, .{ "link", cbor.extract(&local_id), cbor.extract(&remote_id) })) + return .{ .link = .{ .local_id = local_id, .remote_id = remote_id } }; + + if (try cbor.match(frame, .{ "exit", cbor.extract(&id), cbor.extract(&reason) })) + return .{ .exit = .{ .id = id, .reason = reason } }; + + if (try cbor.match(frame, .{ "proxy_id", cbor.extract(&name), cbor.extract(&id) })) + return .{ .proxy_id = .{ .name = name, .id = id } }; + + if (try cbor.match(frame, .{ "transport_error", cbor.extract(&reason) })) + return .{ .transport_error = .{ .reason = reason } }; + return error.UnknownMessageType; } diff --git a/test/remote_child_endpoint.zig b/test/remote_child_endpoint.zig index d51f573..4f38120 100644 --- a/test/remote_child_endpoint.zig +++ b/test/remote_child_endpoint.zig @@ -143,11 +143,17 @@ const StdioEndpoint = struct { fn dispatch_frame(self: *@This(), frame: []const u8) !void { const msg = try protocol.decode(frame); switch (msg) { + .send => |s| { + _ = s; + _ = self; + return tp.exit_error(error.UnexpectedSend, null); + }, .send_named => |s| { const actor = tp.env.get().proc(s.to_name); try actor.send_raw(tp.message{ .buf = s.payload }); _ = self; }, + .link, .exit, .proxy_id, .transport_error => return tp.exit_error(error.UnexpectedMessage, null), } } diff --git a/test/remote_poc_test.zig b/test/remote_poc_test.zig index a49d880..ccc9b58 100644 --- a/test/remote_poc_test.zig +++ b/test/remote_poc_test.zig @@ -69,6 +69,7 @@ const Parent = struct { try std.testing.expectEqualStrings("test_actor", s.to_name); try std.testing.expect(try cbor.match(s.payload, .{ "hello", "from_child" })); }, + else => return unexpected(thespian.message{ .buf = frame }), } self.frame_received = true; }