From 082e8fe29206e1f842ab7169ef07c5c144b35968 Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Sat, 20 Apr 2024 23:24:38 +0200 Subject: [PATCH] fix: prevent race on timeout in call() --- src/thespian.zig | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/src/thespian.zig b/src/thespian.zig index cd90ddd..2874df3 100644 --- a/src/thespian.zig +++ b/src/thespian.zig @@ -689,49 +689,65 @@ const CallContext = struct { receiver: ReceiverT, to: pid_ref, request: message, - response: ?message, + response: *?message, a: std.mem.Allocator, mut: std.Thread.Mutex = std.Thread.Mutex{}, cond: std.Thread.Condition = std.Thread.Condition{}, + timeout: bool = false, const Self = @This(); const ReceiverT = Receiver(*Self); pub fn call(a: std.mem.Allocator, to: pid_ref, timeout_ns: u64, request: message) error{ OutOfMemory, ThespianSpawnFailed, Timeout }!message { - var self: Self = undefined; - const rec = ReceiverT.init(receive_, &self); - - self = .{ - .receiver = rec, + var response: ?message = null; + var self = try a.create(Self); + self.* = .{ + .receiver = ReceiverT.init(receive_, self), .to = to, .request = request, - .response = null, + .response = &response, .a = a, }; self.mut.lock(); errdefer self.mut.unlock(); - const proc = try spawn_link(a, &self, start, @typeName(Self)); + const proc = try spawn_link(a, self, start, @typeName(Self)); defer proc.deinit(); - try self.cond.timedWait(&self.mut, timeout_ns); + self.cond.timedWait(&self.mut, timeout_ns) catch |e| switch (e) { + error.Timeout => { + self.timeout = true; + return e; + }, + else => return e, + }; - return self.response orelse .{}; + return response orelse .{}; + } + + fn deinit(self: *Self) void { + self.a.destroy(self); } fn start(self: *Self) result { - errdefer self.cond.signal(); + errdefer { + self.cond.signal(); + self.deinit(); + } _ = set_trap(true); try self.to.link(); try self.to.send_raw(self.request); receive(&self.receiver); } - fn receive_(self: *Self, from: pid_ref, m: message) result { + fn receive_(self: *Self, _: pid_ref, m: message) result { + defer self.deinit(); defer self.cond.signal(); - _ = from; - self.response = m.clone(self.a) catch |e| return exit_error(e); - try exit_normal(); + self.mut.lock(); + defer self.mut.unlock(); + if (!self.timeout) + self.response.* = m.clone(self.a) catch |e| return exit_error(e); + return exit_normal(); } };