diff --git a/README.md b/README.md index ade2a53..03c6ffc 100644 --- a/README.md +++ b/README.md @@ -42,12 +42,37 @@ It simply keeps watch. ### Platform backends -| Platform | Backend | Notes | -| --------------------------------------- | ------------------------------------------------------ | ------------------------------------------------------------------ | -| Linux | inotify | Threaded mode (default) or poll mode (`-Dlinux_read_thread=false`) | -| macOS | kqueue (default) or FSEvents (`-Dmacos_fsevents=true`) | FSEvents requires Xcode frameworks | -| FreeBSD, OpenBSD, NetBSD, DragonFly BSD | kqueue | | -| Windows | ReadDirectoryChangesW | | +| Platform | Backend | Notes | +| ------------------------------------------------ | ------------------------------------------------------ | ------------------------------------------------------------------ | +| Linux | inotify | Threaded mode (default) or poll mode (`-Dlinux_read_thread=false`) | +| macOS | kqueue (default) or FSEvents (`-Dmacos_fsevents=true`) | FSEvents requires Xcode frameworks | +| macOS (opt-in) | kqueue dir-only (`-Dkqueue_dir_only=true`) | Low fd usage; see note below | +| FreeBSD, OpenBSD, NetBSD, DragonFly BSD | kqueue | | +| FreeBSD, OpenBSD, NetBSD, DragonFly BSD (opt-in) | kqueue dir-only (`-Dkqueue_dir_only=true`) | Low fd usage; see note below | +| Windows | ReadDirectoryChangesW | | + +#### `kqueue_dir_only` mode + +By default the kqueue backend opens one file descriptor per watched _file_ +in order to detect `modified` events in real time via `EVFILT_VNODE`. At +scale (e.g. 500k files) this exhausts the process fd limit. + +Build with `-Dkqueue_dir_only=true` to use directory-only kqueue watches +instead. This drops fd usage from O(files) to O(directories) and removes +the `setrlimit` call. The trade-off: + +- **`modified` events are not generated reliably.** The backend detects + file modifications opportunistically by comparing mtimes during a + directory scan, which only runs when a directory entry changes (file + created, deleted, or renamed). A pure content write to an existing file + - with no sibling changes - will not trigger a scan and the + modification will be missed until the next scan. + +- **Workaround:** Watch individual files directly (e.g. + `watcher.watch("/path/to/file.txt")`). When a path passed to `watch()` is + a regular file, `kqueue_dir_only` mode attaches a per-file kqueue watch + and emits real-time `modified` events exactly like the default backend. + Only _directory tree_ watches are affected by the limitation above. --- diff --git a/build.zig b/build.zig index 0edd98e..5c9fcd8 100644 --- a/build.zig +++ b/build.zig @@ -18,9 +18,16 @@ pub fn build(b: *std.Build) void { "Use a background thread on Linux (like macOS/Windows) instead of requiring the caller to drive the event loop via poll_fd/handle_read_ready", ) orelse true; + const kqueue_dir_only = b.option( + bool, + "kqueue_dir_only", + "Use directory-only kqueue watches (lower fd usage, no real-time file modification detection). Default: false", + ) orelse false; + const options = b.addOptions(); options.addOption(bool, "macos_fsevents", macos_fsevents); options.addOption(bool, "linux_read_thread", linux_read_thread); + options.addOption(bool, "kqueue_dir_only", kqueue_dir_only); const options_mod = options.createModule(); const mod = b.addModule("nightwatch", .{ diff --git a/src/nightwatch.zig b/src/nightwatch.zig index 23e241a..27ea5ad 100644 --- a/src/nightwatch.zig +++ b/src/nightwatch.zig @@ -30,6 +30,17 @@ pub const Error = error{ /// and on Linux when the `linux_read_thread` build option is set. pub const linux_poll_mode = builtin.os.tag == .linux and !build_options.linux_read_thread; +/// True if the current backend detects file content modifications in real time. +/// False only when kqueue_dir_only=true, where directory-level watches are used +/// and file writes do not trigger a directory NOTE_WRITE event. +pub const detects_file_modifications = switch (builtin.os.tag) { + .linux => true, + .macos => !build_options.kqueue_dir_only or build_options.macos_fsevents, + .freebsd, .openbsd, .netbsd, .dragonfly => !build_options.kqueue_dir_only, + .windows => true, + else => false, +}; + pub const Handler = struct { vtable: *const VTable, @@ -179,8 +190,8 @@ fn recurse_watch(backend: *Backend, allocator: std.mem.Allocator, dir_path: []co const Backend = switch (builtin.os.tag) { .linux => INotifyBackend, - .macos => if (build_options.macos_fsevents) FSEventsBackend else KQueueBackend, - .freebsd, .openbsd, .netbsd, .dragonfly => KQueueBackend, + .macos => if (build_options.macos_fsevents) FSEventsBackend else if (build_options.kqueue_dir_only) KQueueDirBackend else KQueueBackend, + .freebsd, .openbsd, .netbsd, .dragonfly => if (build_options.kqueue_dir_only) KQueueDirBackend else KQueueBackend, .windows => WindowsBackend, else => @compileError("file_watcher: unsupported OS"), }; @@ -1068,6 +1079,364 @@ const KQueueBackend = struct { } }; +const KQueueDirBackend = struct { + const watches_recursively = false; + const WatchEntry = struct { fd: std.posix.fd_t, is_file: bool }; + + handler: *Handler, + kq: std.posix.fd_t, + shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread + thread: ?std.Thread, + watches: std.StringHashMapUnmanaged(WatchEntry), // owned path -> {fd, is_file} + watches_mutex: std.Thread.Mutex, + // Per-directory snapshots: owned filename -> mtime_ns. + // Used to diff on NOTE_WRITE: detects creates, deletes, and (opportunistically) + // modifications when the same directory fires another event later. + // Key: owned dir path (same as watches key), value: map of owned filename -> mtime_ns. + snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(i128)), + snapshots_mutex: std.Thread.Mutex, + + const EVFILT_VNODE: i16 = -4; + const EVFILT_READ: i16 = -1; + const EV_ADD: u16 = 0x0001; + const EV_ENABLE: u16 = 0x0004; + const EV_CLEAR: u16 = 0x0020; + const EV_DELETE: u16 = 0x0002; + const NOTE_DELETE: u32 = 0x00000001; + const NOTE_WRITE: u32 = 0x00000002; + const NOTE_EXTEND: u32 = 0x00000004; + const NOTE_ATTRIB: u32 = 0x00000008; + const NOTE_RENAME: u32 = 0x00000020; + + fn init(handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { + const kq = try std.posix.kqueue(); + errdefer std.posix.close(kq); + const pipe = try std.posix.pipe(); + errdefer { + std.posix.close(pipe[0]); + std.posix.close(pipe[1]); + } + // Register the read end of the shutdown pipe with kqueue so the thread + // wakes up when we want to shut down. + const shutdown_kev = std.posix.Kevent{ + .ident = @intCast(pipe[0]), + .filter = EVFILT_READ, + .flags = EV_ADD | EV_ENABLE, + .fflags = 0, + .data = 0, + .udata = 0, + }; + _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); + return .{ + .handler = handler, + .kq = kq, + .shutdown_pipe = pipe, + .thread = null, + .watches = .empty, + .watches_mutex = .{}, + .snapshots = .empty, + .snapshots_mutex = .{}, + }; + } + + fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + // Signal the thread to exit by writing to the shutdown pipe. + _ = std.posix.write(self.shutdown_pipe[1], &[_]u8{0}) catch {}; + if (self.thread) |t| t.join(); + std.posix.close(self.shutdown_pipe[0]); + std.posix.close(self.shutdown_pipe[1]); + var it = self.watches.iterator(); + while (it.next()) |entry| { + std.posix.close(entry.value_ptr.*.fd); + allocator.free(entry.key_ptr.*); + } + self.watches.deinit(allocator); + var sit = self.snapshots.iterator(); + while (sit.next()) |entry| { + // Keys are borrowed from self.watches and freed in the watches loop above. + var snap = entry.value_ptr.*; + var nit = snap.iterator(); + while (nit.next()) |ne| allocator.free(ne.key_ptr.*); + snap.deinit(allocator); + } + self.snapshots.deinit(allocator); + std.posix.close(self.kq); + } + + fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + if (self.thread != null) return error.AlreadyArmed; + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, allocator }); + } + + fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void { + var events: [64]std.posix.Kevent = undefined; + while (true) { + // Block indefinitely until kqueue has events. + const n = std.posix.kevent(self.kq, &.{}, &events, null) catch break; + for (events[0..n]) |ev| { + if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit + if (ev.filter != EVFILT_VNODE) continue; + const fd: std.posix.fd_t = @intCast(ev.ident); + + self.watches_mutex.lock(); + var wit = self.watches.iterator(); + var watch_path: ?[]const u8 = null; + var is_file: bool = false; + while (wit.next()) |entry| { + if (entry.value_ptr.*.fd == fd) { + watch_path = entry.key_ptr.*; + is_file = entry.value_ptr.*.is_file; + break; + } + } + self.watches_mutex.unlock(); + if (watch_path == null) continue; + if (is_file) { + // Explicit file watch: emit events with .file type directly. + if (ev.fflags & NOTE_DELETE != 0) { + self.handler.change(watch_path.?, EventType.deleted, .file) catch return; + } else if (ev.fflags & NOTE_RENAME != 0) { + self.handler.change(watch_path.?, EventType.renamed, .file) catch return; + } else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND) != 0) { + self.handler.change(watch_path.?, EventType.modified, .file) catch return; + } + } else { + if (ev.fflags & NOTE_DELETE != 0) { + self.handler.change(watch_path.?, EventType.deleted, .dir) catch return; + } else if (ev.fflags & NOTE_RENAME != 0) { + self.handler.change(watch_path.?, EventType.renamed, .dir) catch return; + } else if (ev.fflags & NOTE_WRITE != 0) { + self.scan_dir(allocator, watch_path.?) catch {}; + } + } + } + } + } + + // Scan a directory and diff against the snapshot, emitting created/deleted/modified events. + // File modifications are detected opportunistically via mtime changes: if a file was + // written before a NOTE_WRITE fires for another reason (create/delete/rename of a sibling), + // the mtime diff will catch it. + fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + + // Arena for all temporaries — freed in one shot at the end. + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + const tmp = arena.allocator(); + + // Collect current files (name → mtime_ns) and subdirectories. + // No lock held while doing filesystem I/O. + var current_files: std.StringHashMapUnmanaged(i128) = .empty; + var current_dirs: std.ArrayListUnmanaged([]u8) = .empty; + var iter = dir.iterate(); + while (try iter.next()) |entry| { + switch (entry.kind) { + .file => { + const mtime = (dir.statFile(entry.name) catch continue).mtime; + const name = try tmp.dupe(u8, entry.name); + try current_files.put(tmp, name, mtime); + }, + .directory => { + const name = try tmp.dupe(u8, entry.name); + try current_dirs.append(tmp, name); + }, + else => {}, + } + } + + // Diff against snapshot under the lock; collect events to emit after releasing it. + // to_create / to_delete / to_modify borrow pointers from the snapshot (allocator), + // only list metadata uses tmp. + var to_create: std.ArrayListUnmanaged([]const u8) = .empty; + var to_delete: std.ArrayListUnmanaged([]const u8) = .empty; + var to_modify: std.ArrayListUnmanaged([]const u8) = .empty; + var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty; + + self.snapshots_mutex.lock(); + { + for (current_dirs.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + if (!self.snapshots.contains(full_path)) { + const owned = tmp.dupe(u8, full_path) catch continue; + new_dirs.append(tmp, owned) catch continue; + } + } + + const gop = self.snapshots.getOrPut(allocator, dir_path) catch |e| { + self.snapshots_mutex.unlock(); + return e; + }; + if (!gop.found_existing) gop.value_ptr.* = .empty; + const snapshot = gop.value_ptr; + + var cit = current_files.iterator(); + while (cit.next()) |entry| { + if (snapshot.getPtr(entry.key_ptr.*)) |stored_mtime| { + // File exists in both — check for modification via mtime change. + if (stored_mtime.* != entry.value_ptr.*) { + stored_mtime.* = entry.value_ptr.*; + try to_modify.append(tmp, entry.key_ptr.*); // borrow from current (tmp) + } + } else { + // New file — add to snapshot and to_create list. + const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| { + self.snapshots_mutex.unlock(); + return e; + }; + snapshot.put(allocator, owned, entry.value_ptr.*) catch |e| { + allocator.free(owned); + self.snapshots_mutex.unlock(); + return e; + }; + try to_create.append(tmp, owned); // borrow from snapshot + } + } + + var sit = snapshot.iterator(); + while (sit.next()) |entry| { + if (current_files.contains(entry.key_ptr.*)) continue; + try to_delete.append(tmp, entry.key_ptr.*); // borrow from snapshot + } + for (to_delete.items) |name| _ = snapshot.fetchRemove(name); + } + self.snapshots_mutex.unlock(); + + // Emit all events outside the lock so handlers may safely call watch()/unwatch(). + // Order: new dirs, deletions (source before dest for renames), creations, modifications. + for (new_dirs.items) |full_path| + try self.handler.change(full_path, EventType.created, .dir); + for (to_delete.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch { + allocator.free(name); + continue; + }; + try self.handler.change(full_path, EventType.deleted, .file); + allocator.free(name); // snapshot key, owned by allocator + } + for (to_create.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + try self.handler.change(full_path, EventType.created, .file); + } + for (to_modify.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + try self.handler.change(full_path, EventType.modified, .file); + } + // arena.deinit() frees current_files, current_dirs, new_dirs, and list metadata + } + + fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void { + self.watches_mutex.lock(); + const already = self.watches.contains(path); + self.watches_mutex.unlock(); + if (already) return; + const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { + error.AccessDenied, + error.PermissionDenied, + error.PathAlreadyExists, + error.SymLinkLoop, + error.NameTooLong, + error.FileNotFound, + error.SystemResources, + error.NoSpaceLeft, + error.NotDir, + error.InvalidUtf8, + error.InvalidWtf8, + error.BadPathName, + error.NoDevice, + error.NetworkNotFound, + error.Unexpected, + error.ProcessFdQuotaExceeded, + error.SystemFdQuotaExceeded, + error.ProcessNotFound, + error.FileTooBig, + error.IsDir, + error.DeviceBusy, + error.FileLocksNotSupported, + error.FileBusy, + error.WouldBlock, + => |e_| { + std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); + return error.WatchFailed; + }, + }; + errdefer std.posix.close(path_fd); + const kev = std.posix.Kevent{ + .ident = @intCast(path_fd), + .filter = EVFILT_VNODE, + .flags = EV_ADD | EV_ENABLE | EV_CLEAR, + .fflags = NOTE_WRITE | NOTE_DELETE | NOTE_RENAME | NOTE_ATTRIB | NOTE_EXTEND, + .data = 0, + .udata = 0, + }; + _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch |e| switch (e) { + error.AccessDenied, + error.SystemResources, + error.EventNotFound, + error.ProcessNotFound, + error.Overflow, + => |e_| { + std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); + return error.WatchFailed; + }, + }; + // Determine if the path is a regular file or a directory. + const stat = std.posix.fstat(path_fd) catch null; + const is_file = if (stat) |s| std.posix.S.ISREG(s.mode) else false; + const owned_path = try allocator.dupe(u8, path); + self.watches_mutex.lock(); + self.watches.put(allocator, owned_path, .{ .fd = path_fd, .is_file = is_file }) catch |e| { + self.watches_mutex.unlock(); + allocator.free(owned_path); + return e; + }; + self.watches_mutex.unlock(); + // For directory watches only: take initial snapshot so first NOTE_WRITE has a baseline. + if (!is_file) { + self.take_snapshot(allocator, owned_path) catch return error.OutOfMemory; + } + } + + fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + self.snapshots_mutex.lock(); + const gop = try self.snapshots.getOrPut(allocator, dir_path); + if (!gop.found_existing) gop.value_ptr.* = .empty; + const snapshot = gop.value_ptr; + var iter = dir.iterate(); + while (iter.next() catch null) |entry| { + if (entry.kind != .file) continue; + if (snapshot.contains(entry.name)) continue; + const mtime = (dir.statFile(entry.name) catch continue).mtime; + const owned = allocator.dupe(u8, entry.name) catch continue; + snapshot.put(allocator, owned, mtime) catch allocator.free(owned); + } + self.snapshots_mutex.unlock(); + } + + fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.watches_mutex.lock(); + const watches_entry = self.watches.fetchRemove(path); + self.watches_mutex.unlock(); + if (watches_entry) |entry| { + std.posix.close(entry.value.fd); + allocator.free(entry.key); + } + if (self.snapshots.fetchRemove(path)) |entry| { + var snap = entry.value; + var it = snap.iterator(); + while (it.next()) |ne| allocator.free(ne.key_ptr.*); + snap.deinit(allocator); + } + } +}; + const WindowsBackend = struct { const watches_recursively = true; // ReadDirectoryChangesW with bWatchSubtree=1 diff --git a/src/nightwatch_test.zig b/src/nightwatch_test.zig index ad8b5f4..2847f65 100644 --- a/src/nightwatch_test.zig +++ b/src/nightwatch_test.zig @@ -221,6 +221,10 @@ test "creating a file emits a 'created' event" { } test "writing to a file emits a 'modified' event" { + // kqueue watches directories only; file writes don't trigger a directory event, + // so modifications are not reliably detectable in real time on this backend. + if (comptime !nw.detects_file_modifications) return error.SkipZigTest; + const allocator = std.testing.allocator; const tmp = try makeTempDir(allocator); @@ -512,6 +516,8 @@ test "rename-then-modify: rename event precedes the subsequent modify event" { // After renaming a file, a write to the new name should produce events in // the order [rename/old-name, rename/new-name, modify] so that a consumer // always knows the current identity of the file before seeing changes to it. + if (comptime !nw.detects_file_modifications) return error.SkipZigTest; + const allocator = std.testing.allocator; const tmp = try makeTempDir(allocator);