diff --git a/src/thespian.zig b/src/thespian.zig index 64f7ba5..1afd814 100644 --- a/src/thespian.zig +++ b/src/thespian.zig @@ -9,7 +9,6 @@ const c = @cImport({ @cInclude("thespian/c/unx.h"); @cInclude("thespian/c/tcp.h"); @cInclude("thespian/c/socket.h"); - @cInclude("netinet/in.h"); }); const c_posix = if (builtin.os.tag != .windows) @cImport({ @cInclude("thespian/backtrace.h"); @@ -23,6 +22,9 @@ pub var stack_trace_on_errors: bool = false; pub const subprocess = if (builtin.os.tag == .windows) @import("subprocess_windows.zig") else @import("subprocess.zig"); +pub const in6_addr = [16]u8; +pub const in6addr_loopback: in6_addr = [_]u8{0} ** 15 ++ [_]u8{1}; + pub const install_debugger = c.install_debugger; pub const install_remote_debugger = c.install_remote_debugger; pub const install_backtrace = c.install_backtrace; @@ -745,7 +747,7 @@ pub const metronome = struct { } }; -pub const unx_mode = enum(u8) { +pub const unx_mode = enum(c_uint) { file = 0, abstract = 1, }; @@ -755,13 +757,12 @@ pub const unx_acceptor = struct { const Self = @This(); - pub fn init(tag_: []const u8) !Self { - return .{ .handle = c.thespian_unx_acceptor_create(tag_) orelse return log_last_error(error.ThespianUnxAcceptorInitFailed) }; + pub fn init(tag_: [:0]const u8) !Self { + return .{ .handle = c.thespian_unx_acceptor_create(tag_.ptr) orelse return log_last_error(error.ThespianUnxAcceptorInitFailed) }; } - pub fn listen(self: *const Self, path: []const u8, mode: unx_mode) !void { - const mval: u8 = @intCast(mode); - const ret = c.thespian_unx_acceptor_listen(self.handle, path, mval); + pub fn listen(self: *const Self, path: [:0]const u8, mode: unx_mode) !void { + const ret = c.thespian_unx_acceptor_listen(self.handle, path.ptr, @intFromEnum(mode)); if (ret < 0) return error.ThespianUnxAcceptorListenFailed; } @@ -780,13 +781,12 @@ pub const unx_connector = struct { const Self = @This(); - pub fn init(tag_: []const u8) !Self { - return .{ .handle = c.thespian_unx_connector_create(tag_) orelse return log_last_error(error.ThespianUnxConnectorInitFailed) }; + pub fn init(tag_: [:0]const u8) !Self { + return .{ .handle = c.thespian_unx_connector_create(tag_.ptr) orelse return log_last_error(error.ThespianUnxConnectorInitFailed) }; } - pub fn connect(self: *const Self, path: []const u8, mode: unx_mode) !void { - const mval: u8 = @intCast(mode); - const ret = c.thespian_unx_connector_connect(self.handle, path, mval); + pub fn connect(self: *const Self, path: [:0]const u8, mode: unx_mode) !void { + const ret = c.thespian_unx_connector_connect(self.handle, path.ptr, @intFromEnum(mode)); if (ret < 0) return error.ThespianUnxConnectorConnectFailed; } @@ -805,12 +805,12 @@ pub const tcp_acceptor = struct { const Self = @This(); - pub fn init(tag_: []const u8) !Self { - return .{ .handle = c.thespian_tcp_acceptor_create(tag_) orelse return log_last_error(error.ThespianTcpAcceptorInitFailed) }; + pub fn init(tag_: [:0]const u8) !Self { + return .{ .handle = c.thespian_tcp_acceptor_create(tag_.ptr) orelse return log_last_error(error.ThespianTcpAcceptorInitFailed) }; } - pub fn listen(self: *const Self, ip: c.in6_addr, port: u16) !u16 { - const ret = c.thespian_tcp_acceptor_listen(self.handle, ip, port); + pub fn listen(self: *const Self, ip: in6_addr, port: u16) !u16 { + const ret = c.thespian_tcp_acceptor_listen(self.handle, @bitCast(ip), port); if (ret == 0) return error.ThespianTcpAcceptorListenFailed; return ret; } @@ -830,12 +830,12 @@ pub const tcp_connector = struct { const Self = @This(); - pub fn init(tag_: []const u8) !Self { - return .{ .handle = c.thespian_tcp_connector_create(tag_) orelse return log_last_error(error.ThespianTcpConnectorInitFailed) }; + pub fn init(tag_: [:0]const u8) !Self { + return .{ .handle = c.thespian_tcp_connector_create(tag_.ptr) orelse return log_last_error(error.ThespianTcpConnectorInitFailed) }; } - pub fn connect(self: *const Self, ip: c.in6_addr, port: u16) !void { - const ret = c.thespian_tcp_connector_connect(self.handle, ip, port); + pub fn connect(self: *const Self, ip: in6_addr, port: u16) !void { + const ret = c.thespian_tcp_connector_connect(self.handle, @bitCast(ip), port); if (ret < 0) return error.ThespianTcpConnectorConnectFailed; } @@ -854,8 +854,8 @@ pub const socket = struct { const Self = @This(); - pub fn init(tag_: []const u8, fd: i32) !Self { - return .{ .handle = c.thespian_socket_create(tag_, fd) orelse return log_last_error(error.ThespianSocketInitFailed) }; + pub fn init(tag_: [:0]const u8, fd: i32) !Self { + return .{ .handle = c.thespian_socket_create(tag_.ptr, fd) orelse return log_last_error(error.ThespianSocketInitFailed) }; } pub fn write(self: *const Self, data: []const u8) !void { diff --git a/test/ip_tcp_client_server.zig b/test/ip_tcp_client_server.zig new file mode 100644 index 0000000..8bc696f --- /dev/null +++ b/test/ip_tcp_client_server.zig @@ -0,0 +1,281 @@ +const std = @import("std"); +const thespian = @import("thespian"); + +const Allocator = std.mem.Allocator; + +const exit_normal = thespian.exit_normal; +const exit_error = thespian.exit_error; +const result = thespian.result; +const unexpected = thespian.unexpected; + +const pid = thespian.pid; +const pid_ref = thespian.pid_ref; +const self_pid = thespian.self_pid; + +const Receiver = thespian.Receiver; +const spawn_link = thespian.spawn_link; + +const message = thespian.message; +const extract = thespian.extract; + +const socket = thespian.socket; +const tcp_acceptor = thespian.tcp_acceptor; +const tcp_connector = thespian.tcp_connector; +const in6addr_loopback = thespian.in6addr_loopback; + +pub const std_options = .{ + .log_level = .err, +}; + +const ClientConnection = struct { + allocator: Allocator, + sock: socket, + client_pid: pid, + receiver: Receiver(*@This()), + + const Args = struct { allocator: Allocator, fd: i32, client_pid: pid }; + + fn start(args: Args) result { + const sock = socket.init("client_connection", args.fd) catch |e| return exit_error(e, @errorReturnTrace()); + var self = args.allocator.create(@This()) catch |e| return exit_error(e, @errorReturnTrace()); + self.* = .{ + .allocator = args.allocator, + .sock = sock, + .client_pid = args.client_pid, + .receiver = .init(receive_safe, self), + }; + errdefer self.deinit(); + self.sock.read() catch |e| return exit_error(e, @errorReturnTrace()); + thespian.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.client_pid.deinit(); + self.allocator.destroy(self); + } + + fn receive_safe(self: *@This(), from: pid_ref, m: message) result { + errdefer self.deinit(); + self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), _: pid_ref, m: message) !void { + var buf: []const u8 = ""; + var written: i64 = 0; + if (try m.match(.{ "socket", "client_connection", "read_complete", extract(&buf) })) { + if (std.mem.eql(u8, buf, "ping")) + try self.sock.write("pong"); + } else if (try m.match(.{ "socket", "client_connection", "write_complete", extract(&written) })) { + // just wait for server to close + } else if (try m.match(.{ "socket", "client_connection", "closed" })) { + // connection done + _ = self.client_pid.send(.{ "client_connection", "done" }) catch {}; + return exit_normal(); + } else { + return unexpected(m); + } + } +}; + +const Client = struct { + allocator: Allocator, + connector: tcp_connector, + server_pid: pid, + receiver: Receiver(*@This()), + + const Args = struct { allocator: Allocator, server_pid: pid, port: u16 }; + + fn start(args: Args) result { + return init(args) catch |e| return exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + const connector: tcp_connector = try .init("client"); + var self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .connector = connector, + .server_pid = args.server_pid, + .receiver = .init(receive_safe, self), + }; + errdefer self.deinit(); + try self.connector.connect(in6addr_loopback, args.port); + thespian.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.server_pid.deinit(); + self.allocator.destroy(self); + } + + fn receive_safe(self: *@This(), from: pid_ref, m: message) result { + errdefer self.deinit(); + self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), _: pid_ref, m: message) !void { + var fd: i32 = 0; + if (try m.match(.{ "connector", "client", "connected", extract(&fd) })) { + _ = try spawn_link(self.allocator, ClientConnection.Args{ + .allocator = self.allocator, + .fd = fd, + .client_pid = self_pid().clone(), + }, ClientConnection.start, "client_connection"); + } else if (try m.match(.{ "client_connection", "done" })) { + _ = try self.server_pid.send(.{ "client", "done" }); + return exit_normal(); + } else { + return unexpected(m); + } + } +}; + +const ServerConnection = struct { + allocator: Allocator, + sock: socket, + server_pid: pid, + receiver: Receiver(*@This()), + + const Args = struct { allocator: Allocator, fd: i32, server_pid: pid }; + + fn start(args: Args) result { + return init(args) catch |e| return exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + const sock = try socket.init("server_connection", args.fd); + var self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .sock = sock, + .server_pid = args.server_pid, + .receiver = .init(receive_safe, self), + }; + errdefer self.deinit(); + try self.sock.write("ping"); + thespian.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.server_pid.deinit(); + self.allocator.destroy(self); + } + + fn receive_safe(self: *@This(), from: pid_ref, m: message) result { + errdefer self.deinit(); + self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), _: pid_ref, m: message) !void { + var buf: []const u8 = ""; + var written: i64 = 0; + + if (try m.match(.{ "socket", "server_connection", "write_complete", extract(&written) })) { + // ping sent, start reading + try self.sock.read(); + } else if (try m.match(.{ "socket", "server_connection", "read_complete", extract(&buf) })) { + // received pong, close socket + if (std.mem.eql(u8, buf, "pong")) + try self.sock.close(); + } else if (try m.match(.{ "socket", "server_connection", "closed" })) { + // connection done, notify server + _ = self.server_pid.send(.{ "server_connection", "done" }) catch {}; + return exit_normal(); + } else { + return unexpected(m); + } + } +}; + +const Server = struct { + allocator: Allocator, + acceptor: tcp_acceptor, + client_done: bool = false, + server_conn_done: bool = false, + acceptor_closed: bool = false, + receiver: Receiver(*@This()), + + const Args = struct { allocator: Allocator }; + + fn start(args: Args) result { + return init(args) catch |e| exit_error(e, @errorReturnTrace()); + } + + fn init(args: Args) !void { + const acceptor: tcp_acceptor = try .init("server"); + var self = try args.allocator.create(@This()); + self.* = .{ + .allocator = args.allocator, + .acceptor = acceptor, + .receiver = .init(receive_safe, self), + }; + errdefer self.deinit(); + + const port = try self.acceptor.listen(in6addr_loopback, 0); + + _ = try spawn_link(args.allocator, Client.Args{ + .allocator = args.allocator, + .server_pid = self_pid().clone(), + .port = port, + }, Client.start, "client"); + + thespian.receive(&self.receiver); + } + + fn deinit(self: *@This()) void { + self.acceptor.deinit(); + self.allocator.destroy(self); + } + + fn receive_safe(self: *@This(), from: pid_ref, m: message) result { + errdefer self.deinit(); + self.receive(from, m) catch |e| return exit_error(e, @errorReturnTrace()); + } + + fn receive(self: *@This(), _: pid_ref, m: message) !void { + var fd: i32 = 0; + if (try m.match(.{ "acceptor", "server", "accept", extract(&fd) })) { + _ = try spawn_link(self.allocator, ServerConnection.Args{ + .allocator = self.allocator, + .fd = fd, + .server_pid = self_pid().clone(), + }, ServerConnection.start, "server_connection"); + // just one connection for this test + try self.acceptor.close(); + } else if (try m.match(.{ "acceptor", "server", "closed" })) { + self.acceptor_closed = true; + } else if (try m.match(.{ "client", "done" })) { + self.client_done = true; + } else if (try m.match(.{ "server_connection", "done" })) { + self.server_conn_done = true; + } else { + return unexpected(m); + } + if (self.acceptor_closed and self.client_done and self.server_conn_done) + return exit_normal(); + } +}; + +test "ip_tcp_client_server test" { + const allocator = std.testing.allocator; + var ctx = try thespian.context.init(allocator); + defer ctx.deinit(); + + var success = false; + + var exit_handler = thespian.make_exit_handler(&success, struct { + fn handle(ok: *bool, status: []const u8) void { + if (std.mem.eql(u8, status, "normal")) { + ok.* = true; + } else { + std.log.err("EXITED: {s}", .{status}); + } + } + }.handle); + + _ = try ctx.spawn_link(Server.Args{ .allocator = allocator }, Server.start, "ip_tcp_client_server", &exit_handler, null); + + ctx.run(); + + if (!success) return error.TestFailed; +} diff --git a/test/tests.zig b/test/tests.zig index 0f24486..81af45b 100644 --- a/test/tests.zig +++ b/test/tests.zig @@ -1,6 +1,7 @@ const std = @import("std"); pub const cpp = @import("tests_cpp.zig"); pub const thespian = @import("tests_thespian.zig"); +pub const ip_tcp_client_server = @import("ip_tcp_client_server.zig"); test { std.testing.refAllDecls(@This());