refactor: switch to blocking thread instead of polling
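
Previously the kqueue and IOCP backends woke up every 50ms on a FW_poll
timer and drained events opportunistically. Each of these backends now
spawns a dedicated thread that blocks on its native notification
primitive and sends a FW_event message only when something actually
changed: the kqueue thread is woken for shutdown via a self-pipe
registered in the kqueue, and the IOCP thread via a sentinel completion
key. The inotify backend keeps the non-blocking fd_watcher path and
simply releases the parent pid reference it does not need.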

CJ van den Berg 2026-02-20 16:54:38 +01:00
parent 740ad23c77
commit 30b8fc399b
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9


@@ -73,7 +73,7 @@ const INotifyBackend = struct {
     fd_watcher: tp.file_descriptor,
     watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path
-    const polling = false;
+    const threaded = false;
     const IN = std.os.linux.IN;
@@ -98,8 +98,9 @@ const INotifyBackend = struct {
         std.posix.close(self.inotify_fd);
     }
-    fn arm(self: *@This()) void {
-        self.fd_watcher.wait_read() catch {};
+    fn arm(self: *@This(), parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void {
+        parent.deinit();
+        try self.fd_watcher.wait_read();
     }
     fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void {
@@ -217,16 +218,18 @@ const INotifyBackend = struct {
 const KQueueBackend = struct {
     kq: std.posix.fd_t,
-    poll_timer: ?tp.timeout,
+    shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
+    thread: ?std.Thread,
     watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd
-    const polling = true;
-    const poll_interval_ms: u64 = 50;
+    const threaded = true;
     const EVFILT_VNODE: i16 = -4;
+    const EVFILT_READ: i16 = -1;
     const EV_ADD: u16 = 0x0001;
     const EV_ENABLE: u16 = 0x0004;
     const EV_CLEAR: u16 = 0x0020;
+    const EV_DELETE: u16 = 0x0002;
     const NOTE_WRITE: u32 = 0x00000002;
     const NOTE_DELETE: u32 = 0x00000004;
     const NOTE_RENAME: u32 = 0x00000020;
@@ -235,11 +238,32 @@ const KQueueBackend = struct {
     fn init() !@This() {
         const kq = std.posix.kqueue() catch return error.FileWatcherFailed;
-        return .{ .kq = kq, .poll_timer = null, .watches = .empty };
+        errdefer std.posix.close(kq);
+        const pipe = std.posix.pipe() catch return error.FileWatcherFailed;
+        errdefer {
+            std.posix.close(pipe[0]);
+            std.posix.close(pipe[1]);
+        }
+        // Register the read end of the shutdown pipe with kqueue so the thread
+        // wakes up when we want to shut down.
+        const shutdown_kev = std.posix.Kevent{
+            .ident = @intCast(pipe[0]),
+            .filter = EVFILT_READ,
+            .flags = EV_ADD | EV_ENABLE,
+            .fflags = 0,
+            .data = 0,
+            .udata = 0,
+        };
+        _ = std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null) catch return error.FileWatcherFailed;
+        return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty };
     }
     fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
-        if (self.poll_timer) |*t| t.deinit();
+        // Signal the thread to exit by writing to the shutdown pipe.
+        _ = std.posix.write(self.shutdown_pipe[1], &[_]u8{0}) catch {};
+        if (self.thread) |t| t.join();
+        std.posix.close(self.shutdown_pipe[0]);
+        std.posix.close(self.shutdown_pipe[1]);
         var it = self.watches.iterator();
         while (it.next()) |entry| {
             std.posix.close(entry.value_ptr.*);
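
For reference, the shutdown mechanism registered above is the classic
self-pipe trick. A minimal standalone sketch of the pattern (illustration
only, not code from this commit; it blocks in a plain read where the
backend instead registers the read end with kqueue):

const std = @import("std");

fn worker(pipe_read: std.posix.fd_t) void {
    var buf: [1]u8 = undefined;
    // Blocks until the owner writes to the other end of the pipe.
    _ = std.posix.read(pipe_read, &buf) catch {};
}

pub fn main() !void {
    const pipe = try std.posix.pipe();
    defer std.posix.close(pipe[0]);
    defer std.posix.close(pipe[1]);
    const t = try std.Thread.spawn(.{}, worker, .{pipe[0]});
    _ = try std.posix.write(pipe[1], &[_]u8{0}); // any byte wakes the worker
    t.join();
}

A single byte is enough; the waiter only cares that the descriptor became
readable.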
@@ -249,9 +273,26 @@ const KQueueBackend = struct {
         std.posix.close(self.kq);
     }
-    fn arm(self: *@This()) void {
-        if (self.poll_timer) |*t| t.deinit();
-        self.poll_timer = tp.timeout.init_ms(poll_interval_ms, tp.message.fmt(.{"FW_poll"})) catch null;
+    fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
+        errdefer parent.deinit();
+        if (self.thread != null) return error.AlreadyArmed;
+        self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, parent });
+    }
+    fn thread_fn(kq: std.posix.fd_t, parent: tp.pid) void {
+        defer parent.deinit();
+        var events: [64]std.posix.Kevent = undefined;
+        while (true) {
+            // Block indefinitely until kqueue has events.
+            const n = std.posix.kevent(kq, &.{}, &events, null) catch break;
+            var has_vnode_events = false;
+            for (events[0..n]) |ev| {
+                if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
+                if (ev.filter == EVFILT_VNODE) has_vnode_events = true;
+            }
+            if (has_vnode_events)
+                parent.send(.{"FW_event"}) catch break;
+        }
     }
     fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void {
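
A note on pid ownership in the arm/thread_fn pair above (as read from the
signatures here, not extra API knowledge): arm receives an owned clone of
the parent pid, releases it via errdefer if the spawn fails, and otherwise
hands it to the thread, whose defer parent.deinit() releases it when the
loop exits for any reason.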
@@ -285,6 +326,7 @@ const KQueueBackend = struct {
         const immediate: std.posix.timespec = .{ .sec = 0, .nsec = 0 };
         const n = std.posix.kevent(self.kq, &.{}, &events, &immediate) catch return;
         for (events[0..n]) |ev| {
+            if (ev.filter != EVFILT_VNODE) continue;
             var it = self.watches.iterator();
             while (it.next()) |entry| {
                 if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue;
@@ -304,7 +346,7 @@ const KQueueBackend = struct {
 };
 const WindowsBackend = struct {
-    const polling = true;
+    const threaded = true;
     const windows = std.os.windows;
     const win32 = struct {
@@ -335,13 +377,20 @@ const WindowsBackend = struct {
             dwFlagsAndAttributes: windows.DWORD,
             hTemplateFile: ?windows.HANDLE,
         ) callconv(.winapi) windows.HANDLE;
+        pub extern "kernel32" fn PostQueuedCompletionStatus(
+            CompletionPort: windows.HANDLE,
+            dwNumberOfBytesTransferred: windows.DWORD,
+            dwCompletionKey: windows.ULONG_PTR,
+            lpOverlapped: ?*windows.OVERLAPPED,
+        ) callconv(.winapi) windows.BOOL;
     };
     iocp: windows.HANDLE,
-    poll_timer: ?tp.timeout,
+    thread: ?std.Thread,
     watches: std.StringHashMapUnmanaged(Watch),
-    const poll_interval_ms: u64 = 50;
+    // A completion key of zero is used to signal the background thread to exit.
+    const SHUTDOWN_KEY: windows.ULONG_PTR = 0;
     const Watch = struct {
         handle: windows.HANDLE,
@@ -376,11 +425,13 @@ const WindowsBackend = struct {
     fn init() !@This() {
         const iocp = windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1) catch
             return error.FileWatcherFailed;
-        return .{ .iocp = iocp, .poll_timer = null, .watches = .empty };
+        return .{ .iocp = iocp, .thread = null, .watches = .empty };
     }
     fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
-        if (self.poll_timer) |*t| t.deinit();
+        // Wake the background thread with a shutdown key, then wait for it.
+        _ = win32.PostQueuedCompletionStatus(self.iocp, 0, SHUTDOWN_KEY, null);
+        if (self.thread) |t| t.join();
         var it = self.watches.iterator();
         while (it.next()) |entry| {
             _ = win32.CloseHandle(entry.value_ptr.*.handle);
@@ -391,9 +442,23 @@ const WindowsBackend = struct {
         _ = win32.CloseHandle(self.iocp);
     }
-    fn arm(self: *@This()) void {
-        if (self.poll_timer) |*t| t.deinit();
-        self.poll_timer = tp.timeout.init_ms(poll_interval_ms, tp.message.fmt(.{"FW_poll"})) catch null;
+    fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
+        errdefer parent.deinit();
+        if (self.thread != null) return error.AlreadyArmed;
+        self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent });
+    }
+    fn thread_fn(iocp: windows.HANDLE, parent: tp.pid) void {
+        defer parent.deinit();
+        var bytes: windows.DWORD = 0;
+        var key: windows.ULONG_PTR = 0;
+        var overlapped_ptr: ?*windows.OVERLAPPED = null;
+        while (true) {
+            // Block indefinitely until IOCP has a completion or shutdown signal.
+            const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE);
+            if (ok == 0 or key == SHUTDOWN_KEY) return;
+            parent.send(.{"FW_event"}) catch return;
+        }
     }
     fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void {
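
The Windows shutdown follows the same idea with IOCP primitives. Below is
a minimal standalone sketch (illustration only, not code from this commit)
of waking a blocked GetQueuedCompletionStatus with a sentinel completion
key; the extern declarations mirror the ones in the win32 struct above:

const std = @import("std");
const windows = std.os.windows;

extern "kernel32" fn PostQueuedCompletionStatus(
    CompletionPort: windows.HANDLE,
    dwNumberOfBytesTransferred: windows.DWORD,
    dwCompletionKey: windows.ULONG_PTR,
    lpOverlapped: ?*windows.OVERLAPPED,
) callconv(.winapi) windows.BOOL;

extern "kernel32" fn GetQueuedCompletionStatus(
    CompletionPort: windows.HANDLE,
    lpNumberOfBytesTransferred: *windows.DWORD,
    lpCompletionKey: *windows.ULONG_PTR,
    lpOverlapped: *?*windows.OVERLAPPED,
    dwMilliseconds: windows.DWORD,
) callconv(.winapi) windows.BOOL;

const SHUTDOWN_KEY: windows.ULONG_PTR = 0;

fn worker(iocp: windows.HANDLE) void {
    var bytes: windows.DWORD = 0;
    var key: windows.ULONG_PTR = 0;
    var overlapped: ?*windows.OVERLAPPED = null;
    while (true) {
        // Blocks until a completion arrives; treat failure like shutdown.
        const ok = GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped, windows.INFINITE);
        if (ok == 0 or key == SHUTDOWN_KEY) return;
    }
}

pub fn main() !void {
    const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1);
    const t = try std.Thread.spawn(.{}, worker, .{iocp});
    _ = PostQueuedCompletionStatus(iocp, 0, SHUTDOWN_KEY, null);
    t.join();
}

Watch handles registered with a non-zero completion key can never collide
with the sentinel, which is why zero is a safe choice here.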
@@ -441,8 +506,9 @@ const WindowsBackend = struct {
         var key: windows.ULONG_PTR = 0;
         var overlapped_ptr: ?*windows.OVERLAPPED = null;
         while (true) {
+            // Non-blocking drain; the blocking wait is done in the background thread.
             const ok = win32.GetQueuedCompletionStatus(self.iocp, &bytes, &key, &overlapped_ptr, 0);
-            if (ok == 0 or overlapped_ptr == null) break;
+            if (ok == 0 or overlapped_ptr == null or key == SHUTDOWN_KEY) break;
             const triggered_handle: windows.HANDLE = @ptrFromInt(key);
             var it = self.watches.iterator();
             while (it.next()) |entry| {
@@ -516,7 +582,7 @@ const Process = struct {
         errdefer self.deinit();
         _ = tp.set_trap(true);
         self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace());
-        self.backend.arm();
+        self.backend.arm(tp.self_pid().clone()) catch |e| return tp.exit_error(e, @errorReturnTrace());
         tp.receive(&self.receiver);
     }
@@ -538,15 +604,12 @@ const Process = struct {
         var err_code: i64 = 0;
         var err_msg: []const u8 = undefined;
-        if (!Backend.polling and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) {
+        if (!Backend.threaded and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) {
             self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e);
-            self.backend.arm();
-        } else if (!Backend.polling and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) {
+        } else if (!Backend.threaded and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) {
             self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg });
-            self.backend.arm();
-        } else if (Backend.polling and try cbor.match(m.buf, .{"FW_poll"})) {
+        } else if (Backend.threaded and try cbor.match(m.buf, .{"FW_event"})) {
             self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e);
-            self.backend.arm();
         } else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) {
             self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e);
         } else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) {
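
With a threaded backend the actor receives one FW_event per wakeup instead
of a FW_poll tick every 50ms, and the per-message re-arm calls are gone:
the background thread keeps blocking in its wait loop, so nothing needs to
be re-armed after a drain.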