fix: prevent race on timeout in call()
This commit is contained in:
parent
b5da309fb4
commit
082e8fe292
1 changed files with 31 additions and 15 deletions
|
@ -689,49 +689,65 @@ const CallContext = struct {
|
||||||
receiver: ReceiverT,
|
receiver: ReceiverT,
|
||||||
to: pid_ref,
|
to: pid_ref,
|
||||||
request: message,
|
request: message,
|
||||||
response: ?message,
|
response: *?message,
|
||||||
a: std.mem.Allocator,
|
a: std.mem.Allocator,
|
||||||
mut: std.Thread.Mutex = std.Thread.Mutex{},
|
mut: std.Thread.Mutex = std.Thread.Mutex{},
|
||||||
cond: std.Thread.Condition = std.Thread.Condition{},
|
cond: std.Thread.Condition = std.Thread.Condition{},
|
||||||
|
timeout: bool = false,
|
||||||
|
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
const ReceiverT = Receiver(*Self);
|
const ReceiverT = Receiver(*Self);
|
||||||
|
|
||||||
pub fn call(a: std.mem.Allocator, to: pid_ref, timeout_ns: u64, request: message) error{ OutOfMemory, ThespianSpawnFailed, Timeout }!message {
|
pub fn call(a: std.mem.Allocator, to: pid_ref, timeout_ns: u64, request: message) error{ OutOfMemory, ThespianSpawnFailed, Timeout }!message {
|
||||||
var self: Self = undefined;
|
var response: ?message = null;
|
||||||
const rec = ReceiverT.init(receive_, &self);
|
var self = try a.create(Self);
|
||||||
|
self.* = .{
|
||||||
self = .{
|
.receiver = ReceiverT.init(receive_, self),
|
||||||
.receiver = rec,
|
|
||||||
.to = to,
|
.to = to,
|
||||||
.request = request,
|
.request = request,
|
||||||
.response = null,
|
.response = &response,
|
||||||
.a = a,
|
.a = a,
|
||||||
};
|
};
|
||||||
|
|
||||||
self.mut.lock();
|
self.mut.lock();
|
||||||
errdefer self.mut.unlock();
|
errdefer self.mut.unlock();
|
||||||
|
|
||||||
const proc = try spawn_link(a, &self, start, @typeName(Self));
|
const proc = try spawn_link(a, self, start, @typeName(Self));
|
||||||
defer proc.deinit();
|
defer proc.deinit();
|
||||||
|
|
||||||
try self.cond.timedWait(&self.mut, timeout_ns);
|
self.cond.timedWait(&self.mut, timeout_ns) catch |e| switch (e) {
|
||||||
|
error.Timeout => {
|
||||||
|
self.timeout = true;
|
||||||
|
return e;
|
||||||
|
},
|
||||||
|
else => return e,
|
||||||
|
};
|
||||||
|
|
||||||
return self.response orelse .{};
|
return response orelse .{};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn deinit(self: *Self) void {
|
||||||
|
self.a.destroy(self);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start(self: *Self) result {
|
fn start(self: *Self) result {
|
||||||
errdefer self.cond.signal();
|
errdefer {
|
||||||
|
self.cond.signal();
|
||||||
|
self.deinit();
|
||||||
|
}
|
||||||
_ = set_trap(true);
|
_ = set_trap(true);
|
||||||
try self.to.link();
|
try self.to.link();
|
||||||
try self.to.send_raw(self.request);
|
try self.to.send_raw(self.request);
|
||||||
receive(&self.receiver);
|
receive(&self.receiver);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn receive_(self: *Self, from: pid_ref, m: message) result {
|
fn receive_(self: *Self, _: pid_ref, m: message) result {
|
||||||
|
defer self.deinit();
|
||||||
defer self.cond.signal();
|
defer self.cond.signal();
|
||||||
_ = from;
|
self.mut.lock();
|
||||||
self.response = m.clone(self.a) catch |e| return exit_error(e);
|
defer self.mut.unlock();
|
||||||
try exit_normal();
|
if (!self.timeout)
|
||||||
|
self.response.* = m.clone(self.a) catch |e| return exit_error(e);
|
||||||
|
return exit_normal();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
Loading…
Add table
Reference in a new issue