diff --git a/src/nightwatch.zig b/src/nightwatch.zig index 72ba9d4..cf753a9 100644 --- a/src/nightwatch.zig +++ b/src/nightwatch.zig @@ -513,8 +513,10 @@ const KQueueBackend = struct { kq: std.posix.fd_t, shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread thread: ?std.Thread, - watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd + watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned dir path -> fd watches_mutex: std.Thread.Mutex, + file_watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned file path -> fd + file_watches_mutex: std.Thread.Mutex, // Per-directory snapshots of filenames, used to diff on NOTE_WRITE. // Key: owned dir path (same as watches key), value: set of owned filenames. // Accessed from both the main thread (add_watch) and the background thread (scan_dir). @@ -527,11 +529,11 @@ const KQueueBackend = struct { const EV_ENABLE: u16 = 0x0004; const EV_CLEAR: u16 = 0x0020; const EV_DELETE: u16 = 0x0002; + const NOTE_DELETE: u32 = 0x00000001; const NOTE_WRITE: u32 = 0x00000002; - const NOTE_DELETE: u32 = 0x00000004; - const NOTE_RENAME: u32 = 0x00000020; - const NOTE_ATTRIB: u32 = 0x00000008; const NOTE_EXTEND: u32 = 0x00000004; + const NOTE_ATTRIB: u32 = 0x00000008; + const NOTE_RENAME: u32 = 0x00000020; fn init(handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { const kq = try std.posix.kqueue(); @@ -559,6 +561,8 @@ const KQueueBackend = struct { .thread = null, .watches = .empty, .watches_mutex = .{}, + .file_watches = .empty, + .file_watches_mutex = .{}, .snapshots = .empty, .snapshots_mutex = .{}, }; @@ -576,6 +580,12 @@ const KQueueBackend = struct { allocator.free(entry.key_ptr.*); } self.watches.deinit(allocator); + var fit = self.file_watches.iterator(); + while (fit.next()) |entry| { + std.posix.close(entry.value_ptr.*); + allocator.free(entry.key_ptr.*); + } + self.file_watches.deinit(allocator); var sit = self.snapshots.iterator(); while (sit.next()) |entry| { // Keys are borrowed from self.watches and freed in the watches loop above. @@ -590,61 +600,53 @@ const KQueueBackend = struct { fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ - self.kq, - &self.watches, - &self.watches_mutex, - &self.snapshots, - &self.snapshots_mutex, - allocator, - self.handler, - }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, allocator }); } - fn thread_fn( - kq: std.posix.fd_t, - watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), - watches_mutex: *std.Thread.Mutex, - snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), - snapshots_mutex: *std.Thread.Mutex, - allocator: std.mem.Allocator, - handler: *Handler, - ) void { + fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void { var events: [64]std.posix.Kevent = undefined; while (true) { // Block indefinitely until kqueue has events. - const n = std.posix.kevent(kq, &.{}, &events, null) catch break; + const n = std.posix.kevent(self.kq, &.{}, &events, null) catch break; for (events[0..n]) |ev| { if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter != EVFILT_VNODE) continue; - // Find the directory path for this fd. - watches_mutex.lock(); - var wit = watches.iterator(); - const dir_path: ?[]const u8 = while (wit.next()) |entry| { - if (entry.value_ptr.* == @as(std.posix.fd_t, @intCast(ev.ident))) - break entry.key_ptr.*; + const fd: std.posix.fd_t = @intCast(ev.ident); + + // Check if this is a file watch: NOTE_WRITE/NOTE_EXTEND → modified. + self.file_watches_mutex.lock(); + var fwit = self.file_watches.iterator(); + const file_path: ?[]const u8 = while (fwit.next()) |entry| { + if (entry.value_ptr.* == fd) break entry.key_ptr.*; } else null; - watches_mutex.unlock(); + self.file_watches_mutex.unlock(); + if (file_path) |fp| { + if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND) != 0) + self.handler.change(fp, EventType.modified) catch return; + continue; + } + + // Otherwise look up the directory path for this fd. + self.watches_mutex.lock(); + var wit = self.watches.iterator(); + const dir_path: ?[]const u8 = while (wit.next()) |entry| { + if (entry.value_ptr.* == fd) break entry.key_ptr.*; + } else null; + self.watches_mutex.unlock(); if (dir_path == null) continue; if (ev.fflags & NOTE_DELETE != 0) { - handler.change(dir_path.?, EventType.deleted) catch return; + self.handler.change(dir_path.?, EventType.deleted) catch return; } else if (ev.fflags & NOTE_RENAME != 0) { - handler.change(dir_path.?, EventType.renamed) catch return; + self.handler.change(dir_path.?, EventType.renamed) catch return; } else if (ev.fflags & NOTE_WRITE != 0) { - scan_dir(dir_path.?, snapshots, snapshots_mutex, allocator, handler) catch {}; + self.scan_dir(allocator, dir_path.?) catch {}; } } } } // Scan a directory and diff against the snapshot, emitting created/deleted events. - fn scan_dir( - dir_path: []const u8, - snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), - snapshots_mutex: *std.Thread.Mutex, - allocator: std.mem.Allocator, - handler: *Handler, - ) !void { + fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; defer dir.close(); @@ -683,17 +685,17 @@ const KQueueBackend = struct { var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty; defer new_dirs.deinit(allocator); - snapshots_mutex.lock(); + self.snapshots_mutex.lock(); { for (current_dirs.items) |name| { var path_buf: [std.fs.max_path_bytes]u8 = undefined; const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - if (!snapshots.contains(full_path)) + if (!self.snapshots.contains(full_path)) try new_dirs.append(allocator, full_path); } - const gop = snapshots.getOrPut(allocator, dir_path) catch |e| { - snapshots_mutex.unlock(); + const gop = self.snapshots.getOrPut(allocator, dir_path) catch |e| { + self.snapshots_mutex.unlock(); return e; }; if (!gop.found_existing) gop.value_ptr.* = .empty; @@ -703,12 +705,12 @@ const KQueueBackend = struct { while (cit.next()) |entry| { if (snapshot.contains(entry.key_ptr.*)) continue; const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| { - snapshots_mutex.unlock(); + self.snapshots_mutex.unlock(); return e; }; snapshot.put(allocator, owned, {}) catch |e| { allocator.free(owned); - snapshots_mutex.unlock(); + self.snapshots_mutex.unlock(); return e; }; try to_create.append(allocator, owned); @@ -721,21 +723,70 @@ const KQueueBackend = struct { } for (to_delete.items) |name| _ = snapshot.fetchRemove(name); } - snapshots_mutex.unlock(); + self.snapshots_mutex.unlock(); // Emit all events outside the lock so handlers may safely call watch()/unwatch(). + // Emit dir_created, then deletions, then creations. Deletions first ensures that + // a rename (old disappears, new appears) reports the source path before the dest. for (new_dirs.items) |full_path| - try handler.change(full_path, EventType.dir_created); + try self.handler.change(full_path, EventType.dir_created); + for (to_delete.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch { + allocator.free(name); + continue; + }; + self.deregister_file_watch(allocator, full_path); + try self.handler.change(full_path, EventType.deleted); + allocator.free(name); + } for (to_create.items) |name| { var path_buf: [std.fs.max_path_bytes]u8 = undefined; const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - try handler.change(full_path, EventType.created); + self.register_file_watch(allocator, full_path); + try self.handler.change(full_path, EventType.created); } - for (to_delete.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - try handler.change(full_path, EventType.deleted); - allocator.free(name); + } + + fn register_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.file_watches_mutex.lock(); + const already = self.file_watches.contains(path); + self.file_watches_mutex.unlock(); + if (already) return; + const fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch return; + const kev = std.posix.Kevent{ + .ident = @intCast(fd), + .filter = EVFILT_VNODE, + .flags = EV_ADD | EV_ENABLE | EV_CLEAR, + .fflags = NOTE_WRITE | NOTE_EXTEND, + .data = 0, + .udata = 0, + }; + _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch { + std.posix.close(fd); + return; + }; + const owned = allocator.dupe(u8, path) catch { + std.posix.close(fd); + return; + }; + self.file_watches_mutex.lock(); + self.file_watches.put(allocator, owned, fd) catch { + self.file_watches_mutex.unlock(); + std.posix.close(fd); + allocator.free(owned); + return; + }; + self.file_watches_mutex.unlock(); + } + + fn deregister_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.file_watches_mutex.lock(); + const kv = self.file_watches.fetchRemove(path); + self.file_watches_mutex.unlock(); + if (kv) |entry| { + std.posix.close(entry.value); + allocator.free(entry.key); } } @@ -820,18 +871,33 @@ const KQueueBackend = struct { fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; defer dir.close(); - self.snapshots_mutex.lock(); - defer self.snapshots_mutex.unlock(); - const gop = try self.snapshots.getOrPut(allocator, dir_path); - if (!gop.found_existing) gop.value_ptr.* = .empty; - var snapshot = gop.value_ptr; + // Collect file names first so we can register file watches without holding the lock. + var names: std.ArrayListUnmanaged([]u8) = .empty; + defer { + for (names.items) |n| allocator.free(n); + names.deinit(allocator); + } var iter = dir.iterate(); while (try iter.next()) |entry| { if (entry.kind != .file) continue; - if (snapshot.contains(entry.name)) continue; - const owned = try allocator.dupe(u8, entry.name); + try names.append(allocator, try allocator.dupe(u8, entry.name)); + } + self.snapshots_mutex.lock(); + const gop = try self.snapshots.getOrPut(allocator, dir_path); + if (!gop.found_existing) gop.value_ptr.* = .empty; + var snapshot = gop.value_ptr; + for (names.items) |name| { + if (snapshot.contains(name)) continue; + const owned = try allocator.dupe(u8, name); try snapshot.put(allocator, owned, {}); } + self.snapshots_mutex.unlock(); + // Register a kqueue watch for each existing file so writes are detected. + for (names.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + self.register_file_watch(allocator, full_path); + } } fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { @@ -845,7 +911,15 @@ const KQueueBackend = struct { if (self.snapshots.fetchRemove(path)) |entry| { var names = entry.value; var it = names.iterator(); - while (it.next()) |ne| allocator.free(ne.key_ptr.*); + while (it.next()) |ne| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ path, ne.key_ptr.* }) catch { + allocator.free(ne.key_ptr.*); + continue; + }; + self.deregister_file_watch(allocator, full_path); + allocator.free(ne.key_ptr.*); + } names.deinit(allocator); } }