refactor: direct send from kqueue watcher background thread

This commit is contained in:
CJ van den Berg 2026-02-20 19:18:38 +01:00
parent 2e38bbf4ee
commit da6a759583
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9

View file

@ -419,7 +419,7 @@ const KQueueBackend = struct {
thread: ?std.Thread, thread: ?std.Thread,
watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd
const threaded = true; const threaded = false; // events processed directly in thread_fn; no FW_event needed
const EVFILT_VNODE: i16 = -4; const EVFILT_VNODE: i16 = -4;
const EVFILT_READ: i16 = -1; const EVFILT_READ: i16 = -1;
@ -473,22 +473,33 @@ const KQueueBackend = struct {
fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
errdefer parent.deinit(); errdefer parent.deinit();
if (self.thread != null) return error.AlreadyArmed; if (self.thread != null) return error.AlreadyArmed;
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, parent }); self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, parent });
} }
fn thread_fn(kq: std.posix.fd_t, parent: tp.pid) void { fn thread_fn(kq: std.posix.fd_t, watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), parent: tp.pid) void {
defer parent.deinit(); defer parent.deinit();
var events: [64]std.posix.Kevent = undefined; var events: [64]std.posix.Kevent = undefined;
while (true) { while (true) {
// Block indefinitely until kqueue has events. // Block indefinitely until kqueue has events.
const n = std.posix.kevent(kq, &.{}, &events, null) catch break; const n = std.posix.kevent(kq, &.{}, &events, null) catch break;
var has_vnode_events = false;
for (events[0..n]) |ev| { for (events[0..n]) |ev| {
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
if (ev.filter == EVFILT_VNODE) has_vnode_events = true; if (ev.filter != EVFILT_VNODE) continue;
var it = watches.iterator();
while (it.next()) |entry| {
if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue;
const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0)
.deleted
else if (ev.fflags & NOTE_RENAME != 0)
.renamed
else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0)
.modified
else
break;
parent.send(.{ "FW", "change", entry.key_ptr.*, event_type }) catch return;
break;
}
} }
if (has_vnode_events)
parent.send(.{"FW_event"}) catch break;
} }
} }
@ -517,28 +528,8 @@ const KQueueBackend = struct {
} }
} }
fn drain(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) tp.result { fn drain(_: *@This(), _: std.mem.Allocator, _: tp.pid_ref) tp.result {
_ = allocator; // Events are sent directly from thread_fn; nothing to do here.
var events: [64]std.posix.Kevent = undefined;
const immediate: std.posix.timespec = .{ .sec = 0, .nsec = 0 };
const n = std.posix.kevent(self.kq, &.{}, &events, &immediate) catch return;
for (events[0..n]) |ev| {
if (ev.filter != EVFILT_VNODE) continue;
var it = self.watches.iterator();
while (it.next()) |entry| {
if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue;
const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0)
.deleted
else if (ev.fflags & NOTE_RENAME != 0)
.renamed
else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0)
.modified
else
continue;
try parent.send(.{ "FW", "change", entry.key_ptr.*, event_type });
break;
}
}
} }
}; };