fix: prevent another race in call()
This commit is contained in:
parent
e5983cdc8f
commit
a98cb1f5e6
1 changed files with 14 additions and 25 deletions
|
@ -691,38 +691,34 @@ const CallContext = struct {
|
|||
request: message,
|
||||
response: *?message,
|
||||
a: std.mem.Allocator,
|
||||
mut: std.Thread.Mutex = std.Thread.Mutex{},
|
||||
cond: std.Thread.Condition = std.Thread.Condition{},
|
||||
timeout: bool = false,
|
||||
done: std.Thread.ResetEvent = .{},
|
||||
|
||||
const Self = @This();
|
||||
const ReceiverT = Receiver(*Self);
|
||||
|
||||
pub fn call(a: std.mem.Allocator, to: pid_ref, timeout_ns: u64, request: message) error{ OutOfMemory, ThespianSpawnFailed, Timeout }!message {
|
||||
var response: ?message = null;
|
||||
var self = try a.create(Self);
|
||||
self.* = .{
|
||||
.receiver = ReceiverT.init(receive_, self),
|
||||
var self: Self = .{
|
||||
.receiver = undefined,
|
||||
.to = to,
|
||||
.request = request,
|
||||
.response = &response,
|
||||
.a = a,
|
||||
};
|
||||
|
||||
self.mut.lock();
|
||||
errdefer self.mut.unlock();
|
||||
|
||||
const proc = try spawn_link(a, self, start, @typeName(Self));
|
||||
self.receiver = ReceiverT.init(receive_, &self);
|
||||
const proc = try spawn_link(a, &self, start, @typeName(Self));
|
||||
defer proc.deinit();
|
||||
|
||||
self.cond.timedWait(&self.mut, timeout_ns) catch |e| switch (e) {
|
||||
errdefer {
|
||||
proc.send(.{"timeout"}) catch @panic("CallContext.send.timeout");
|
||||
self.done.wait();
|
||||
if (response) |resp| a.free(resp.buf);
|
||||
}
|
||||
self.done.timedWait(timeout_ns) catch |e| switch (e) {
|
||||
error.Timeout => {
|
||||
self.timeout = true;
|
||||
return e;
|
||||
},
|
||||
else => return e,
|
||||
};
|
||||
|
||||
return response orelse .{};
|
||||
}
|
||||
|
||||
|
@ -731,10 +727,7 @@ const CallContext = struct {
|
|||
}
|
||||
|
||||
fn start(self: *Self) result {
|
||||
errdefer {
|
||||
self.cond.signal();
|
||||
self.deinit();
|
||||
}
|
||||
errdefer self.done.set();
|
||||
_ = set_trap(true);
|
||||
try self.to.link();
|
||||
try self.to.send_raw(self.request);
|
||||
|
@ -742,12 +735,8 @@ const CallContext = struct {
|
|||
}
|
||||
|
||||
fn receive_(self: *Self, _: pid_ref, m: message) result {
|
||||
defer self.deinit();
|
||||
defer self.cond.signal();
|
||||
self.mut.lock();
|
||||
defer self.mut.unlock();
|
||||
if (!self.timeout)
|
||||
self.response.* = m.clone(self.a) catch |e| return exit_error(e);
|
||||
defer self.done.set();
|
||||
self.response.* = m.clone(self.a) catch |e| return exit_error(e);
|
||||
return exit_normal();
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Add table
Reference in a new issue