From da6a759583fcc369e02a52330533d0460aab0ec6 Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Fri, 20 Feb 2026 19:18:38 +0100 Subject: [PATCH] refactor: direct send from kqueue watcher background thread --- src/file_watcher.zig | 49 ++++++++++++++++++-------------------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/src/file_watcher.zig b/src/file_watcher.zig index f0141f8..e159ae1 100644 --- a/src/file_watcher.zig +++ b/src/file_watcher.zig @@ -419,7 +419,7 @@ const KQueueBackend = struct { thread: ?std.Thread, watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd - const threaded = true; + const threaded = false; // events processed directly in thread_fn; no FW_event needed const EVFILT_VNODE: i16 = -4; const EVFILT_READ: i16 = -1; @@ -473,22 +473,33 @@ const KQueueBackend = struct { fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { errdefer parent.deinit(); if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, parent }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, parent }); } - fn thread_fn(kq: std.posix.fd_t, parent: tp.pid) void { + fn thread_fn(kq: std.posix.fd_t, watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), parent: tp.pid) void { defer parent.deinit(); var events: [64]std.posix.Kevent = undefined; while (true) { // Block indefinitely until kqueue has events. const n = std.posix.kevent(kq, &.{}, &events, null) catch break; - var has_vnode_events = false; for (events[0..n]) |ev| { if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit - if (ev.filter == EVFILT_VNODE) has_vnode_events = true; + if (ev.filter != EVFILT_VNODE) continue; + var it = watches.iterator(); + while (it.next()) |entry| { + if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; + const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0) + .deleted + else if (ev.fflags & NOTE_RENAME != 0) + .renamed + else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0) + .modified + else + break; + parent.send(.{ "FW", "change", entry.key_ptr.*, event_type }) catch return; + break; + } } - if (has_vnode_events) - parent.send(.{"FW_event"}) catch break; } } @@ -517,28 +528,8 @@ const KQueueBackend = struct { } } - fn drain(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) tp.result { - _ = allocator; - var events: [64]std.posix.Kevent = undefined; - const immediate: std.posix.timespec = .{ .sec = 0, .nsec = 0 }; - const n = std.posix.kevent(self.kq, &.{}, &events, &immediate) catch return; - for (events[0..n]) |ev| { - if (ev.filter != EVFILT_VNODE) continue; - var it = self.watches.iterator(); - while (it.next()) |entry| { - if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; - const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0) - .deleted - else if (ev.fflags & NOTE_RENAME != 0) - .renamed - else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0) - .modified - else - continue; - try parent.send(.{ "FW", "change", entry.key_ptr.*, event_type }); - break; - } - } + fn drain(_: *@This(), _: std.mem.Allocator, _: tp.pid_ref) tp.result { + // Events are sent directly from thread_fn; nothing to do here. } };