diff --git a/src/nightwatch.zig b/src/nightwatch.zig index 1c64b6e..34aa096 100644 --- a/src/nightwatch.zig +++ b/src/nightwatch.zig @@ -440,6 +440,7 @@ const KQueueBackend = struct { shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread thread: ?std.Thread, watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd + watches_mutex: std.Thread.Mutex, // Per-directory snapshots of filenames, used to diff on NOTE_WRITE. // Key: owned dir path (same as watches key), value: set of owned filenames. // Accessed from both the main thread (add_watch) and the background thread (scan_dir). @@ -477,7 +478,16 @@ const KQueueBackend = struct { .udata = 0, }; _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); - return .{ .handler = handler, .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty, .snapshots = .empty, .snapshots_mutex = .{} }; + return .{ + .handler = handler, + .kq = kq, + .shutdown_pipe = pipe, + .thread = null, + .watches = .empty, + .watches_mutex = .{}, + .snapshots = .empty, + .snapshots_mutex = .{}, + }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { @@ -506,12 +516,21 @@ const KQueueBackend = struct { fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, self.handler }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ + self.kq, + &self.watches, + &self.watches_mutex, + &self.snapshots, + &self.snapshots_mutex, + allocator, + self.handler, + }); } fn thread_fn( kq: std.posix.fd_t, watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), + watches_mutex: *std.Thread.Mutex, snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), snapshots_mutex: *std.Thread.Mutex, allocator: std.mem.Allocator, @@ -525,18 +544,20 @@ const KQueueBackend = struct { if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter != EVFILT_VNODE) continue; // Find the directory path for this fd. + watches_mutex.lock(); var wit = watches.iterator(); - while (wit.next()) |entry| { - if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; - const dir_path = entry.key_ptr.*; - if (ev.fflags & NOTE_DELETE != 0) { - handler.change(dir_path, EventType.deleted) catch return; - } else if (ev.fflags & NOTE_RENAME != 0) { - handler.change(dir_path, EventType.renamed) catch return; - } else if (ev.fflags & NOTE_WRITE != 0) { - scan_dir(dir_path, snapshots, snapshots_mutex, allocator, handler) catch {}; - } - break; + const dir_path: ?[]const u8 = while (wit.next()) |entry| { + if (entry.value_ptr.* == @as(std.posix.fd_t, @intCast(ev.ident))) + break entry.key_ptr.*; + } else null; + watches_mutex.unlock(); + if (dir_path == null) continue; + if (ev.fflags & NOTE_DELETE != 0) { + handler.change(dir_path.?, EventType.deleted) catch return; + } else if (ev.fflags & NOTE_RENAME != 0) { + handler.change(dir_path.?, EventType.renamed) catch return; + } else if (ev.fflags & NOTE_WRITE != 0) { + scan_dir(dir_path.?, snapshots, snapshots_mutex, allocator, handler) catch {}; } } } @@ -617,7 +638,10 @@ const KQueueBackend = struct { } fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void { - if (self.watches.contains(path)) return; + self.watches_mutex.lock(); + const already = self.watches.contains(path); + self.watches_mutex.unlock(); + if (already) return; const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { error.AccessDenied, error.PermissionDenied, @@ -669,8 +693,13 @@ const KQueueBackend = struct { }, }; const owned_path = try allocator.dupe(u8, path); - errdefer allocator.free(owned_path); - try self.watches.put(allocator, owned_path, path_fd); + self.watches_mutex.lock(); + self.watches.put(allocator, owned_path, path_fd) catch |e| { + self.watches_mutex.unlock(); + allocator.free(owned_path); + return e; + }; + self.watches_mutex.unlock(); // Take initial snapshot so first NOTE_WRITE has a baseline to diff against. self.take_snapshot(allocator, owned_path) catch |e| switch (e) { error.AccessDenied, @@ -704,7 +733,10 @@ const KQueueBackend = struct { } fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { - if (self.watches.fetchRemove(path)) |entry| { + self.watches_mutex.lock(); + const watches_entry = self.watches.fetchRemove(path); + self.watches_mutex.unlock(); + if (watches_entry) |entry| { std.posix.close(entry.value); allocator.free(entry.key); }