From 54e047f63cddcb850755c01087ef6d0ffa3492b0 Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Fri, 20 Feb 2026 16:54:38 +0100 Subject: [PATCH] refactor: switch to blocking thread instead of watcher polling --- src/file_watcher.zig | 117 +++++++++++++++++++++++++++++++++---------- 1 file changed, 90 insertions(+), 27 deletions(-) diff --git a/src/file_watcher.zig b/src/file_watcher.zig index 9901c98..ec4097e 100644 --- a/src/file_watcher.zig +++ b/src/file_watcher.zig @@ -73,7 +73,7 @@ const INotifyBackend = struct { fd_watcher: tp.file_descriptor, watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path - const polling = false; + const threaded = false; const IN = std.os.linux.IN; @@ -98,8 +98,9 @@ const INotifyBackend = struct { std.posix.close(self.inotify_fd); } - fn arm(self: *@This()) void { - self.fd_watcher.wait_read() catch {}; + fn arm(self: *@This(), parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void { + parent.deinit(); + try self.fd_watcher.wait_read(); } fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void { @@ -217,16 +218,18 @@ const INotifyBackend = struct { const KQueueBackend = struct { kq: std.posix.fd_t, - poll_timer: ?tp.timeout, + shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread + thread: ?std.Thread, watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd - const polling = true; - const poll_interval_ms: u64 = 50; + const threaded = true; const EVFILT_VNODE: i16 = -4; + const EVFILT_READ: i16 = -1; const EV_ADD: u16 = 0x0001; const EV_ENABLE: u16 = 0x0004; const EV_CLEAR: u16 = 0x0020; + const EV_DELETE: u16 = 0x0002; const NOTE_WRITE: u32 = 0x00000002; const NOTE_DELETE: u32 = 0x00000004; const NOTE_RENAME: u32 = 0x00000020; @@ -235,11 +238,32 @@ const KQueueBackend = struct { fn init() !@This() { const kq = std.posix.kqueue() catch return error.FileWatcherFailed; - return .{ .kq = kq, .poll_timer = null, .watches = .empty }; + errdefer std.posix.close(kq); + const pipe = std.posix.pipe() catch return error.FileWatcherFailed; + errdefer { + std.posix.close(pipe[0]); + std.posix.close(pipe[1]); + } + // Register the read end of the shutdown pipe with kqueue so the thread + // wakes up when we want to shut down. + const shutdown_kev = std.posix.Kevent{ + .ident = @intCast(pipe[0]), + .filter = EVFILT_READ, + .flags = EV_ADD | EV_ENABLE, + .fflags = 0, + .data = 0, + .udata = 0, + }; + _ = std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null) catch return error.FileWatcherFailed; + return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - if (self.poll_timer) |*t| t.deinit(); + // Signal the thread to exit by writing to the shutdown pipe. + _ = std.posix.write(self.shutdown_pipe[1], &[_]u8{0}) catch {}; + if (self.thread) |t| t.join(); + std.posix.close(self.shutdown_pipe[0]); + std.posix.close(self.shutdown_pipe[1]); var it = self.watches.iterator(); while (it.next()) |entry| { std.posix.close(entry.value_ptr.*); @@ -249,9 +273,26 @@ const KQueueBackend = struct { std.posix.close(self.kq); } - fn arm(self: *@This()) void { - if (self.poll_timer) |*t| t.deinit(); - self.poll_timer = tp.timeout.init_ms(poll_interval_ms, tp.message.fmt(.{"FW_poll"})) catch null; + fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + errdefer parent.deinit(); + if (self.thread != null) return error.AlreadyArmed; + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, parent }); + } + + fn thread_fn(kq: std.posix.fd_t, parent: tp.pid) void { + defer parent.deinit(); + var events: [64]std.posix.Kevent = undefined; + while (true) { + // Block indefinitely until kqueue has events. + const n = std.posix.kevent(kq, &.{}, &events, null) catch break; + var has_vnode_events = false; + for (events[0..n]) |ev| { + if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit + if (ev.filter == EVFILT_VNODE) has_vnode_events = true; + } + if (has_vnode_events) + parent.send(.{"FW_event"}) catch break; + } } fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void { @@ -285,6 +326,7 @@ const KQueueBackend = struct { const immediate: std.posix.timespec = .{ .sec = 0, .nsec = 0 }; const n = std.posix.kevent(self.kq, &.{}, &events, &immediate) catch return; for (events[0..n]) |ev| { + if (ev.filter != EVFILT_VNODE) continue; var it = self.watches.iterator(); while (it.next()) |entry| { if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; @@ -304,7 +346,7 @@ const KQueueBackend = struct { }; const WindowsBackend = struct { - const polling = true; + const threaded = true; const windows = std.os.windows; const win32 = struct { @@ -335,13 +377,20 @@ const WindowsBackend = struct { dwFlagsAndAttributes: windows.DWORD, hTemplateFile: ?windows.HANDLE, ) callconv(.winapi) windows.HANDLE; + pub extern "kernel32" fn PostQueuedCompletionStatus( + CompletionPort: windows.HANDLE, + dwNumberOfBytesTransferred: windows.DWORD, + dwCompletionKey: windows.ULONG_PTR, + lpOverlapped: ?*windows.OVERLAPPED, + ) callconv(.winapi) windows.BOOL; }; iocp: windows.HANDLE, - poll_timer: ?tp.timeout, + thread: ?std.Thread, watches: std.StringHashMapUnmanaged(Watch), - const poll_interval_ms: u64 = 50; + // A completion key of zero is used to signal the background thread to exit. + const SHUTDOWN_KEY: windows.ULONG_PTR = 0; const Watch = struct { handle: windows.HANDLE, @@ -376,11 +425,13 @@ const WindowsBackend = struct { fn init() !@This() { const iocp = windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1) catch return error.FileWatcherFailed; - return .{ .iocp = iocp, .poll_timer = null, .watches = .empty }; + return .{ .iocp = iocp, .thread = null, .watches = .empty }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - if (self.poll_timer) |*t| t.deinit(); + // Wake the background thread with a shutdown key, then wait for it. + _ = win32.PostQueuedCompletionStatus(self.iocp, 0, SHUTDOWN_KEY, null); + if (self.thread) |t| t.join(); var it = self.watches.iterator(); while (it.next()) |entry| { _ = win32.CloseHandle(entry.value_ptr.*.handle); @@ -391,9 +442,23 @@ const WindowsBackend = struct { _ = win32.CloseHandle(self.iocp); } - fn arm(self: *@This()) void { - if (self.poll_timer) |*t| t.deinit(); - self.poll_timer = tp.timeout.init_ms(poll_interval_ms, tp.message.fmt(.{"FW_poll"})) catch null; + fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + errdefer parent.deinit(); + if (self.thread != null) return error.AlreadyArmed; + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent }); + } + + fn thread_fn(iocp: windows.HANDLE, parent: tp.pid) void { + defer parent.deinit(); + var bytes: windows.DWORD = 0; + var key: windows.ULONG_PTR = 0; + var overlapped_ptr: ?*windows.OVERLAPPED = null; + while (true) { + // Block indefinitely until IOCP has a completion or shutdown signal. + const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE); + if (ok == 0 or key == SHUTDOWN_KEY) return; + parent.send(.{"FW_event"}) catch return; + } } fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void { @@ -441,8 +506,9 @@ const WindowsBackend = struct { var key: windows.ULONG_PTR = 0; var overlapped_ptr: ?*windows.OVERLAPPED = null; while (true) { + // Non-blocking drain, the blocking wait is done in the background thread. const ok = win32.GetQueuedCompletionStatus(self.iocp, &bytes, &key, &overlapped_ptr, 0); - if (ok == 0 or overlapped_ptr == null) break; + if (ok == 0 or overlapped_ptr == null or key == SHUTDOWN_KEY) break; const triggered_handle: windows.HANDLE = @ptrFromInt(key); var it = self.watches.iterator(); while (it.next()) |entry| { @@ -516,7 +582,7 @@ const Process = struct { errdefer self.deinit(); _ = tp.set_trap(true); self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace()); - self.backend.arm(); + self.backend.arm(tp.self_pid().clone()) catch |e| return tp.exit_error(e, @errorReturnTrace()); tp.receive(&self.receiver); } @@ -538,15 +604,12 @@ const Process = struct { var err_code: i64 = 0; var err_msg: []const u8 = undefined; - if (!Backend.polling and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) { + if (!Backend.threaded and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) { self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e); - self.backend.arm(); - } else if (!Backend.polling and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) { + } else if (!Backend.threaded and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) { self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg }); - self.backend.arm(); - } else if (Backend.polling and try cbor.match(m.buf, .{"FW_poll"})) { + } else if (Backend.threaded and try cbor.match(m.buf, .{"FW_event"})) { self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e); - self.backend.arm(); } else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) { self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e); } else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) {