fix: clean-up and fix ip_tcp_client_server.zig testcase

This commit is contained in:
CJ van den Berg 2026-03-04 19:41:39 +01:00
parent 5a729b6d06
commit 33229ffc11
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9

View file

@ -33,61 +33,41 @@ const ClientConnection = struct {
const Args = struct { allocator: Allocator, fd: i32, client_pid: pid }; const Args = struct { allocator: Allocator, fd: i32, client_pid: pid };
fn start(args: Args) result { fn start(args: Args) result {
std.log.err("ClientConnection.start entered fd={d}", .{args.fd}); const sock = socket.init("client_connection", args.fd) catch |e| return exit_error(e, @errorReturnTrace());
const sock = socket.init("client_connection", args.fd) catch |e| {
std.log.err("ClientConnection.start socket.init FAILED: {any}", .{e});
return exit_error(e, @errorReturnTrace());
};
std.log.err("ClientConnection.start socket created", .{});
var self = args.allocator.create(@This()) catch |e| return exit_error(e, @errorReturnTrace()); var self = args.allocator.create(@This()) catch |e| return exit_error(e, @errorReturnTrace());
self.* = .{ self.* = .{
.allocator = args.allocator, .allocator = args.allocator,
.sock = sock, .sock = sock,
.client_pid = args.client_pid, .client_pid = args.client_pid,
.receiver = .init(receive_safe, self), .receiver = .init(receive_fn, deinit, self),
}; };
errdefer self.deinit(); errdefer self.deinit();
self.sock.read() catch |e| { self.sock.read() catch |e| return exit_error(e, @errorReturnTrace());
std.log.err("ClientConnection.start sock.read FAILED: {any}", .{e});
return exit_error(e, @errorReturnTrace());
};
std.log.err("ClientConnection.start calling receive()", .{});
thespian.receive(&self.receiver); thespian.receive(&self.receiver);
std.log.err("ClientConnection.start returning", .{});
} }
fn deinit(self: *@This()) void { fn deinit(self: *@This()) void {
self.sock.deinit(); self.sock.deinit();
self.client_pid.deinit(); self.client_pid.deinit();
self.allocator.destroy(self); self.allocator.destroy(self);
std.log.err("ClientConnection.deinit: client connection destroyed", .{});
} }
fn receive_safe(self: *@This(), from: pid_ref, m: message) result { fn receive_fn(self: *@This(), from: pid_ref, m: message) result {
errdefer self.deinit(); return self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace());
self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace());
} }
fn receive(self: *@This(), _: pid_ref, m: message) !void { fn receive(self: *@This(), _: pid_ref, m: message) !void {
std.log.err("ClientConnection.receive msg={f}", .{m});
var buf: []const u8 = ""; var buf: []const u8 = "";
var written: i64 = 0; var written: i64 = 0;
if (try m.match(.{ "socket", "client_connection", "read_complete", extract(&buf) })) { if (try m.match(.{ "socket", "client_connection", "read_complete", extract(&buf) })) {
std.log.err("ClientConnection.receive: read_complete buf={s}", .{buf});
try std.testing.expectEqualSlices(u8, "ping", buf); try std.testing.expectEqualSlices(u8, "ping", buf);
try self.sock.write("pong"); try self.sock.write("pong");
// try self.sock.read();
} else if (try m.match(.{ "socket", "client_connection", "write_complete", extract(&written) })) { } else if (try m.match(.{ "socket", "client_connection", "write_complete", extract(&written) })) {
std.log.err("ClientConnection.receive: write_complete written={d}", .{written});
try std.testing.expectEqual(4, written); try std.testing.expectEqual(4, written);
// wait for server to close - must keep a read pending to detect it
// try self.sock.read();
} else if (try m.match(.{ "socket", "client_connection", "closed" })) { } else if (try m.match(.{ "socket", "client_connection", "closed" })) {
std.log.err("ClientConnection.receive: closed, sending done", .{});
_ = self.client_pid.send(.{ "client_connection", "done" }) catch {}; _ = self.client_pid.send(.{ "client_connection", "done" }) catch {};
return exit("success"); return exit("success");
} else { } else {
std.log.err("ClientConnection.receive: UNEXPECTED msg={f}", .{m});
return unexpected(m); return unexpected(m);
} }
} }
@ -106,50 +86,40 @@ const Client = struct {
} }
fn init(args: Args) !void { fn init(args: Args) !void {
std.log.err("Client.init entered port={d}", .{args.port});
const connector: tcp_connector = try .init("client"); const connector: tcp_connector = try .init("client");
var self = try args.allocator.create(@This()); var self = try args.allocator.create(@This());
self.* = .{ self.* = .{
.allocator = args.allocator, .allocator = args.allocator,
.connector = connector, .connector = connector,
.server_pid = args.server_pid, .server_pid = args.server_pid,
.receiver = .init(receive_safe, self), .receiver = .init(receive_fn, deinit, self),
}; };
errdefer self.deinit(); errdefer self.deinit();
try self.connector.connect(in6addr_loopback, args.port); try self.connector.connect(in6addr_loopback, args.port);
std.log.err("Client.init connect() called, calling receive()", .{});
thespian.receive(&self.receiver); thespian.receive(&self.receiver);
std.log.err("Client.init returning", .{});
} }
fn deinit(self: *@This()) void { fn deinit(self: *@This()) void {
self.server_pid.deinit(); self.server_pid.deinit();
self.allocator.destroy(self); self.allocator.destroy(self);
std.log.err("Client.deinit: client destroyed", .{});
} }
fn receive_safe(self: *@This(), from: pid_ref, m: message) result { fn receive_fn(self: *@This(), from: pid_ref, m: message) result {
errdefer self.deinit(); return self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace());
self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace());
} }
fn receive(self: *@This(), _: pid_ref, m: message) !void { fn receive(self: *@This(), _: pid_ref, m: message) !void {
std.log.err("Client.receive msg={f}", .{m});
var fd: i32 = 0; var fd: i32 = 0;
if (try m.match(.{ "connector", "client", "connected", extract(&fd) })) { if (try m.match(.{ "connector", "client", "connected", extract(&fd) })) {
std.log.err("Client.receive: connected fd={d}, spawning ClientConnection", .{fd});
_ = try spawn_link(self.allocator, ClientConnection.Args{ _ = try spawn_link(self.allocator, ClientConnection.Args{
.allocator = self.allocator, .allocator = self.allocator,
.fd = fd, .fd = fd,
.client_pid = self_pid().clone(), .client_pid = self_pid().clone(),
}, ClientConnection.start, "client_connection"); }, ClientConnection.start, "client_connection");
std.log.err("Client.receive: ClientConnection spawned", .{});
} else if (try m.match(.{ "client_connection", "done" })) { } else if (try m.match(.{ "client_connection", "done" })) {
std.log.err("Client.receive: client_connection done, exiting", .{});
_ = try self.server_pid.send(.{ "client", "done" }); _ = try self.server_pid.send(.{ "client", "done" });
return exit_normal(); return exit_normal();
} else { } else {
std.log.err("Client.receive: UNEXPECTED msg={f}", .{m});
return unexpected(m); return unexpected(m);
} }
} }
@ -168,55 +138,45 @@ const ServerConnection = struct {
} }
fn init(args: Args) !void { fn init(args: Args) !void {
std.log.err("ServerConnection.init entered fd={d}", .{args.fd});
const sock = try socket.init("server_connection", args.fd); const sock = try socket.init("server_connection", args.fd);
var self = try args.allocator.create(@This()); var self = try args.allocator.create(@This());
self.* = .{ self.* = .{
.allocator = args.allocator, .allocator = args.allocator,
.sock = sock, .sock = sock,
.server_pid = args.server_pid, .server_pid = args.server_pid,
.receiver = .init(receive_safe, self), .receiver = .init(receive_fn, deinit, self),
}; };
errdefer self.deinit(); errdefer self.deinit();
try self.sock.write("ping"); try self.sock.write("ping");
std.log.err("ServerConnection.init write(ping) called, calling receive()", .{});
thespian.receive(&self.receiver); thespian.receive(&self.receiver);
std.log.err("ServerConnection.init returning", .{});
} }
fn deinit(self: *@This()) void { fn deinit(self: *@This()) void {
self.sock.deinit(); self.sock.deinit();
self.server_pid.deinit(); self.server_pid.deinit();
self.allocator.destroy(self); self.allocator.destroy(self);
std.log.err("ServerConnection.deinit: server connection destroyed", .{});
} }
fn receive_safe(self: *@This(), from: pid_ref, m: message) result { fn receive_fn(self: *@This(), from: pid_ref, m: message) result {
errdefer self.deinit(); return self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace());
self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace());
} }
fn receive(self: *@This(), _: pid_ref, m: message) !void { fn receive(self: *@This(), _: pid_ref, m: message) !void {
std.log.err("ServerConnection.receive msg={f}", .{m});
var buf: []const u8 = ""; var buf: []const u8 = "";
var written: i64 = 0; var written: i64 = 0;
if (try m.match(.{ "socket", "server_connection", "write_complete", extract(&written) })) { if (try m.match(.{ "socket", "server_connection", "write_complete", extract(&written) })) {
std.log.err("ServerConnection.receive: write_complete written={d}", .{written});
try std.testing.expectEqual(4, written); try std.testing.expectEqual(4, written);
// ping sent, start reading // ping sent, start reading
try self.sock.read(); try self.sock.read();
} else if (try m.match(.{ "socket", "server_connection", "read_complete", extract(&buf) })) { } else if (try m.match(.{ "socket", "server_connection", "read_complete", extract(&buf) })) {
std.log.err("ServerConnection.receive: read_complete buf={s}", .{buf});
try std.testing.expectEqualSlices(u8, "pong", buf); try std.testing.expectEqualSlices(u8, "pong", buf);
// received pong, close socket // received pong, close socket
try self.sock.close(); try self.sock.close();
} else if (try m.match(.{ "socket", "server_connection", "closed" })) { } else if (try m.match(.{ "socket", "server_connection", "closed" })) {
std.log.err("ServerConnection.receive: closed, sending done", .{});
self.server_pid.send(.{ "server_connection", "done" }) catch |e| return exit_error(e, @errorReturnTrace()); self.server_pid.send(.{ "server_connection", "done" }) catch |e| return exit_error(e, @errorReturnTrace());
return exit("success"); return exit("success");
} else { } else {
std.log.err("ServerConnection.receive: UNEXPECTED msg={f}", .{m});
return unexpected(m); return unexpected(m);
} }
} }
@ -237,68 +197,54 @@ const Server = struct {
} }
fn init(args: Args) !void { fn init(args: Args) !void {
std.log.err("Server.init entered", .{});
const acceptor: tcp_acceptor = try .init("server"); const acceptor: tcp_acceptor = try .init("server");
var self = try args.allocator.create(@This()); var self = try args.allocator.create(@This());
self.* = .{ self.* = .{
.allocator = args.allocator, .allocator = args.allocator,
.acceptor = acceptor, .acceptor = acceptor,
.receiver = .init(receive_safe, self), .receiver = .init(receive_fn, deinit, self),
}; };
errdefer self.deinit(); errdefer self.deinit();
const port = try self.acceptor.listen(in6addr_loopback, 0); const port = try self.acceptor.listen(in6addr_loopback, 0);
std.log.err("Server.init listening on port={d}", .{port});
_ = try spawn_link(args.allocator, Client.Args{ _ = try spawn_link(args.allocator, Client.Args{
.allocator = args.allocator, .allocator = args.allocator,
.server_pid = self_pid().clone(), .server_pid = self_pid().clone(),
.port = port, .port = port,
}, Client.start, "client"); }, Client.start, "client");
std.log.err("Server.init Client spawned, calling receive()", .{});
thespian.receive(&self.receiver); thespian.receive(&self.receiver);
std.log.err("Server.init returning", .{});
} }
fn deinit(self: *@This()) void { fn deinit(self: *@This()) void {
self.acceptor.deinit(); self.acceptor.deinit();
self.allocator.destroy(self); self.allocator.destroy(self);
std.log.err("Server.deinit: server destroyed", .{});
} }
fn receive_safe(self: *@This(), from: pid_ref, m: message) result { fn receive_fn(self: *@This(), from: pid_ref, m: message) result {
errdefer self.deinit(); return self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace());
self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace());
} }
fn receive(self: *@This(), _: pid_ref, m: message) !void { fn receive(self: *@This(), _: pid_ref, m: message) !void {
std.log.err("Server.receive msg={f}", .{m});
var fd: i32 = 0; var fd: i32 = 0;
if (try m.match(.{ "acceptor", "server", "accept", extract(&fd) })) { if (try m.match(.{ "acceptor", "server", "accept", extract(&fd) })) {
std.log.err("Server.receive: accept fd={d}, spawning ServerConnection", .{fd});
_ = try spawn_link(self.allocator, ServerConnection.Args{ _ = try spawn_link(self.allocator, ServerConnection.Args{
.allocator = self.allocator, .allocator = self.allocator,
.fd = fd, .fd = fd,
.server_pid = self_pid().clone(), .server_pid = self_pid().clone(),
}, ServerConnection.start, "server_connection"); }, ServerConnection.start, "server_connection");
std.log.err("Server.receive: ServerConnection spawned, closing acceptor", .{});
try self.acceptor.close(); try self.acceptor.close();
} else if (try m.match(.{ "acceptor", "server", "closed" })) { } else if (try m.match(.{ "acceptor", "server", "closed" })) {
std.log.err("Server.receive: acceptor closed (flags: client={} conn={})", .{ self.client_done, self.server_conn_done });
self.acceptor_closed = true; self.acceptor_closed = true;
} else if (try m.match(.{ "client", "done" })) { } else if (try m.match(.{ "client", "done" })) {
std.log.err("Server.receive: client done (flags: acceptor={} conn={})", .{ self.acceptor_closed, self.server_conn_done });
self.client_done = true; self.client_done = true;
} else if (try m.match(.{ "server_connection", "done" })) { } else if (try m.match(.{ "server_connection", "done" })) {
std.log.err("Server.receive: server_connection done (flags: acceptor={} client={})", .{ self.acceptor_closed, self.client_done });
self.server_conn_done = true; self.server_conn_done = true;
} else { } else {
std.log.err("Server.receive: UNEXPECTED msg={f}", .{m});
return unexpected(m); return unexpected(m);
} }
if (self.acceptor_closed and self.client_done and self.server_conn_done) { if (self.acceptor_closed and self.client_done and self.server_conn_done) {
std.log.err("Server.receive: all done, exiting with 'success'", .{});
return exit("success"); return exit("success");
} }
} }
@ -315,7 +261,6 @@ test "ip_tcp_client_server test" {
fn handle(ok: *bool, status: []const u8) void { fn handle(ok: *bool, status: []const u8) void {
if (std.mem.eql(u8, status, "success")) { if (std.mem.eql(u8, status, "success")) {
ok.* = true; ok.* = true;
std.log.err("EXITED: {s}", .{status});
} else { } else {
std.log.err("EXITED: {s}", .{status}); std.log.err("EXITED: {s}", .{status});
} }