feat: add pid.delay_send and pid.delay_send_cancellable
This commit is contained in:
parent
8218bda8e3
commit
f716b924e2
1 changed files with 74 additions and 0 deletions
|
@ -78,6 +78,16 @@ fn Pid(comptime own: Ownership) type {
|
|||
return CallContext.call(a, self.ref(), timeout_ns, message.fmt(request));
|
||||
}
|
||||
|
||||
pub fn delay_send(self: Self, a: std.mem.Allocator, delay_us: u64, m: anytype) result {
|
||||
var h = self.delay_send_cancellable(a, delay_us, m) catch |e| return exit_error(e);
|
||||
h.deinit();
|
||||
}
|
||||
|
||||
pub fn delay_send_cancellable(self: Self, a: std.mem.Allocator, delay_us: u64, m: anytype) error{ OutOfMemory, ThespianSpawnFailed }!Cancellable {
|
||||
const msg = message.fmt(m);
|
||||
return Cancellable.init(try DelayedSender.send(self, a, delay_us, msg));
|
||||
}
|
||||
|
||||
pub fn forward_error(self: Self, e: anyerror) result {
|
||||
return self.send_raw(switch (e) {
|
||||
error.Exit => .{ .buf = error_message() },
|
||||
|
@ -769,3 +779,67 @@ const CallContext = struct {
|
|||
return exit_normal();
|
||||
}
|
||||
};
|
||||
|
||||
const DelayedSender = struct {
|
||||
a: std.mem.Allocator,
|
||||
target: pid,
|
||||
message: ?message,
|
||||
delay_us: u64,
|
||||
timeout: timeout = undefined,
|
||||
receiver: ReceiverT = undefined,
|
||||
|
||||
const ReceiverT = Receiver(*DelayedSender);
|
||||
|
||||
fn send(pid_: pid_ref, a: std.mem.Allocator, delay_us: u64, m: message) error{ OutOfMemory, ThespianSpawnFailed }!pid {
|
||||
const self = try a.create(DelayedSender);
|
||||
self.* = .{
|
||||
.a = a,
|
||||
.target = pid_.clone(),
|
||||
.message = try m.clone(a),
|
||||
.delay_us = delay_us,
|
||||
};
|
||||
return spawn_link(a, self, start, "delayed_sender");
|
||||
}
|
||||
|
||||
fn start(self: *DelayedSender) result {
|
||||
self.receiver = ReceiverT.init(receive_, self);
|
||||
const m_ = self.message.?;
|
||||
self.timeout = timeout.init(self.delay_us, m_) catch |e| return exit_error(e);
|
||||
self.a.free(m_.buf);
|
||||
_ = set_trap(true);
|
||||
receive(&self.receiver);
|
||||
}
|
||||
|
||||
fn deinit(self: *DelayedSender) void {
|
||||
self.timeout.deinit();
|
||||
self.target.deinit();
|
||||
}
|
||||
|
||||
fn receive_(self: *DelayedSender, _: pid_ref, m_: message) result {
|
||||
if (try m_.match(.{"CANCEL"})) {
|
||||
self.timeout.cancel() catch |e| return exit_error(e);
|
||||
return;
|
||||
}
|
||||
defer self.deinit();
|
||||
if (try m_.match(.{ "exit", "timeout_error", 125, any }))
|
||||
return exit_normal();
|
||||
try self.target.send_raw(m_);
|
||||
return exit_normal();
|
||||
}
|
||||
};
|
||||
|
||||
pub const Cancellable = struct {
|
||||
sender: pid,
|
||||
|
||||
fn init(pid_: pid) Cancellable {
|
||||
return .{ .sender = pid_ };
|
||||
}
|
||||
|
||||
pub fn deinit(self: *Cancellable) void {
|
||||
self.sender.deinit();
|
||||
}
|
||||
|
||||
pub fn cancel(self: *Cancellable) result {
|
||||
return self.sender.send(.{"CANCEL"});
|
||||
}
|
||||
};
|
||||
|
|
Loading…
Add table
Reference in a new issue