fix: add a watches mutex to kqueue

This commit is contained in:
CJ van den Berg 2026-02-26 14:55:38 +01:00
parent 9679b0cedf
commit 8dc759db61
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9

View file

@ -440,6 +440,7 @@ const KQueueBackend = struct {
shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
thread: ?std.Thread, thread: ?std.Thread,
watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd
watches_mutex: std.Thread.Mutex,
// Per-directory snapshots of filenames, used to diff on NOTE_WRITE. // Per-directory snapshots of filenames, used to diff on NOTE_WRITE.
// Key: owned dir path (same as watches key), value: set of owned filenames. // Key: owned dir path (same as watches key), value: set of owned filenames.
// Accessed from both the main thread (add_watch) and the background thread (scan_dir). // Accessed from both the main thread (add_watch) and the background thread (scan_dir).
@ -477,7 +478,16 @@ const KQueueBackend = struct {
.udata = 0, .udata = 0,
}; };
_ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null);
return .{ .handler = handler, .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty, .snapshots = .empty, .snapshots_mutex = .{} }; return .{
.handler = handler,
.kq = kq,
.shutdown_pipe = pipe,
.thread = null,
.watches = .empty,
.watches_mutex = .{},
.snapshots = .empty,
.snapshots_mutex = .{},
};
} }
fn deinit(self: *@This(), allocator: std.mem.Allocator) void { fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
@ -506,12 +516,21 @@ const KQueueBackend = struct {
fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
if (self.thread != null) return error.AlreadyArmed; if (self.thread != null) return error.AlreadyArmed;
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, self.handler }); self.thread = try std.Thread.spawn(.{}, thread_fn, .{
self.kq,
&self.watches,
&self.watches_mutex,
&self.snapshots,
&self.snapshots_mutex,
allocator,
self.handler,
});
} }
fn thread_fn( fn thread_fn(
kq: std.posix.fd_t, kq: std.posix.fd_t,
watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), watches: *const std.StringHashMapUnmanaged(std.posix.fd_t),
watches_mutex: *std.Thread.Mutex,
snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
snapshots_mutex: *std.Thread.Mutex, snapshots_mutex: *std.Thread.Mutex,
allocator: std.mem.Allocator, allocator: std.mem.Allocator,
@ -525,18 +544,20 @@ const KQueueBackend = struct {
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
if (ev.filter != EVFILT_VNODE) continue; if (ev.filter != EVFILT_VNODE) continue;
// Find the directory path for this fd. // Find the directory path for this fd.
watches_mutex.lock();
var wit = watches.iterator(); var wit = watches.iterator();
while (wit.next()) |entry| { const dir_path: ?[]const u8 = while (wit.next()) |entry| {
if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; if (entry.value_ptr.* == @as(std.posix.fd_t, @intCast(ev.ident)))
const dir_path = entry.key_ptr.*; break entry.key_ptr.*;
if (ev.fflags & NOTE_DELETE != 0) { } else null;
handler.change(dir_path, EventType.deleted) catch return; watches_mutex.unlock();
} else if (ev.fflags & NOTE_RENAME != 0) { if (dir_path == null) continue;
handler.change(dir_path, EventType.renamed) catch return; if (ev.fflags & NOTE_DELETE != 0) {
} else if (ev.fflags & NOTE_WRITE != 0) { handler.change(dir_path.?, EventType.deleted) catch return;
scan_dir(dir_path, snapshots, snapshots_mutex, allocator, handler) catch {}; } else if (ev.fflags & NOTE_RENAME != 0) {
} handler.change(dir_path.?, EventType.renamed) catch return;
break; } else if (ev.fflags & NOTE_WRITE != 0) {
scan_dir(dir_path.?, snapshots, snapshots_mutex, allocator, handler) catch {};
} }
} }
} }
@ -617,7 +638,10 @@ const KQueueBackend = struct {
} }
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void { fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void {
if (self.watches.contains(path)) return; self.watches_mutex.lock();
const already = self.watches.contains(path);
self.watches_mutex.unlock();
if (already) return;
const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) {
error.AccessDenied, error.AccessDenied,
error.PermissionDenied, error.PermissionDenied,
@ -669,8 +693,13 @@ const KQueueBackend = struct {
}, },
}; };
const owned_path = try allocator.dupe(u8, path); const owned_path = try allocator.dupe(u8, path);
errdefer allocator.free(owned_path); self.watches_mutex.lock();
try self.watches.put(allocator, owned_path, path_fd); self.watches.put(allocator, owned_path, path_fd) catch |e| {
self.watches_mutex.unlock();
allocator.free(owned_path);
return e;
};
self.watches_mutex.unlock();
// Take initial snapshot so first NOTE_WRITE has a baseline to diff against. // Take initial snapshot so first NOTE_WRITE has a baseline to diff against.
self.take_snapshot(allocator, owned_path) catch |e| switch (e) { self.take_snapshot(allocator, owned_path) catch |e| switch (e) {
error.AccessDenied, error.AccessDenied,
@ -704,7 +733,10 @@ const KQueueBackend = struct {
} }
fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
if (self.watches.fetchRemove(path)) |entry| { self.watches_mutex.lock();
const watches_entry = self.watches.fetchRemove(path);
self.watches_mutex.unlock();
if (watches_entry) |entry| {
std.posix.close(entry.value); std.posix.close(entry.value);
allocator.free(entry.key); allocator.free(entry.key);
} }