From b6d3780283bae5954d2b08eb2c7a8b9b445f59bc Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Fri, 20 Feb 2026 20:06:12 +0100 Subject: [PATCH] refactor: add directory snapshotting and diffing to kqueue backend --- src/nightwatch.zig | 144 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 125 insertions(+), 19 deletions(-) diff --git a/src/nightwatch.zig b/src/nightwatch.zig index 1f66d2b..373d53f 100644 --- a/src/nightwatch.zig +++ b/src/nightwatch.zig @@ -101,7 +101,7 @@ const INotifyBackend = struct { std.posix.close(self.inotify_fd); } - fn arm(self: *@This(), parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void { + fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void { parent.deinit(); try self.fd_watcher.wait_read(); } @@ -414,6 +414,11 @@ const KQueueBackend = struct { shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread thread: ?std.Thread, watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd + // Per-directory snapshots of filenames, used to diff on NOTE_WRITE. + // Key: owned dir path (same as watches key), value: set of owned filenames. + // Accessed from both the main thread (add_watch) and the background thread (scan_dir). + snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), + snapshots_mutex: std.Thread.Mutex, const EVFILT_VNODE: i16 = -4; const EVFILT_READ: i16 = -1; @@ -446,7 +451,7 @@ const KQueueBackend = struct { .udata = 0, }; _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); - return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty }; + return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty, .snapshots = .empty, .snapshots_mutex = .{} }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { @@ -461,16 +466,31 @@ const KQueueBackend = struct { allocator.free(entry.key_ptr.*); } self.watches.deinit(allocator); + var sit = self.snapshots.iterator(); + while (sit.next()) |entry| { + var names = entry.value_ptr.*; + var nit = names.iterator(); + while (nit.next()) |ne| allocator.free(ne.key_ptr.*); + names.deinit(allocator); + } + self.snapshots.deinit(allocator); std.posix.close(self.kq); } - fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { errdefer parent.deinit(); if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, parent }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, parent }); } - fn thread_fn(kq: std.posix.fd_t, watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), parent: tp.pid) void { + fn thread_fn( + kq: std.posix.fd_t, + watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), + snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), + snapshots_mutex: *std.Thread.Mutex, + allocator: std.mem.Allocator, + parent: tp.pid, + ) void { defer parent.deinit(); var events: [64]std.posix.Kevent = undefined; while (true) { @@ -479,25 +499,86 @@ const KQueueBackend = struct { for (events[0..n]) |ev| { if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter != EVFILT_VNODE) continue; - var it = watches.iterator(); - while (it.next()) |entry| { + // Find the directory path for this fd. + var wit = watches.iterator(); + while (wit.next()) |entry| { if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; - const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0) - .deleted - else if (ev.fflags & NOTE_RENAME != 0) - .renamed - else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0) - .modified - else - break; - parent.send(.{ "FW", "change", entry.key_ptr.*, event_type }) catch return; + const dir_path = entry.key_ptr.*; + if (ev.fflags & NOTE_DELETE != 0) { + parent.send(.{ "FW", "change", dir_path, EventType.deleted }) catch return; + } else if (ev.fflags & NOTE_RENAME != 0) { + parent.send(.{ "FW", "change", dir_path, EventType.renamed }) catch return; + } else if (ev.fflags & NOTE_WRITE != 0) { + scan_dir(dir_path, snapshots, snapshots_mutex, allocator, parent) catch {}; + } break; } } } } - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) (std.posix.OpenError || std.posix.KEventError || error{OutOfMemory})!void { + // Scan a directory and diff against the snapshot, emitting created/deleted events. + fn scan_dir( + dir_path: []const u8, + snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), + snapshots_mutex: *std.Thread.Mutex, + allocator: std.mem.Allocator, + parent: tp.pid, + ) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + + // Collect current filenames (no lock needed, reading filesystem only). + var current: std.StringHashMapUnmanaged(void) = .empty; + defer { + var it = current.iterator(); + while (it.next()) |e| allocator.free(e.key_ptr.*); + current.deinit(allocator); + } + var iter = dir.iterate(); + while (try iter.next()) |entry| { + if (entry.kind != .file) continue; + const name = try allocator.dupe(u8, entry.name); + try current.put(allocator, name, {}); + } + + snapshots_mutex.lock(); + defer snapshots_mutex.unlock(); + + // Get or create the snapshot for this directory. + const gop = try snapshots.getOrPut(allocator, dir_path); + if (!gop.found_existing) gop.value_ptr.* = .empty; + const snapshot = gop.value_ptr; + + // Emit created events for files in current but not in snapshot. + var cit = current.iterator(); + while (cit.next()) |entry| { + if (snapshot.contains(entry.key_ptr.*)) continue; + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.key_ptr.* }) catch continue; + try parent.send(.{ "FW", "change", full_path, EventType.created }); + const owned = try allocator.dupe(u8, entry.key_ptr.*); + try snapshot.put(allocator, owned, {}); + } + + // Emit deleted events for files in snapshot but not in current. + var to_delete: std.ArrayListUnmanaged([]const u8) = .empty; + defer to_delete.deinit(allocator); + var sit = snapshot.iterator(); + while (sit.next()) |entry| { + if (current.contains(entry.key_ptr.*)) continue; + try to_delete.append(allocator, entry.key_ptr.*); + } + for (to_delete.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + try parent.send(.{ "FW", "change", full_path, EventType.deleted }); + _ = snapshot.fetchRemove(name); + allocator.free(name); + } + } + + fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void { if (self.watches.contains(path)) return; const path_fd = try std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0); errdefer std.posix.close(path_fd); @@ -513,6 +594,25 @@ const KQueueBackend = struct { const owned_path = try allocator.dupe(u8, path); errdefer allocator.free(owned_path); try self.watches.put(allocator, owned_path, path_fd); + // Take initial snapshot so first NOTE_WRITE has a baseline to diff against. + try self.take_snapshot(allocator, owned_path); + } + + fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + self.snapshots_mutex.lock(); + defer self.snapshots_mutex.unlock(); + const gop = try self.snapshots.getOrPut(allocator, dir_path); + if (!gop.found_existing) gop.value_ptr.* = .empty; + var snapshot = gop.value_ptr; + var iter = dir.iterate(); + while (try iter.next()) |entry| { + if (entry.kind != .file) continue; + if (snapshot.contains(entry.name)) continue; + const owned = try allocator.dupe(u8, entry.name); + try snapshot.put(allocator, owned, {}); + } } fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { @@ -520,6 +620,12 @@ const KQueueBackend = struct { std.posix.close(entry.value); allocator.free(entry.key); } + if (self.snapshots.fetchRemove(path)) |entry| { + var names = entry.value; + var it = names.iterator(); + while (it.next()) |ne| allocator.free(ne.key_ptr.*); + names.deinit(allocator); + } } }; @@ -618,7 +724,7 @@ const WindowsBackend = struct { _ = win32.CloseHandle(self.iocp); } - fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { errdefer parent.deinit(); if (self.thread != null) return error.AlreadyArmed; self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent }); @@ -763,7 +869,7 @@ const Process = struct { errdefer self.deinit(); _ = tp.set_trap(true); self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace()); - self.backend.arm(self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace()); + self.backend.arm(self.allocator, self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace()); tp.receive(&self.receiver); }