thespian/test/remote_endpoint_id_test.zig
CJ van den Berg 077d47f8b2
feat: clean up local_actors/outbound tables when senders exit
The endpoint now links to each local actor when it is first assigned an
outbound wire ID, trapping the exit to remove stale entries from the
outbound and local_actors tables.

To preserve spawn_link semantics, the endpoint stores the spawner's
instance_id() at init time. Trapped exits from the spawner are always
re-propagated regardless of whether the spawner also appears as an outbound
sender.
2026-03-12 22:21:27 +01:00

146 lines
5.1 KiB
Zig

/// Tests inbound proxy table, from-substitution, outbound ID table, and
/// inbound send-by-ID routing in a single five-step round-trip:
///
/// 1. Parent sends send_named to "echo_id" (from_id=0, trigger mode).
/// 2. Child echo_id sends send_named back (from_id=3) → proxy created,
/// delivered FROM proxy to TestActor (from-substitution).
/// 3. TestActor replies to proxy via from.send() → outbound ID assigned,
/// wire send reaches child with from_id=wire_id_TA, to_id=3.
/// 4. Child echo_id replies via wire send to wire_id_TA → parent routes
/// via local_actors to TestActor (inbound send-by-ID).
/// 5. TestActor receives ["done"] → exits "success".
const std = @import("std");
const thespian = @import("thespian");
const cbor = @import("cbor");
const endpoint = @import("endpoint");
const build_options = @import("build_options");
var trace_file: ?std.fs.File = null;
var trace_buf: [4096]u8 = undefined;
var trace_file_writer: std.fs.File.Writer = undefined;
/// Trace callback handed to the thespian env via `on_trace`: renders one
/// CBOR-encoded trace buffer as JSON followed by a newline on the trace writer.
///
/// Does nothing while no trace file is open. Write failures end the call
/// silently, since this `callconv(.c)` callback returns void.
fn trace_handler(buf: thespian.message.c_buffer_type) callconv(.c) void {
    if (trace_file) |_| {
        const out = &trace_file_writer.interface;
        const payload = buf.base[0..buf.len];
        cbor.toJsonWriter(payload, out, .{}) catch return;
        out.writeByte('\n') catch return;
    }
}
const Allocator = std.mem.Allocator;
const result = thespian.result;
const unexpected = thespian.unexpected;
const pid_ref = thespian.pid_ref;
const Receiver = thespian.Receiver;
const message = thespian.message;
/// Parent-side actor for the round-trip described in the doc comment at the
/// top of this file. It spawns the endpoint, triggers the remote "echo_id"
/// actor, and then walks a two-state machine until it receives ["done"].
const TestActor = struct {
    allocator: Allocator,
    /// Owned handle to the linked endpoint actor; released in `deinit`.
    ep: thespian.pid,
    receiver: Receiver(*@This()),
    /// Which message the actor expects next.
    state: enum { waiting_for_hello, waiting_for_done },

    const Args = struct { allocator: Allocator };

    /// Spawn entry point: runs `init` and converts any error into an exit result.
    fn start(args: Args) result {
        return init(args) catch |e| return thespian.exit_error(e, @errorReturnTrace());
    }

    /// Spawns the linked endpoint, sends the initial trigger message, then
    /// allocates the actor state and installs its receiver.
    ///
    /// Errors: any failure from argv duplication, `spawn_link`, the initial
    /// `send`, or the state allocation is returned to `start`.
    fn init(args: Args) !void {
        // Publish our pid in the env under the name "test_receiver".
        thespian.env.get().proc_set("test_receiver", thespian.self_pid().ref());

        // NOTE(review): argv is passed to the endpoint via endpoint.Args;
        // presumably the endpoint takes ownership of it — confirm against
        // the endpoint implementation.
        const argv = try args.allocator.dupe(u8, message.fmt(.{build_options.remote_child_endpoint_path}).buf);
        const ep = try thespian.spawn_link(
            args.allocator,
            endpoint.Args{
                .allocator = args.allocator,
                .argv = argv,
                .spawner = thespian.self_pid().ref(),
            },
            endpoint.start,
            "endpoint",
        );
        // Release the endpoint handle if the trigger send or the state
        // allocation below fails; until `self` exists nothing else owns it.
        errdefer ep.deinit();

        // Trigger child echo_id in trigger mode (from_id=0).
        try ep.send(.{ "send", @as(u64, 0), "echo_id", .{"hello"} });

        const self = try args.allocator.create(@This());
        // No fallible statement follows, so from here on `self` owns `ep`
        // and cleanup happens through `deinit`.
        self.* = .{
            .allocator = args.allocator,
            .ep = ep,
            .receiver = .init(receive_fn, deinit, self),
            .state = .waiting_for_hello,
        };
        thespian.receive(&self.receiver);
    }

    /// Releases the endpoint handle and frees the actor state.
    fn deinit(self: *@This()) void {
        self.ep.deinit();
        self.allocator.destroy(self);
    }

    /// Receiver callback: runs `receive` and converts any error into an exit result.
    fn receive_fn(self: *@This(), from: pid_ref, m: message) result {
        return self.receive(from, m) catch |e| return thespian.exit_error(e, @errorReturnTrace());
    }

    /// Handles one message according to the current state. Any message other
    /// than the one expected in that state is reported via `unexpected`.
    fn receive(self: *@This(), from: pid_ref, m: message) !void {
        switch (self.state) {
            .waiting_for_hello => {
                if (try m.match(.{"hello"})) {
                    // Reply through the proxy — this populates the outbound ID table
                    // and sends a wire send to the child with our assigned wire ID.
                    try from.send(.{"hello"});
                    self.state = .waiting_for_done;
                } else return unexpected(m);
            },
            .waiting_for_done => {
                if (try m.match(.{"done"})) {
                    // Arrived via inbound send-by-ID routing through local_actors.
                    return thespian.exit("success");
                } else return unexpected(m);
            },
        }
    }
};
test "remote: inbound proxy table, from-substitution, outbound ID table, and send-by-ID routing" {
    const allocator = std.testing.allocator;

    // Optional tracing: when TRACE is set in the environment, install
    // `trace_handler` on a fresh env with all channels enabled and direct its
    // output to remote_endpoint_id_trace.json in the current directory.
    var initial_env: ?thespian.env = null;
    if (std.posix.getenv("TRACE")) |_| {
        const file = try std.fs.cwd().createFile("remote_endpoint_id_trace.json", .{});
        trace_file_writer = file.writer(&trace_buf);
        trace_file = file;
        var traced_env = thespian.env.init();
        traced_env.on_trace(&trace_handler);
        traced_env.enable_all_channels();
        initial_env = traced_env;
    }
    // Declared before the context's defer, so (LIFO) it runs after
    // `ctx.deinit()`: flush buffered trace output, close the file, drop the env.
    defer if (initial_env) |traced_env| {
        trace_file_writer.interface.flush() catch {};
        trace_file.?.close();
        trace_file = null;
        traced_env.deinit();
    };

    var ctx = try thespian.context.init(allocator);
    defer ctx.deinit();

    // The exit handler records whether the actor exited with status "success".
    var success = false;
    var exit_handler = thespian.make_exit_handler(&success, struct {
        fn handle(flag: *bool, exit_status: []const u8) void {
            flag.* = std.mem.eql(u8, exit_status, "success");
        }
    }.handle);

    _ = try ctx.spawn_link(
        TestActor.Args{ .allocator = allocator },
        TestActor.start,
        "test_actor",
        &exit_handler,
        if (initial_env) |*env_ptr| env_ptr else null,
    );

    ctx.run();

    if (!success) return error.TestFailed;
}