From 2b380bbef442cd80842176f87517ef5726480a7a Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Tue, 22 Oct 2024 20:52:41 +0200 Subject: [PATCH] fix: improve CallContext robustness --- src/thespian.zig | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/src/thespian.zig b/src/thespian.zig index 316b798..f3fc207 100644 --- a/src/thespian.zig +++ b/src/thespian.zig @@ -109,7 +109,7 @@ fn Pid(comptime own: Ownership) type { return c.thespian_handle_is_expired(self.h); } - pub fn wait_expired(self: Self, timeout_ns: isize) !void { + pub fn wait_expired(self: Self, timeout_ns: isize) error{Timeout}!void { var max_sleep: isize = timeout_ns; while (!self.expired()) { if (max_sleep <= 0) return error.Timeout; @@ -798,9 +798,10 @@ pub const file_stream = struct { const CallContext = struct { receiver: ReceiverT, + from: pid, to: pid_ref, request: message, - response: *?message, + response: ?message, a: std.mem.Allocator, done: std.Thread.ResetEvent = .{}, @@ -808,29 +809,21 @@ const CallContext = struct { const ReceiverT = Receiver(*Self); pub fn call(a: std.mem.Allocator, to: pid_ref, timeout_ns: u64, request: message) CallError!message { - var response: ?message = null; - var self: Self = .{ + const self = try a.create(Self); + self.* = .{ .receiver = undefined, + .from = self_pid().clone(), .to = to, .request = request, - .response = &response, + .response = null, .a = a, }; - self.receiver = ReceiverT.init(receive_, &self); - const proc = try spawn_link(a, &self, start, @typeName(Self)); + self.receiver = ReceiverT.init(receive_, self); + const proc = try spawn_link(a, self, start, @typeName(Self)); defer proc.deinit(); - errdefer { - proc.send(.{"timeout"}) catch @panic("CallContext.send.timeout"); - self.done.wait(); - if (response) |resp| a.free(resp.buf); - } - self.done.timedWait(timeout_ns) catch |e| switch (e) { - error.Timeout => { - return e; - }, - else => return e, - }; - return response orelse .{}; + try self.done.timedWait(timeout_ns); + defer self.deinit(); // only deinit on success. if we timed out proc will have to deinit + return self.response orelse .{}; } fn deinit(self: *Self) void { @@ -846,8 +839,13 @@ const CallContext = struct { } fn receive_(self: *Self, _: pid_ref, m: message) result { - defer self.done.set(); - self.response.* = m.clone(self.a) catch |e| return exit_error(e, @errorReturnTrace()); + defer { + const expired = self.from.expired(); + self.from.deinit(); + self.done.set(); + if (expired) self.deinit(); + } + self.response = m.clone(self.a) catch |e| return exit_error(e, @errorReturnTrace()); return exit_normal(); } };