WIP: add remaining message types
This commit is contained in:
parent
0e804fc61f
commit
cb37b9b82b
4 changed files with 105 additions and 1 deletions
|
|
@ -77,10 +77,15 @@ const Endpoint = struct {
|
||||||
fn dispatch_inbound(_: *@This(), frame: []const u8) !void {
|
fn dispatch_inbound(_: *@This(), frame: []const u8) !void {
|
||||||
const msg = try protocol.decode(frame);
|
const msg = try protocol.decode(frame);
|
||||||
switch (msg) {
|
switch (msg) {
|
||||||
|
.send => |s| {
|
||||||
|
_ = s;
|
||||||
|
return tp.exit_error(error.UnexpectedSend, null);
|
||||||
|
},
|
||||||
.send_named => |s| {
|
.send_named => |s| {
|
||||||
const actor = tp.env.get().proc(s.to_name);
|
const actor = tp.env.get().proc(s.to_name);
|
||||||
try actor.send_raw(tp.message{ .buf = s.payload });
|
try actor.send_raw(tp.message{ .buf = s.payload });
|
||||||
},
|
},
|
||||||
|
.link, .exit, .proxy_id, .transport_error => return tp.exit_error(error.UnexpectedMessage, null),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,17 @@ pub const RawCbor = struct {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Encode a send_named wire message into writer.
|
/// Encode a `send` wire message into writer.
|
||||||
|
/// payload must already be CBOR-encoded bytes.
|
||||||
|
pub fn encode_send(writer: *std.Io.Writer, from_id: u64, to_id: u64, payload: []const u8) !void {
|
||||||
|
try cbor.writeArrayHeader(writer, 4);
|
||||||
|
try cbor.writeValue(writer, "send");
|
||||||
|
try cbor.writeValue(writer, from_id);
|
||||||
|
try cbor.writeValue(writer, to_id);
|
||||||
|
_ = try writer.write(payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Encode a `send_named` wire message into writer.
|
||||||
/// payload must already be CBOR-encoded bytes.
|
/// payload must already be CBOR-encoded bytes.
|
||||||
pub fn encode_send_named(writer: *std.Io.Writer, from_id: u64, to_name: []const u8, payload: []const u8) !void {
|
pub fn encode_send_named(writer: *std.Io.Writer, from_id: u64, to_name: []const u8, payload: []const u8) !void {
|
||||||
try cbor.writeArrayHeader(writer, 4);
|
try cbor.writeArrayHeader(writer, 4);
|
||||||
|
|
@ -20,25 +30,107 @@ pub fn encode_send_named(writer: *std.Io.Writer, from_id: u64, to_name: []const
|
||||||
_ = try writer.write(payload);
|
_ = try writer.write(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Encode a `link` wire message into writer.
|
||||||
|
pub fn encode_link(writer: *std.Io.Writer, local_id: u64, remote_id: u64) !void {
|
||||||
|
try cbor.writeArrayHeader(writer, 3);
|
||||||
|
try cbor.writeValue(writer, "link");
|
||||||
|
try cbor.writeValue(writer, local_id);
|
||||||
|
try cbor.writeValue(writer, remote_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Encode an `exit` wire message into writer.
|
||||||
|
pub fn encode_exit(writer: *std.Io.Writer, id: u64, reason: []const u8) !void {
|
||||||
|
try cbor.writeArrayHeader(writer, 3);
|
||||||
|
try cbor.writeValue(writer, "exit");
|
||||||
|
try cbor.writeValue(writer, id);
|
||||||
|
try cbor.writeValue(writer, reason);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Encode a `proxy_id` wire message into writer.
|
||||||
|
pub fn encode_proxy_id(writer: *std.Io.Writer, name: []const u8, id: u64) !void {
|
||||||
|
try cbor.writeArrayHeader(writer, 3);
|
||||||
|
try cbor.writeValue(writer, "proxy_id");
|
||||||
|
try cbor.writeValue(writer, name);
|
||||||
|
try cbor.writeValue(writer, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Encode a `transport_error` wire message into writer.
|
||||||
|
pub fn encode_transport_error(writer: *std.Io.Writer, reason: []const u8) !void {
|
||||||
|
try cbor.writeArrayHeader(writer, 2);
|
||||||
|
try cbor.writeValue(writer, "transport_error");
|
||||||
|
try cbor.writeValue(writer, reason);
|
||||||
|
}
|
||||||
|
|
||||||
pub const WireMessage = union(enum) {
|
pub const WireMessage = union(enum) {
|
||||||
|
send: Send,
|
||||||
send_named: SendNamed,
|
send_named: SendNamed,
|
||||||
|
link: Link,
|
||||||
|
exit: Exit,
|
||||||
|
proxy_id: ProxyId,
|
||||||
|
transport_error: TransportError,
|
||||||
|
|
||||||
|
pub const Send = struct {
|
||||||
|
from_id: u64,
|
||||||
|
to_id: u64,
|
||||||
|
payload: []const u8, // raw CBOR bytes
|
||||||
|
};
|
||||||
|
|
||||||
pub const SendNamed = struct {
|
pub const SendNamed = struct {
|
||||||
from_id: u64,
|
from_id: u64,
|
||||||
to_name: []const u8,
|
to_name: []const u8,
|
||||||
payload: []const u8, // raw CBOR bytes
|
payload: []const u8, // raw CBOR bytes
|
||||||
};
|
};
|
||||||
|
|
||||||
|
pub const Link = struct {
|
||||||
|
local_id: u64,
|
||||||
|
remote_id: u64,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const Exit = struct {
|
||||||
|
id: u64,
|
||||||
|
reason: []const u8,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const ProxyId = struct {
|
||||||
|
name: []const u8,
|
||||||
|
id: u64,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const TransportError = struct {
|
||||||
|
reason: []const u8,
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Decode a raw CBOR frame into a WireMessage.
|
/// Decode a raw CBOR frame into a WireMessage.
|
||||||
/// Returned slices point into the frame buffer and are only valid while it lives.
|
/// Returned slices point into the frame buffer and are only valid while it lives.
|
||||||
pub fn decode(frame: []const u8) !WireMessage {
|
pub fn decode(frame: []const u8) !WireMessage {
|
||||||
var from_id: u64 = 0;
|
var from_id: u64 = 0;
|
||||||
|
var to_id: u64 = 0;
|
||||||
var to_name: []const u8 = "";
|
var to_name: []const u8 = "";
|
||||||
var payload: []const u8 = "";
|
var payload: []const u8 = "";
|
||||||
|
var local_id: u64 = 0;
|
||||||
|
var remote_id: u64 = 0;
|
||||||
|
var id: u64 = 0;
|
||||||
|
var name: []const u8 = "";
|
||||||
|
var reason: []const u8 = "";
|
||||||
|
|
||||||
|
if (try cbor.match(frame, .{ "send", cbor.extract(&from_id), cbor.extract(&to_id), cbor.extract_cbor(&payload) }))
|
||||||
|
return .{ .send = .{ .from_id = from_id, .to_id = to_id, .payload = payload } };
|
||||||
|
|
||||||
if (try cbor.match(frame, .{ "send_named", cbor.extract(&from_id), cbor.extract(&to_name), cbor.extract_cbor(&payload) }))
|
if (try cbor.match(frame, .{ "send_named", cbor.extract(&from_id), cbor.extract(&to_name), cbor.extract_cbor(&payload) }))
|
||||||
return .{ .send_named = .{ .from_id = from_id, .to_name = to_name, .payload = payload } };
|
return .{ .send_named = .{ .from_id = from_id, .to_name = to_name, .payload = payload } };
|
||||||
|
|
||||||
|
if (try cbor.match(frame, .{ "link", cbor.extract(&local_id), cbor.extract(&remote_id) }))
|
||||||
|
return .{ .link = .{ .local_id = local_id, .remote_id = remote_id } };
|
||||||
|
|
||||||
|
if (try cbor.match(frame, .{ "exit", cbor.extract(&id), cbor.extract(&reason) }))
|
||||||
|
return .{ .exit = .{ .id = id, .reason = reason } };
|
||||||
|
|
||||||
|
if (try cbor.match(frame, .{ "proxy_id", cbor.extract(&name), cbor.extract(&id) }))
|
||||||
|
return .{ .proxy_id = .{ .name = name, .id = id } };
|
||||||
|
|
||||||
|
if (try cbor.match(frame, .{ "transport_error", cbor.extract(&reason) }))
|
||||||
|
return .{ .transport_error = .{ .reason = reason } };
|
||||||
|
|
||||||
return error.UnknownMessageType;
|
return error.UnknownMessageType;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -143,11 +143,17 @@ const StdioEndpoint = struct {
|
||||||
fn dispatch_frame(self: *@This(), frame: []const u8) !void {
|
fn dispatch_frame(self: *@This(), frame: []const u8) !void {
|
||||||
const msg = try protocol.decode(frame);
|
const msg = try protocol.decode(frame);
|
||||||
switch (msg) {
|
switch (msg) {
|
||||||
|
.send => |s| {
|
||||||
|
_ = s;
|
||||||
|
_ = self;
|
||||||
|
return tp.exit_error(error.UnexpectedSend, null);
|
||||||
|
},
|
||||||
.send_named => |s| {
|
.send_named => |s| {
|
||||||
const actor = tp.env.get().proc(s.to_name);
|
const actor = tp.env.get().proc(s.to_name);
|
||||||
try actor.send_raw(tp.message{ .buf = s.payload });
|
try actor.send_raw(tp.message{ .buf = s.payload });
|
||||||
_ = self;
|
_ = self;
|
||||||
},
|
},
|
||||||
|
.link, .exit, .proxy_id, .transport_error => return tp.exit_error(error.UnexpectedMessage, null),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -69,6 +69,7 @@ const Parent = struct {
|
||||||
try std.testing.expectEqualStrings("test_actor", s.to_name);
|
try std.testing.expectEqualStrings("test_actor", s.to_name);
|
||||||
try std.testing.expect(try cbor.match(s.payload, .{ "hello", "from_child" }));
|
try std.testing.expect(try cbor.match(s.payload, .{ "hello", "from_child" }));
|
||||||
},
|
},
|
||||||
|
else => return unexpected(thespian.message{ .buf = frame }),
|
||||||
}
|
}
|
||||||
self.frame_received = true;
|
self.frame_received = true;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue