diff --git a/src/thespian.zig b/src/thespian.zig index b3441b2..096f531 100644 --- a/src/thespian.zig +++ b/src/thespian.zig @@ -78,6 +78,16 @@ fn Pid(comptime own: Ownership) type { return CallContext.call(a, self.ref(), timeout_ns, message.fmt(request)); } + pub fn delay_send(self: Self, a: std.mem.Allocator, delay_us: u64, m: anytype) result { + var h = self.delay_send_cancellable(a, delay_us, m) catch |e| return exit_error(e); + h.deinit(); + } + + pub fn delay_send_cancellable(self: Self, a: std.mem.Allocator, delay_us: u64, m: anytype) error{ OutOfMemory, ThespianSpawnFailed }!Cancellable { + const msg = message.fmt(m); + return Cancellable.init(try DelayedSender.send(self, a, delay_us, msg)); + } + pub fn forward_error(self: Self, e: anyerror) result { return self.send_raw(switch (e) { error.Exit => .{ .buf = error_message() }, @@ -769,3 +779,67 @@ const CallContext = struct { return exit_normal(); } }; + +const DelayedSender = struct { + a: std.mem.Allocator, + target: pid, + message: ?message, + delay_us: u64, + timeout: timeout = undefined, + receiver: ReceiverT = undefined, + + const ReceiverT = Receiver(*DelayedSender); + + fn send(pid_: pid_ref, a: std.mem.Allocator, delay_us: u64, m: message) error{ OutOfMemory, ThespianSpawnFailed }!pid { + const self = try a.create(DelayedSender); + self.* = .{ + .a = a, + .target = pid_.clone(), + .message = try m.clone(a), + .delay_us = delay_us, + }; + return spawn_link(a, self, start, "delayed_sender"); + } + + fn start(self: *DelayedSender) result { + self.receiver = ReceiverT.init(receive_, self); + const m_ = self.message.?; + self.timeout = timeout.init(self.delay_us, m_) catch |e| return exit_error(e); + self.a.free(m_.buf); + _ = set_trap(true); + receive(&self.receiver); + } + + fn deinit(self: *DelayedSender) void { + self.timeout.deinit(); + self.target.deinit(); + } + + fn receive_(self: *DelayedSender, _: pid_ref, m_: message) result { + if (try m_.match(.{"CANCEL"})) { + self.timeout.cancel() catch |e| return exit_error(e); + return; + } + defer self.deinit(); + if (try m_.match(.{ "exit", "timeout_error", 125, any })) + return exit_normal(); + try self.target.send_raw(m_); + return exit_normal(); + } +}; + +pub const Cancellable = struct { + sender: pid, + + fn init(pid_: pid) Cancellable { + return .{ .sender = pid_ }; + } + + pub fn deinit(self: *Cancellable) void { + self.sender.deinit(); + } + + pub fn cancel(self: *Cancellable) result { + return self.sender.send(.{"CANCEL"}); + } +};