From 6930adae7fae6c5337fa76974823b23b7ada9f1b Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Mon, 9 Mar 2026 09:51:01 +0100 Subject: [PATCH] refactor: re-organize backends into separate files --- src/backend/FSEvents.zig | 237 ++++++ src/backend/INotify.zig | 231 ++++++ src/backend/KQueue.zig | 428 ++++++++++ src/backend/KQueueDir.zig | 362 +++++++++ src/backend/Windows.zig | 254 ++++++ src/nightwatch.zig | 1574 +------------------------------------ src/types.zig | 63 ++ 7 files changed, 1588 insertions(+), 1561 deletions(-) create mode 100644 src/backend/FSEvents.zig create mode 100644 src/backend/INotify.zig create mode 100644 src/backend/KQueue.zig create mode 100644 src/backend/KQueueDir.zig create mode 100644 src/backend/Windows.zig create mode 100644 src/types.zig diff --git a/src/backend/FSEvents.zig b/src/backend/FSEvents.zig new file mode 100644 index 0000000..ce44e76 --- /dev/null +++ b/src/backend/FSEvents.zig @@ -0,0 +1,237 @@ +const std = @import("std"); +const types = @import("../types.zig"); +const Handler = types.Handler; +const EventType = types.EventType; +const ObjectType = types.ObjectType; + +pub const watches_recursively = true; // FSEventStreamCreate watches the entire subtree +pub const detects_file_modifications = true; + +handler: *Handler, +stream: ?*anyopaque, // FSEventStreamRef +queue: ?*anyopaque, // dispatch_queue_t +ctx: ?*CallbackContext, // heap-allocated, freed after stream is stopped +watches: std.StringArrayHashMapUnmanaged(void), // owned paths + +const threaded = false; // callback fires on GCD thread; no FW_event needed + +const kFSEventStreamCreateFlagNoDefer: u32 = 0x00000002; +const kFSEventStreamCreateFlagFileEvents: u32 = 0x00000010; +const kFSEventStreamEventFlagItemCreated: u32 = 0x00000100; +const kFSEventStreamEventFlagItemRemoved: u32 = 0x00000200; +const kFSEventStreamEventFlagItemRenamed: u32 = 0x00000800; +const kFSEventStreamEventFlagItemModified: u32 = 0x00001000; +const kFSEventStreamEventFlagItemIsDir: 
u32 = 0x00020000; +const kFSEventStreamEventIdSinceNow: u64 = 0xFFFFFFFFFFFFFFFF; +const kCFStringEncodingUTF8: u32 = 0x08000100; + +// Mirror of FSEventStreamContext (Apple SDK struct; version must be 0). +const FSEventStreamContext = extern struct { + version: isize = 0, + info: ?*anyopaque = null, + retain: ?*anyopaque = null, + release: ?*anyopaque = null, + copy_description: ?*anyopaque = null, +}; + +const cf = struct { + pub extern "c" fn CFStringCreateWithBytesNoCopy( + alloc: ?*anyopaque, + bytes: [*]const u8, + numBytes: isize, + encoding: u32, + isExternalRepresentation: u8, + contentsDeallocator: ?*anyopaque, + ) ?*anyopaque; + pub extern "c" fn CFArrayCreate( + allocator: ?*anyopaque, + values: [*]const ?*anyopaque, + numValues: isize, + callBacks: ?*anyopaque, + ) ?*anyopaque; + pub extern "c" fn CFRelease(cf: *anyopaque) void; + pub extern "c" fn FSEventStreamCreate( + allocator: ?*anyopaque, + callback: *const anyopaque, + context: *const FSEventStreamContext, + pathsToWatch: *anyopaque, + sinceWhen: u64, + latency: f64, + flags: u32, + ) ?*anyopaque; + pub extern "c" fn FSEventStreamSetDispatchQueue(stream: *anyopaque, queue: *anyopaque) void; + pub extern "c" fn FSEventStreamStart(stream: *anyopaque) u8; + pub extern "c" fn FSEventStreamStop(stream: *anyopaque) void; + pub extern "c" fn FSEventStreamInvalidate(stream: *anyopaque) void; + pub extern "c" fn FSEventStreamRelease(stream: *anyopaque) void; + pub extern "c" fn dispatch_queue_create(label: [*:0]const u8, attr: ?*anyopaque) *anyopaque; + pub extern "c" fn dispatch_release(obj: *anyopaque) void; + pub extern "c" var kCFAllocatorNull: *anyopaque; +}; + +const CallbackContext = struct { + handler: *Handler, + // Snapshot of the watched root paths at arm() time, used to filter out + // spurious events for the root directories themselves that FSEvents + // sometimes delivers as historical events at stream start. 
+ watched_roots: []const []const u8, // owned slice of owned strings +}; + +pub fn init(handler: *Handler) error{}!@This() { + return .{ + .handler = handler, + .stream = null, + .queue = null, + .ctx = null, + .watches = .empty, + }; +} + +fn stop_stream(self: *@This(), allocator: std.mem.Allocator) void { + if (self.stream) |s| { + cf.FSEventStreamStop(s); + cf.FSEventStreamInvalidate(s); + cf.FSEventStreamRelease(s); + self.stream = null; + } + if (self.queue) |q| { + cf.dispatch_release(q); + self.queue = null; + } + if (self.ctx) |c| { + for (c.watched_roots) |r| allocator.free(r); + allocator.free(c.watched_roots); + allocator.destroy(c); + self.ctx = null; + } +} + +pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + self.stop_stream(allocator); + var it = self.watches.iterator(); + while (it.next()) |entry| allocator.free(entry.key_ptr.*); + self.watches.deinit(allocator); +} + +pub fn arm(self: *@This(), allocator: std.mem.Allocator) error{ OutOfMemory, ArmFailed }!void { + if (self.stream != null) return; + if (self.watches.count() == 0) return; // no paths yet; will arm on first add_watch + + var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty; + defer cf_strings.deinit(allocator); + var it = self.watches.iterator(); + while (it.next()) |entry| { + const path = entry.key_ptr.*; + const s = cf.CFStringCreateWithBytesNoCopy( + null, + path.ptr, + @intCast(path.len), + kCFStringEncodingUTF8, + 0, + cf.kCFAllocatorNull, + ) orelse continue; + cf_strings.append(allocator, s) catch { + cf.CFRelease(s); + break; + }; + } + defer for (cf_strings.items) |s| cf.CFRelease(s.?); + + const paths_array = cf.CFArrayCreate( + null, + cf_strings.items.ptr, + @intCast(cf_strings.items.len), + null, + ) orelse return error.ArmFailed; + defer cf.CFRelease(paths_array); + + // Snapshot watched root paths so the callback can filter them out. 
+ const roots = try allocator.alloc([]const u8, self.watches.count()); + errdefer allocator.free(roots); + var ri: usize = 0; + errdefer for (roots[0..ri]) |r| allocator.free(r); + var wit2 = self.watches.iterator(); + while (wit2.next()) |entry| { + roots[ri] = try allocator.dupe(u8, entry.key_ptr.*); + ri += 1; + } + + const ctx = try allocator.create(CallbackContext); + errdefer allocator.destroy(ctx); + ctx.* = .{ .handler = self.handler, .watched_roots = roots }; + + // FSEventStreamCreate copies the context struct; stack allocation is fine. + const stream_ctx = FSEventStreamContext{ .version = 0, .info = ctx }; + const stream = cf.FSEventStreamCreate( + null, + @ptrCast(&callback), + &stream_ctx, + paths_array, + kFSEventStreamEventIdSinceNow, + 0.1, + kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents, + ) orelse return error.ArmFailed; + errdefer cf.FSEventStreamRelease(stream); + + const queue = cf.dispatch_queue_create("flow.file_watcher", null); + cf.FSEventStreamSetDispatchQueue(stream, queue); + _ = cf.FSEventStreamStart(stream); + + self.stream = stream; + self.queue = queue; + self.ctx = ctx; +} + +fn callback( + _: *anyopaque, + info: ?*anyopaque, + num_events: usize, + event_paths: *anyopaque, + event_flags: [*]const u32, + _: [*]const u64, +) callconv(.c) void { + const ctx: *CallbackContext = @ptrCast(@alignCast(info orelse return)); + const paths: [*][*:0]const u8 = @ptrCast(@alignCast(event_paths)); + outer: for (0..num_events) |i| { + const path = std.mem.sliceTo(paths[i], 0); + const flags = event_flags[i]; + + // Skip events for the watched root dirs themselves; FSEvents often + // delivers spurious historical events for them at stream start. + for (ctx.watched_roots) |root| { + if (std.mem.eql(u8, path, root)) continue :outer; + } + + // FSEvents coalesces operations, so multiple flags may be set on + // a single event. Emit one change call per applicable flag so + // callers see all relevant event types (e.g. 
created + modified). + const ot: ObjectType = if (flags & kFSEventStreamEventFlagItemIsDir != 0) .dir else .file; + if (flags & kFSEventStreamEventFlagItemCreated != 0) { + ctx.handler.change(path, .created, ot) catch {}; + } + if (flags & kFSEventStreamEventFlagItemRemoved != 0) { + ctx.handler.change(path, .deleted, ot) catch {}; + } + if (flags & kFSEventStreamEventFlagItemRenamed != 0) { + ctx.handler.change(path, .renamed, ot) catch {}; + } + if (flags & kFSEventStreamEventFlagItemModified != 0) { + ctx.handler.change(path, .modified, ot) catch {}; + } + } +} + +pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{OutOfMemory}!void { + if (self.watches.contains(path)) return; + const owned = try allocator.dupe(u8, path); + errdefer allocator.free(owned); + try self.watches.put(allocator, owned, {}); + self.stop_stream(allocator); + self.arm(allocator) catch {}; +} + +pub fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + if (self.watches.fetchSwapRemove(path)) |entry| allocator.free(entry.key); + self.stop_stream(allocator); + self.arm(allocator) catch {}; +} diff --git a/src/backend/INotify.zig b/src/backend/INotify.zig new file mode 100644 index 0000000..a8ceeff --- /dev/null +++ b/src/backend/INotify.zig @@ -0,0 +1,231 @@ +const std = @import("std"); +const build_options = @import("build_options"); +const types = @import("../types.zig"); +const Handler = types.Handler; +const EventType = types.EventType; +const ObjectType = types.ObjectType; + +pub const watches_recursively = false; +pub const detects_file_modifications = true; + +const PendingRename = struct { + cookie: u32, + path: []u8, // owned + object_type: ObjectType, +}; + +handler: *Handler, +inotify_fd: std.posix.fd_t, +watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path +pending_renames: std.ArrayListUnmanaged(PendingRename), +// Used only in linux_read_thread mode: +stop_pipe: if 
(build_options.linux_read_thread) [2]std.posix.fd_t else void, +thread: if (build_options.linux_read_thread) ?std.Thread else void, + +const IN = std.os.linux.IN; + +const watch_mask: u32 = IN.CREATE | IN.DELETE | IN.MODIFY | + IN.MOVED_FROM | IN.MOVED_TO | IN.DELETE_SELF | + IN.MOVE_SELF | IN.CLOSE_WRITE; + +const in_flags: std.os.linux.O = .{ .NONBLOCK = true }; + +pub fn init(handler: *Handler) !@This() { + const inotify_fd = try std.posix.inotify_init1(@bitCast(in_flags)); + errdefer std.posix.close(inotify_fd); + if (comptime build_options.linux_read_thread) { + const stop_pipe = try std.posix.pipe(); + return .{ + .handler = handler, + .inotify_fd = inotify_fd, + .watches = .empty, + .pending_renames = .empty, + .stop_pipe = stop_pipe, + .thread = null, + }; + } else { + return .{ + .handler = handler, + .inotify_fd = inotify_fd, + .watches = .empty, + .pending_renames = .empty, + .stop_pipe = {}, + .thread = {}, + }; + } +} + +pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + if (comptime build_options.linux_read_thread) { + // Signal thread to stop and wait for it to exit. 
+ _ = std.posix.write(self.stop_pipe[1], "x") catch {}; + if (self.thread) |t| t.join(); + std.posix.close(self.stop_pipe[0]); + std.posix.close(self.stop_pipe[1]); + } + var it = self.watches.iterator(); + while (it.next()) |entry| allocator.free(entry.value_ptr.*); + self.watches.deinit(allocator); + for (self.pending_renames.items) |r| allocator.free(r.path); + self.pending_renames.deinit(allocator); + std.posix.close(self.inotify_fd); +} + +pub fn arm(self: *@This(), allocator: std.mem.Allocator) error{HandlerFailed}!void { + if (comptime build_options.linux_read_thread) { + if (self.thread != null) return; // already running + self.thread = std.Thread.spawn(.{}, thread_fn, .{ self, allocator }) catch return error.HandlerFailed; + } else { + return switch (self.handler.wait_readable() catch |e| switch (e) { + error.HandlerFailed => |e_| return e_, + }) { + .will_notify => {}, + }; + } +} + +fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void { + var pfds = [_]std.posix.pollfd{ + .{ .fd = self.inotify_fd, .events = std.posix.POLL.IN, .revents = 0 }, + .{ .fd = self.stop_pipe[0], .events = std.posix.POLL.IN, .revents = 0 }, + }; + while (true) { + _ = std.posix.poll(&pfds, -1) catch return; + if (pfds[1].revents & std.posix.POLL.IN != 0) return; // stop signal + if (pfds[0].revents & std.posix.POLL.IN != 0) { + self.handle_read_ready(allocator) catch return; + } + } +} + +pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void { + const path_z = try allocator.dupeZ(u8, path); + defer allocator.free(path_z); + const wd = std.os.linux.inotify_add_watch(self.inotify_fd, path_z, watch_mask); + switch (std.posix.errno(wd)) { + .SUCCESS => {}, + else => |e| { + std.log.err("nightwatch.add_watch failed: {t}", .{e}); + return error.WatchFailed; + }, + } + const owned_path = try allocator.dupe(u8, path); + errdefer allocator.free(owned_path); + const result = try self.watches.getOrPut(allocator, 
@intCast(wd)); + if (result.found_existing) allocator.free(result.value_ptr.*); + result.value_ptr.* = owned_path; +} + +pub fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + var it = self.watches.iterator(); + while (it.next()) |entry| { + if (!std.mem.eql(u8, entry.value_ptr.*, path)) continue; + _ = std.os.linux.inotify_rm_watch(self.inotify_fd, entry.key_ptr.*); + allocator.free(entry.value_ptr.*); + self.watches.removeByPtr(entry.key_ptr); + return; + } +} + +fn has_watch_for_path(self: *const @This(), path: []const u8) bool { + var it = self.watches.iterator(); + while (it.next()) |entry| { + if (std.mem.eql(u8, entry.value_ptr.*, path)) return true; + } + return false; +} + +pub fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator) (std.posix.ReadError || error{ NoSpaceLeft, OutOfMemory, HandlerFailed })!void { + const InotifyEvent = extern struct { + wd: i32, + mask: u32, + cookie: u32, + len: u32, + }; + + var buf: [65536]u8 align(@alignOf(InotifyEvent)) = undefined; + defer { + // Any unpaired MOVED_FROM means the file was moved out of the watched tree. 
+ for (self.pending_renames.items) |r| { + self.handler.change(r.path, EventType.deleted, r.object_type) catch {}; + allocator.free(r.path); + } + self.pending_renames.clearRetainingCapacity(); + } + + while (true) { + const n = std.posix.read(self.inotify_fd, &buf) catch |e| switch (e) { + error.WouldBlock => { + // re-arm the file descriptor + try self.arm(allocator); + break; + }, + else => return e, + }; + var offset: usize = 0; + while (offset + @sizeOf(InotifyEvent) <= n) { + const ev: *const InotifyEvent = @ptrCast(@alignCast(buf[offset..].ptr)); + const name_offset = offset + @sizeOf(InotifyEvent); + offset = name_offset + ev.len; + const watched_path = self.watches.get(ev.wd) orelse continue; + const name: []const u8 = if (ev.len > 0) + std.mem.sliceTo(buf[name_offset..][0..ev.len], 0) + else + ""; + + var full_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path: []const u8 = if (name.len > 0) + try std.fmt.bufPrint(&full_buf, "{s}/{s}", .{ watched_path, name }) + else + watched_path; + + if (ev.mask & IN.MOVED_FROM != 0) { + // Park it, we may receive a paired MOVED_TO with the same cookie. + try self.pending_renames.append(allocator, .{ + .cookie = ev.cookie, + .path = try allocator.dupe(u8, full_path), + .object_type = if (ev.mask & IN.ISDIR != 0) .dir else .file, + }); + } else if (ev.mask & IN.MOVED_TO != 0) { + // Look for a paired MOVED_FROM. + var found: ?usize = null; + for (self.pending_renames.items, 0..) |r, i| { + if (r.cookie == ev.cookie) { + found = i; + break; + } + } + if (found) |i| { + // Complete rename pair: emit a single atomic rename message. + const r = self.pending_renames.swapRemove(i); + defer allocator.free(r.path); + try self.handler.rename(r.path, full_path, r.object_type); + } else { + // No paired MOVED_FROM, file was moved in from outside the watched tree. 
+ const ot: ObjectType = if (ev.mask & IN.ISDIR != 0) .dir else .file; + try self.handler.change(full_path, EventType.created, ot); + } + } else if (ev.mask & IN.MOVE_SELF != 0) { + // The watched directory itself was renamed/moved away. + try self.handler.change(full_path, EventType.deleted, .dir); + } else { + const is_dir = ev.mask & IN.ISDIR != 0; + const object_type: ObjectType = if (is_dir) .dir else .file; + const event_type: EventType = if (ev.mask & IN.CREATE != 0) + .created + else if (ev.mask & (IN.DELETE | IN.DELETE_SELF) != 0) blk: { + // Suppress IN_DELETE|IN_ISDIR for subdirs that have their + // own watch: IN_DELETE_SELF on that watch will fire the + // same path without duplication. + if (is_dir and self.has_watch_for_path(full_path)) + continue; + break :blk .deleted; + } else if (ev.mask & (IN.MODIFY | IN.CLOSE_WRITE) != 0) + .modified + else + continue; + try self.handler.change(full_path, event_type, object_type); + } + } + } +} diff --git a/src/backend/KQueue.zig b/src/backend/KQueue.zig new file mode 100644 index 0000000..68fbb74 --- /dev/null +++ b/src/backend/KQueue.zig @@ -0,0 +1,428 @@ +const std = @import("std"); +const types = @import("../types.zig"); +const Handler = types.Handler; +const EventType = types.EventType; +const ObjectType = types.ObjectType; + +pub const watches_recursively = false; +pub const detects_file_modifications = true; + +handler: *Handler, +kq: std.posix.fd_t, +shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread +thread: ?std.Thread, +watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned dir path -> fd +watches_mutex: std.Thread.Mutex, +file_watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned file path -> fd +file_watches_mutex: std.Thread.Mutex, +// Per-directory snapshots of filenames, used to diff on NOTE_WRITE. +// Key: owned dir path (same as watches key), value: set of owned filenames. 
+// Accessed from both the main thread (add_watch) and the background thread (scan_dir). +snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), +snapshots_mutex: std.Thread.Mutex, + +const EVFILT_VNODE: i16 = -4; +const EVFILT_READ: i16 = -1; +const EV_ADD: u16 = 0x0001; +const EV_ENABLE: u16 = 0x0004; +const EV_CLEAR: u16 = 0x0020; +const EV_DELETE: u16 = 0x0002; +const NOTE_DELETE: u32 = 0x00000001; +const NOTE_WRITE: u32 = 0x00000002; +const NOTE_EXTEND: u32 = 0x00000004; +const NOTE_ATTRIB: u32 = 0x00000008; +const NOTE_RENAME: u32 = 0x00000020; + +pub fn init(handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { + // Per-file kqueue watches require one open fd per watched file. Bump + // the soft NOFILE limit to the hard limit so large directory trees don't + // exhaust the default quota (256 on macOS, 1024 on many FreeBSD installs). + if (std.posix.getrlimit(.NOFILE)) |rl| { + if (rl.cur < rl.max) + std.posix.setrlimit(.NOFILE, .{ .cur = rl.max, .max = rl.max }) catch {}; + } else |_| {} + const kq = try std.posix.kqueue(); + errdefer std.posix.close(kq); + const pipe = try std.posix.pipe(); + errdefer { + std.posix.close(pipe[0]); + std.posix.close(pipe[1]); + } + // Register the read end of the shutdown pipe with kqueue so the thread + // wakes up when we want to shut down. + const shutdown_kev = std.posix.Kevent{ + .ident = @intCast(pipe[0]), + .filter = EVFILT_READ, + .flags = EV_ADD | EV_ENABLE, + .fflags = 0, + .data = 0, + .udata = 0, + }; + _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); + return .{ + .handler = handler, + .kq = kq, + .shutdown_pipe = pipe, + .thread = null, + .watches = .empty, + .watches_mutex = .{}, + .file_watches = .empty, + .file_watches_mutex = .{}, + .snapshots = .empty, + .snapshots_mutex = .{}, + }; +} + +pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + // Signal the thread to exit by writing to the shutdown pipe. 
+ _ = std.posix.write(self.shutdown_pipe[1], &[_]u8{0}) catch {}; + if (self.thread) |t| t.join(); + std.posix.close(self.shutdown_pipe[0]); + std.posix.close(self.shutdown_pipe[1]); + var it = self.watches.iterator(); + while (it.next()) |entry| { + std.posix.close(entry.value_ptr.*); + allocator.free(entry.key_ptr.*); + } + self.watches.deinit(allocator); + var fit = self.file_watches.iterator(); + while (fit.next()) |entry| { + std.posix.close(entry.value_ptr.*); + allocator.free(entry.key_ptr.*); + } + self.file_watches.deinit(allocator); + var sit = self.snapshots.iterator(); + while (sit.next()) |entry| { + // Keys are borrowed from self.watches and freed in the watches loop above. + var names = entry.value_ptr.*; + var nit = names.iterator(); + while (nit.next()) |ne| allocator.free(ne.key_ptr.*); + names.deinit(allocator); + } + self.snapshots.deinit(allocator); + std.posix.close(self.kq); +} + +pub fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + if (self.thread != null) return error.AlreadyArmed; + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, allocator }); +} + +fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void { + var events: [64]std.posix.Kevent = undefined; + while (true) { + // Block indefinitely until kqueue has events. + const n = std.posix.kevent(self.kq, &.{}, &events, null) catch break; + for (events[0..n]) |ev| { + if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit + if (ev.filter != EVFILT_VNODE) continue; + const fd: std.posix.fd_t = @intCast(ev.ident); + + // Check if this is a file watch: NOTE_WRITE/NOTE_EXTEND → modified. 
+ self.file_watches_mutex.lock(); + var fwit = self.file_watches.iterator(); + const file_path: ?[]const u8 = while (fwit.next()) |entry| { + if (entry.value_ptr.* == fd) break entry.key_ptr.*; + } else null; + self.file_watches_mutex.unlock(); + if (file_path) |fp| { + if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND) != 0) + self.handler.change(fp, EventType.modified, .file) catch return; + continue; + } + + // Otherwise look up the directory path for this fd. + self.watches_mutex.lock(); + var wit = self.watches.iterator(); + const dir_path: ?[]const u8 = while (wit.next()) |entry| { + if (entry.value_ptr.* == fd) break entry.key_ptr.*; + } else null; + self.watches_mutex.unlock(); + if (dir_path == null) continue; + if (ev.fflags & NOTE_DELETE != 0) { + self.handler.change(dir_path.?, EventType.deleted, .dir) catch return; + } else if (ev.fflags & NOTE_RENAME != 0) { + self.handler.change(dir_path.?, EventType.renamed, .dir) catch return; + } else if (ev.fflags & NOTE_WRITE != 0) { + self.scan_dir(allocator, dir_path.?) catch {}; + } + } + } +} + +// Scan a directory and diff against the snapshot, emitting created/deleted events. +fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + + // Arena for all temporaries — freed in one shot at the end. + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + const tmp = arena.allocator(); + + // Collect current files and subdirectories (no lock, reading filesystem only). 
+ var current_files: std.StringHashMapUnmanaged(void) = .empty; + var current_dirs: std.ArrayListUnmanaged([]u8) = .empty; + var iter = dir.iterate(); + while (try iter.next()) |entry| { + switch (entry.kind) { + .file => { + const name = try tmp.dupe(u8, entry.name); + try current_files.put(tmp, name, {}); + }, + .directory => { + const name = try tmp.dupe(u8, entry.name); + try current_dirs.append(tmp, name); + }, + else => {}, + } + } + + // Diff against snapshot under the lock; collect events to emit after releasing it. + // to_create / to_delete hold borrowed pointers into the snapshot (which uses + // allocator, not tmp); only the list metadata itself uses tmp. + var to_create: std.ArrayListUnmanaged([]const u8) = .empty; + var to_delete: std.ArrayListUnmanaged([]const u8) = .empty; + var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty; + + self.snapshots_mutex.lock(); + { + for (current_dirs.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + if (!self.snapshots.contains(full_path)) { + const owned = tmp.dupe(u8, full_path) catch continue; + new_dirs.append(tmp, owned) catch continue; + } + } + + const gop = self.snapshots.getOrPut(allocator, dir_path) catch |e| { + self.snapshots_mutex.unlock(); + return e; + }; + if (!gop.found_existing) gop.value_ptr.* = .empty; + const snapshot = gop.value_ptr; + + var cit = current_files.iterator(); + while (cit.next()) |entry| { + if (snapshot.contains(entry.key_ptr.*)) continue; + const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| { + self.snapshots_mutex.unlock(); + return e; + }; + snapshot.put(allocator, owned, {}) catch |e| { + allocator.free(owned); + self.snapshots_mutex.unlock(); + return e; + }; + try to_create.append(tmp, owned); + } + + var sit = snapshot.iterator(); + while (sit.next()) |entry| { + if (current_files.contains(entry.key_ptr.*)) continue; + try 
to_delete.append(tmp, entry.key_ptr.*); + } + for (to_delete.items) |name| _ = snapshot.fetchRemove(name); + } + self.snapshots_mutex.unlock(); + + // Emit all events outside the lock so handlers may safely call watch()/unwatch(). + // Emit created dirs, then deletions, then creations. Deletions first ensures that + // a rename (old disappears, new appears) reports the source path before the dest. + for (new_dirs.items) |full_path| + try self.handler.change(full_path, EventType.created, .dir); + for (to_delete.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch { + allocator.free(name); + continue; + }; + self.deregister_file_watch(allocator, full_path); + try self.handler.change(full_path, EventType.deleted, .file); + allocator.free(name); // snapshot key, owned by allocator + } + for (to_create.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + self.register_file_watch(allocator, full_path); + try self.handler.change(full_path, EventType.created, .file); + } + // arena.deinit() frees current_files, current_dirs, new_dirs, and list metadata +} + +fn register_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.file_watches_mutex.lock(); + const already = self.file_watches.contains(path); + self.file_watches_mutex.unlock(); + if (already) return; + const fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch return; + const kev = std.posix.Kevent{ + .ident = @intCast(fd), + .filter = EVFILT_VNODE, + .flags = EV_ADD | EV_ENABLE | EV_CLEAR, + .fflags = NOTE_WRITE | NOTE_EXTEND, + .data = 0, + .udata = 0, + }; + _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch { + std.posix.close(fd); + return; + }; + const owned = allocator.dupe(u8, path) catch { + std.posix.close(fd); + return; + }; + 
self.file_watches_mutex.lock(); + self.file_watches.put(allocator, owned, fd) catch { + self.file_watches_mutex.unlock(); + std.posix.close(fd); + allocator.free(owned); + return; + }; + self.file_watches_mutex.unlock(); +} + +fn deregister_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.file_watches_mutex.lock(); + const kv = self.file_watches.fetchRemove(path); + self.file_watches_mutex.unlock(); + if (kv) |entry| { + std.posix.close(entry.value); + allocator.free(entry.key); + } +} + +pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void { + self.watches_mutex.lock(); + const already = self.watches.contains(path); + self.watches_mutex.unlock(); + if (already) return; + const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { + error.AccessDenied, + error.PermissionDenied, + error.PathAlreadyExists, + error.SymLinkLoop, + error.NameTooLong, + error.FileNotFound, + error.SystemResources, + error.NoSpaceLeft, + error.NotDir, + error.InvalidUtf8, + error.InvalidWtf8, + error.BadPathName, + error.NoDevice, + error.NetworkNotFound, + error.Unexpected, + error.ProcessFdQuotaExceeded, + error.SystemFdQuotaExceeded, + error.ProcessNotFound, + error.FileTooBig, + error.IsDir, + error.DeviceBusy, + error.FileLocksNotSupported, + error.FileBusy, + error.WouldBlock, + => |e_| { + std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); + return error.WatchFailed; + }, + }; + errdefer std.posix.close(path_fd); + const kev = std.posix.Kevent{ + .ident = @intCast(path_fd), + .filter = EVFILT_VNODE, + .flags = EV_ADD | EV_ENABLE | EV_CLEAR, + .fflags = NOTE_WRITE | NOTE_DELETE | NOTE_RENAME | NOTE_ATTRIB | NOTE_EXTEND, + .data = 0, + .udata = 0, + }; + _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch |e| switch (e) { + error.AccessDenied, + error.SystemResources, + error.EventNotFound, + error.ProcessNotFound, + error.Overflow, + => 
|e_| { + std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); + return error.WatchFailed; + }, + }; + const owned_path = try allocator.dupe(u8, path); + self.watches_mutex.lock(); + self.watches.put(allocator, owned_path, path_fd) catch |e| { + self.watches_mutex.unlock(); + allocator.free(owned_path); + return e; + }; + self.watches_mutex.unlock(); + // Take initial snapshot so first NOTE_WRITE has a baseline to diff against. + self.take_snapshot(allocator, owned_path) catch |e| switch (e) { + error.AccessDenied, + error.PermissionDenied, + error.SystemResources, + error.InvalidUtf8, + error.Unexpected, + => |e_| { + std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); + return error.WatchFailed; + }, + error.OutOfMemory => return error.OutOfMemory, + }; +} + +fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + // Collect file names first so we can register file watches without holding the lock. + var names: std.ArrayListUnmanaged([]u8) = .empty; + defer { + for (names.items) |n| allocator.free(n); + names.deinit(allocator); + } + var iter = dir.iterate(); + while (try iter.next()) |entry| { + if (entry.kind != .file) continue; + try names.append(allocator, try allocator.dupe(u8, entry.name)); + } + self.snapshots_mutex.lock(); + const gop = try self.snapshots.getOrPut(allocator, dir_path); + if (!gop.found_existing) gop.value_ptr.* = .empty; + var snapshot = gop.value_ptr; + for (names.items) |name| { + if (snapshot.contains(name)) continue; + const owned = try allocator.dupe(u8, name); + try snapshot.put(allocator, owned, {}); + } + self.snapshots_mutex.unlock(); + // Register a kqueue watch for each existing file so writes are detected. 
+ for (names.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + self.register_file_watch(allocator, full_path); + } +} + +pub fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.watches_mutex.lock(); + const watches_entry = self.watches.fetchRemove(path); + self.watches_mutex.unlock(); + if (watches_entry) |entry| { + std.posix.close(entry.value); + allocator.free(entry.key); + } + if (self.snapshots.fetchRemove(path)) |entry| { + var names = entry.value; + var it = names.iterator(); + while (it.next()) |ne| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ path, ne.key_ptr.* }) catch { + allocator.free(ne.key_ptr.*); + continue; + }; + self.deregister_file_watch(allocator, full_path); + allocator.free(ne.key_ptr.*); + } + names.deinit(allocator); + } +} diff --git a/src/backend/KQueueDir.zig b/src/backend/KQueueDir.zig new file mode 100644 index 0000000..ab20e68 --- /dev/null +++ b/src/backend/KQueueDir.zig @@ -0,0 +1,362 @@ +const std = @import("std"); +const types = @import("../types.zig"); +const Handler = types.Handler; +const EventType = types.EventType; +const ObjectType = types.ObjectType; + +pub const watches_recursively = false; +pub const detects_file_modifications = false; +pub const WatchEntry = struct { fd: std.posix.fd_t, is_file: bool }; + +handler: *Handler, +kq: std.posix.fd_t, +shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread +thread: ?std.Thread, +watches: std.StringHashMapUnmanaged(WatchEntry), // owned path -> {fd, is_file} +watches_mutex: std.Thread.Mutex, +// Per-directory snapshots: owned filename -> mtime_ns. +// Used to diff on NOTE_WRITE: detects creates, deletes, and (opportunistically) +// modifications when the same directory fires another event later. 
+// Key: owned dir path (same as watches key), value: map of owned filename -> mtime_ns. +snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(i128)), +snapshots_mutex: std.Thread.Mutex, + +const EVFILT_VNODE: i16 = -4; +const EVFILT_READ: i16 = -1; +const EV_ADD: u16 = 0x0001; +const EV_ENABLE: u16 = 0x0004; +const EV_CLEAR: u16 = 0x0020; +const EV_DELETE: u16 = 0x0002; +const NOTE_DELETE: u32 = 0x00000001; +const NOTE_WRITE: u32 = 0x00000002; +const NOTE_EXTEND: u32 = 0x00000004; +const NOTE_ATTRIB: u32 = 0x00000008; +const NOTE_RENAME: u32 = 0x00000020; + +pub fn init(handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { + const kq = try std.posix.kqueue(); + errdefer std.posix.close(kq); + const pipe = try std.posix.pipe(); + errdefer { + std.posix.close(pipe[0]); + std.posix.close(pipe[1]); + } + // Register the read end of the shutdown pipe with kqueue so the thread + // wakes up when we want to shut down. + const shutdown_kev = std.posix.Kevent{ + .ident = @intCast(pipe[0]), + .filter = EVFILT_READ, + .flags = EV_ADD | EV_ENABLE, + .fflags = 0, + .data = 0, + .udata = 0, + }; + _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); + return .{ + .handler = handler, + .kq = kq, + .shutdown_pipe = pipe, + .thread = null, + .watches = .empty, + .watches_mutex = .{}, + .snapshots = .empty, + .snapshots_mutex = .{}, + }; +} + +pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + // Signal the thread to exit by writing to the shutdown pipe. 
+ _ = std.posix.write(self.shutdown_pipe[1], &[_]u8{0}) catch {}; + if (self.thread) |t| t.join(); + std.posix.close(self.shutdown_pipe[0]); + std.posix.close(self.shutdown_pipe[1]); + var it = self.watches.iterator(); + while (it.next()) |entry| { + std.posix.close(entry.value_ptr.*.fd); + allocator.free(entry.key_ptr.*); + } + self.watches.deinit(allocator); + var sit = self.snapshots.iterator(); + while (sit.next()) |entry| { + // Keys are borrowed from self.watches and freed in the watches loop above. + var snap = entry.value_ptr.*; + var nit = snap.iterator(); + while (nit.next()) |ne| allocator.free(ne.key_ptr.*); + snap.deinit(allocator); + } + self.snapshots.deinit(allocator); + std.posix.close(self.kq); +} + +pub fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + if (self.thread != null) return error.AlreadyArmed; + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, allocator }); +} + +fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void { + var events: [64]std.posix.Kevent = undefined; + while (true) { + // Block indefinitely until kqueue has events. + const n = std.posix.kevent(self.kq, &.{}, &events, null) catch break; + for (events[0..n]) |ev| { + if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit + if (ev.filter != EVFILT_VNODE) continue; + const fd: std.posix.fd_t = @intCast(ev.ident); + + self.watches_mutex.lock(); + var wit = self.watches.iterator(); + var watch_path: ?[]const u8 = null; + var is_file: bool = false; + while (wit.next()) |entry| { + if (entry.value_ptr.*.fd == fd) { + watch_path = entry.key_ptr.*; + is_file = entry.value_ptr.*.is_file; + break; + } + } + self.watches_mutex.unlock(); + if (watch_path == null) continue; + if (is_file) { + // Explicit file watch: emit events with .file type directly. 
+ if (ev.fflags & NOTE_DELETE != 0) { + self.handler.change(watch_path.?, EventType.deleted, .file) catch return; + } else if (ev.fflags & NOTE_RENAME != 0) { + self.handler.change(watch_path.?, EventType.renamed, .file) catch return; + } else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND) != 0) { + self.handler.change(watch_path.?, EventType.modified, .file) catch return; + } + } else { + if (ev.fflags & NOTE_DELETE != 0) { + self.handler.change(watch_path.?, EventType.deleted, .dir) catch return; + } else if (ev.fflags & NOTE_RENAME != 0) { + self.handler.change(watch_path.?, EventType.renamed, .dir) catch return; + } else if (ev.fflags & NOTE_WRITE != 0) { + self.scan_dir(allocator, watch_path.?) catch {}; + } + } + } + } +} + +// Scan a directory and diff against the snapshot, emitting created/deleted/modified events. +// File modifications are detected opportunistically via mtime changes: if a file was +// written before a NOTE_WRITE fires for another reason (create/delete/rename of a sibling), +// the mtime diff will catch it. +fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + + // Arena for all temporaries — freed in one shot at the end. + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + const tmp = arena.allocator(); + + // Collect current files (name → mtime_ns) and subdirectories. + // No lock held while doing filesystem I/O. 
+ var current_files: std.StringHashMapUnmanaged(i128) = .empty;
+ var current_dirs: std.ArrayListUnmanaged([]u8) = .empty;
+ var iter = dir.iterate();
+ while (try iter.next()) |entry| {
+ switch (entry.kind) {
+ .file => {
+ const mtime = (dir.statFile(entry.name) catch continue).mtime;
+ const name = try tmp.dupe(u8, entry.name);
+ try current_files.put(tmp, name, mtime);
+ },
+ .directory => {
+ const name = try tmp.dupe(u8, entry.name);
+ try current_dirs.append(tmp, name);
+ },
+ else => {},
+ }
+ }
+
+ // Diff against snapshot under the lock; collect events to emit after releasing it.
+ // to_create and to_delete borrow keys owned by the snapshot (allocator);
+ // to_modify borrows keys from current_files (tmp). Only list metadata uses tmp.
+ var to_create: std.ArrayListUnmanaged([]const u8) = .empty;
+ var to_delete: std.ArrayListUnmanaged([]const u8) = .empty;
+ var to_modify: std.ArrayListUnmanaged([]const u8) = .empty;
+ var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty;
+
+ self.snapshots_mutex.lock();
+ {
+ for (current_dirs.items) |name| {
+ var path_buf: [std.fs.max_path_bytes]u8 = undefined;
+ const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
+ if (!self.snapshots.contains(full_path)) {
+ const owned = tmp.dupe(u8, full_path) catch continue;
+ new_dirs.append(tmp, owned) catch continue;
+ }
+ }
+
+ const gop = self.snapshots.getOrPut(allocator, dir_path) catch |e| {
+ self.snapshots_mutex.unlock();
+ return e;
+ };
+ if (!gop.found_existing) gop.value_ptr.* = .empty;
+ const snapshot = gop.value_ptr;
+
+ var cit = current_files.iterator();
+ while (cit.next()) |entry| {
+ if (snapshot.getPtr(entry.key_ptr.*)) |stored_mtime| {
+ // File exists in both — check for modification via mtime change.
+ if (stored_mtime.* != entry.value_ptr.*) {
+ stored_mtime.* = entry.value_ptr.*;
+ try to_modify.append(tmp, entry.key_ptr.*); // borrow from current (tmp)
+ }
+ } else {
+ // New file — add to snapshot and to_create list.
+ const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| { + self.snapshots_mutex.unlock(); + return e; + }; + snapshot.put(allocator, owned, entry.value_ptr.*) catch |e| { + allocator.free(owned); + self.snapshots_mutex.unlock(); + return e; + }; + try to_create.append(tmp, owned); // borrow from snapshot + } + } + + var sit = snapshot.iterator(); + while (sit.next()) |entry| { + if (current_files.contains(entry.key_ptr.*)) continue; + try to_delete.append(tmp, entry.key_ptr.*); // borrow from snapshot + } + for (to_delete.items) |name| _ = snapshot.fetchRemove(name); + } + self.snapshots_mutex.unlock(); + + // Emit all events outside the lock so handlers may safely call watch()/unwatch(). + // Order: new dirs, deletions (source before dest for renames), creations, modifications. + for (new_dirs.items) |full_path| + try self.handler.change(full_path, EventType.created, .dir); + for (to_delete.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch { + allocator.free(name); + continue; + }; + try self.handler.change(full_path, EventType.deleted, .file); + allocator.free(name); // snapshot key, owned by allocator + } + for (to_create.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + try self.handler.change(full_path, EventType.created, .file); + } + for (to_modify.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + try self.handler.change(full_path, EventType.modified, .file); + } + // arena.deinit() frees current_files, current_dirs, new_dirs, and list metadata +} + +pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void { + self.watches_mutex.lock(); + const 
already = self.watches.contains(path); + self.watches_mutex.unlock(); + if (already) return; + const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { + error.AccessDenied, + error.PermissionDenied, + error.PathAlreadyExists, + error.SymLinkLoop, + error.NameTooLong, + error.FileNotFound, + error.SystemResources, + error.NoSpaceLeft, + error.NotDir, + error.InvalidUtf8, + error.InvalidWtf8, + error.BadPathName, + error.NoDevice, + error.NetworkNotFound, + error.Unexpected, + error.ProcessFdQuotaExceeded, + error.SystemFdQuotaExceeded, + error.ProcessNotFound, + error.FileTooBig, + error.IsDir, + error.DeviceBusy, + error.FileLocksNotSupported, + error.FileBusy, + error.WouldBlock, + => |e_| { + std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); + return error.WatchFailed; + }, + }; + errdefer std.posix.close(path_fd); + const kev = std.posix.Kevent{ + .ident = @intCast(path_fd), + .filter = EVFILT_VNODE, + .flags = EV_ADD | EV_ENABLE | EV_CLEAR, + .fflags = NOTE_WRITE | NOTE_DELETE | NOTE_RENAME | NOTE_ATTRIB | NOTE_EXTEND, + .data = 0, + .udata = 0, + }; + _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch |e| switch (e) { + error.AccessDenied, + error.SystemResources, + error.EventNotFound, + error.ProcessNotFound, + error.Overflow, + => |e_| { + std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); + return error.WatchFailed; + }, + }; + // Determine if the path is a regular file or a directory. + const stat = std.posix.fstat(path_fd) catch null; + const is_file = if (stat) |s| std.posix.S.ISREG(s.mode) else false; + const owned_path = try allocator.dupe(u8, path); + self.watches_mutex.lock(); + self.watches.put(allocator, owned_path, .{ .fd = path_fd, .is_file = is_file }) catch |e| { + self.watches_mutex.unlock(); + allocator.free(owned_path); + return e; + }; + self.watches_mutex.unlock(); + // For directory watches only: take initial snapshot so first NOTE_WRITE has a baseline. 
+ if (!is_file) { + self.take_snapshot(allocator, owned_path) catch return error.OutOfMemory; + } +} + +fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + self.snapshots_mutex.lock(); + const gop = try self.snapshots.getOrPut(allocator, dir_path); + if (!gop.found_existing) gop.value_ptr.* = .empty; + const snapshot = gop.value_ptr; + var iter = dir.iterate(); + while (iter.next() catch null) |entry| { + if (entry.kind != .file) continue; + if (snapshot.contains(entry.name)) continue; + const mtime = (dir.statFile(entry.name) catch continue).mtime; + const owned = allocator.dupe(u8, entry.name) catch continue; + snapshot.put(allocator, owned, mtime) catch allocator.free(owned); + } + self.snapshots_mutex.unlock(); +} + +pub fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.watches_mutex.lock(); + const watches_entry = self.watches.fetchRemove(path); + self.watches_mutex.unlock(); + if (watches_entry) |entry| { + std.posix.close(entry.value.fd); + allocator.free(entry.key); + } + if (self.snapshots.fetchRemove(path)) |entry| { + var snap = entry.value; + var it = snap.iterator(); + while (it.next()) |ne| allocator.free(ne.key_ptr.*); + snap.deinit(allocator); + } +} diff --git a/src/backend/Windows.zig b/src/backend/Windows.zig new file mode 100644 index 0000000..58bb50e --- /dev/null +++ b/src/backend/Windows.zig @@ -0,0 +1,254 @@ +const std = @import("std"); +const types = @import("../types.zig"); +const Handler = types.Handler; +const EventType = types.EventType; +const ObjectType = types.ObjectType; + +pub const watches_recursively = true; // ReadDirectoryChangesW with bWatchSubtree=1 +pub const detects_file_modifications = true; + +const windows = std.os.windows; + +const win32 = struct { + pub extern "kernel32" fn CloseHandle(hObject: windows.HANDLE) callconv(.winapi) 
windows.BOOL; + pub extern "kernel32" fn ReadDirectoryChangesW( + hDirectory: windows.HANDLE, + lpBuffer: *anyopaque, + nBufferLength: windows.DWORD, + bWatchSubtree: windows.BOOL, + dwNotifyFilter: windows.DWORD, + lpBytesReturned: ?*windows.DWORD, + lpOverlapped: ?*windows.OVERLAPPED, + lpCompletionRoutine: ?*anyopaque, + ) callconv(.winapi) windows.BOOL; + pub extern "kernel32" fn GetQueuedCompletionStatus( + CompletionPort: windows.HANDLE, + lpNumberOfBytesTransferred: *windows.DWORD, + lpCompletionKey: *windows.ULONG_PTR, + lpOverlapped: *?*windows.OVERLAPPED, + dwMilliseconds: windows.DWORD, + ) callconv(.winapi) windows.BOOL; + pub extern "kernel32" fn CreateFileW( + lpFileName: [*:0]const windows.WCHAR, + dwDesiredAccess: windows.DWORD, + dwShareMode: windows.DWORD, + lpSecurityAttributes: ?*anyopaque, + dwCreationDisposition: windows.DWORD, + dwFlagsAndAttributes: windows.DWORD, + hTemplateFile: ?windows.HANDLE, + ) callconv(.winapi) windows.HANDLE; + pub extern "kernel32" fn PostQueuedCompletionStatus( + CompletionPort: windows.HANDLE, + dwNumberOfBytesTransferred: windows.DWORD, + dwCompletionKey: windows.ULONG_PTR, + lpOverlapped: ?*windows.OVERLAPPED, + ) callconv(.winapi) windows.BOOL; + pub extern "kernel32" fn GetFileAttributesW(lpFileName: [*:0]const windows.WCHAR) callconv(.winapi) windows.DWORD; +}; + +handler: *Handler, +iocp: windows.HANDLE, +thread: ?std.Thread, +watches: std.StringHashMapUnmanaged(Watch), +watches_mutex: std.Thread.Mutex, +path_types: std.StringHashMapUnmanaged(ObjectType), + +// A completion key of zero is used to signal the background thread to exit. 
+const SHUTDOWN_KEY: windows.ULONG_PTR = 0; + +const Watch = struct { + handle: windows.HANDLE, + buf: Buf, + overlapped: windows.OVERLAPPED, + path: []u8, // owned +}; + +const buf_size = 65536; +const Buf = []align(4) u8; + +const FILE_NOTIFY_INFORMATION = extern struct { + NextEntryOffset: windows.DWORD, + Action: windows.DWORD, + FileNameLength: windows.DWORD, + FileName: [1]windows.WCHAR, +}; + +const FILE_ACTION_ADDED: windows.DWORD = 1; +const FILE_ACTION_REMOVED: windows.DWORD = 2; +const FILE_ACTION_MODIFIED: windows.DWORD = 3; +const FILE_ACTION_RENAMED_OLD_NAME: windows.DWORD = 4; +const FILE_ACTION_RENAMED_NEW_NAME: windows.DWORD = 5; + +const notify_filter: windows.DWORD = + 0x00000001 | // FILE_NOTIFY_CHANGE_FILE_NAME + 0x00000002 | // FILE_NOTIFY_CHANGE_DIR_NAME + 0x00000008 | // FILE_NOTIFY_CHANGE_SIZE + 0x00000010 | // FILE_NOTIFY_CHANGE_LAST_WRITE + 0x00000040; // FILE_NOTIFY_CHANGE_CREATION + +pub fn init(handler: *Handler) windows.CreateIoCompletionPortError!@This() { + const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1); + return .{ .handler = handler, .iocp = iocp, .thread = null, .watches = .empty, .watches_mutex = .{}, .path_types = .empty }; +} + +pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void { + // Wake the background thread with a shutdown key, then wait for it. 
+ _ = win32.PostQueuedCompletionStatus(self.iocp, 0, SHUTDOWN_KEY, null); + if (self.thread) |t| t.join(); + var it = self.watches.iterator(); + while (it.next()) |entry| { + _ = win32.CloseHandle(entry.value_ptr.*.handle); + allocator.free(entry.value_ptr.*.path); + allocator.free(entry.value_ptr.*.buf); + } + self.watches.deinit(allocator); + var pt_it = self.path_types.iterator(); + while (pt_it.next()) |entry| allocator.free(entry.key_ptr.*); + self.path_types.deinit(allocator); + _ = win32.CloseHandle(self.iocp); +} + +pub fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + if (self.thread != null) return error.AlreadyArmed; + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ allocator, self.iocp, &self.watches, &self.watches_mutex, &self.path_types, self.handler }); +} + +fn thread_fn( + allocator: std.mem.Allocator, + iocp: windows.HANDLE, + watches: *std.StringHashMapUnmanaged(Watch), + watches_mutex: *std.Thread.Mutex, + path_types: *std.StringHashMapUnmanaged(ObjectType), + handler: *Handler, +) void { + var bytes: windows.DWORD = 0; + var key: windows.ULONG_PTR = 0; + var overlapped_ptr: ?*windows.OVERLAPPED = null; + while (true) { + // Block indefinitely until IOCP has a completion or shutdown signal. + const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE); + if (ok == 0 or key == SHUTDOWN_KEY) return; + const triggered_handle: windows.HANDLE = @ptrFromInt(key); + watches_mutex.lock(); + var it = watches.iterator(); + while (it.next()) |entry| { + const w = entry.value_ptr; + if (w.handle != triggered_handle) continue; + if (bytes > 0) { + var offset: usize = 0; + while (offset < bytes) { + const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr)); + const name_wchars = (&info.FileName).ptr[0 .. 
info.FileNameLength / 2]; + var name_buf: [std.fs.max_path_bytes]u8 = undefined; + const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0; + const event_type: EventType = switch (info.Action) { + FILE_ACTION_ADDED => .created, + FILE_ACTION_REMOVED => .deleted, + FILE_ACTION_MODIFIED => .modified, + FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed, + else => { + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + continue; + }, + }; + var full_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch { + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + continue; + }; + // Determine object_type: try GetFileAttributesW; cache result. + const object_type: ObjectType = if (event_type == .deleted) blk: { + // Path no longer exists; use cached type if available. + const cached = path_types.fetchRemove(full_path); + break :blk if (cached) |kv| blk2: { + allocator.free(kv.key); + break :blk2 kv.value; + } else .unknown; + } else blk: { + var full_path_w: [std.fs.max_path_bytes]windows.WCHAR = undefined; + const len = std.unicode.utf8ToUtf16Le(&full_path_w, full_path) catch break :blk .unknown; + full_path_w[len] = 0; + const attrs = win32.GetFileAttributesW(full_path_w[0..len :0]); + const INVALID: windows.DWORD = 0xFFFFFFFF; + const FILE_ATTRIBUTE_DIRECTORY: windows.DWORD = 0x10; + const ot: ObjectType = if (attrs == INVALID) .unknown else if (attrs & FILE_ATTRIBUTE_DIRECTORY != 0) .dir else .file; + // Cache the determined type. 
+ if (ot != .unknown) { + const gop = path_types.getOrPut(allocator, full_path) catch break :blk ot; + if (!gop.found_existing) { + gop.key_ptr.* = allocator.dupe(u8, full_path) catch { + _ = path_types.remove(full_path); + break :blk ot; + }; + } + gop.value_ptr.* = ot; + } + break :blk ot; + }; + // Capture next_entry_offset before releasing the mutex: after unlock, + // the main thread may call remove_watch() which frees w.buf, making + // the `info` pointer (which points into w.buf) a dangling reference. + const next_entry_offset = info.NextEntryOffset; + watches_mutex.unlock(); + handler.change(full_path, event_type, object_type) catch { + watches_mutex.lock(); + break; + }; + watches_mutex.lock(); + if (next_entry_offset == 0) break; + offset += next_entry_offset; + } + } + // Re-arm ReadDirectoryChangesW for the next batch. + w.overlapped = std.mem.zeroes(windows.OVERLAPPED); + _ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null); + break; + } + watches_mutex.unlock(); + } +} + +pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void { + self.watches_mutex.lock(); + defer self.watches_mutex.unlock(); + if (self.watches.contains(path)) return; + const path_w = std.unicode.utf8ToUtf16LeAllocZ(allocator, path) catch return error.WatchFailed; + defer allocator.free(path_w); + const handle = win32.CreateFileW( + path_w, + windows.GENERIC_READ, + windows.FILE_SHARE_READ | windows.FILE_SHARE_WRITE | windows.FILE_SHARE_DELETE, + null, + windows.OPEN_EXISTING, + 0x02000000 | 0x40000000, // FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED + null, + ); + if (handle == windows.INVALID_HANDLE_VALUE) return error.WatchFailed; + errdefer _ = win32.CloseHandle(handle); + _ = windows.CreateIoCompletionPort(handle, self.iocp, @intFromPtr(handle), 0) catch return error.WatchFailed; + const buf = try allocator.alignedAlloc(u8, .fromByteUnits(4), buf_size); + 
errdefer allocator.free(buf); + const owned_path = try allocator.dupe(u8, path); + errdefer allocator.free(owned_path); + var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED); + if (win32.ReadDirectoryChangesW(handle, buf.ptr, buf_size, 1, notify_filter, null, &overlapped, null) == 0) + return error.WatchFailed; + try self.watches.put(allocator, owned_path, .{ + .handle = handle, + .buf = buf, + .overlapped = overlapped, + .path = owned_path, + }); +} + +pub fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.watches_mutex.lock(); + defer self.watches_mutex.unlock(); + if (self.watches.fetchRemove(path)) |entry| { + _ = win32.CloseHandle(entry.value.handle); + allocator.free(entry.value.path); + allocator.free(entry.value.buf); + } +} diff --git a/src/nightwatch.zig b/src/nightwatch.zig index a667afd..d759a10 100644 --- a/src/nightwatch.zig +++ b/src/nightwatch.zig @@ -1,70 +1,26 @@ const std = @import("std"); const builtin = @import("builtin"); const build_options = @import("build_options"); +const types = @import("types.zig"); -pub const EventType = enum { - created, - modified, - deleted, - /// Only produced on macOS and Windows where the OS gives no pairing info. - /// On Linux, paired renames are emitted as a rename event with both paths instead. - renamed, -}; - -pub const ObjectType = enum { - file, - dir, - /// The object type could not be determined (e.g. a deleted file on Windows - /// where the path no longer exists to query). - unknown, -}; - -pub const Error = error{ - HandlerFailed, - OutOfMemory, - WatchFailed, -}; - -/// True when the Linux inotify backend runs in poll mode (caller drives the -/// event loop via poll_fd / handle_read_ready). False on all other platforms -/// and on Linux when the `linux_read_thread` build option is set. 
-pub const linux_poll_mode = builtin.os.tag == .linux and !build_options.linux_read_thread; +pub const EventType = types.EventType; +pub const ObjectType = types.ObjectType; +pub const Error = types.Error; +pub const ReadableStatus = types.ReadableStatus; +pub const linux_poll_mode = types.linux_poll_mode; +pub const Handler = types.Handler; /// True if the current backend detects file content modifications in real time. /// False only when kqueue_dir_only=true, where directory-level watches are used /// and file writes do not trigger a directory NOTE_WRITE event. pub const detects_file_modifications = Backend.detects_file_modifications; -pub const Handler = struct { - vtable: *const VTable, - - pub const VTable = struct { - change: *const fn (handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void, - rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8, object_type: ObjectType) error{HandlerFailed}!void, - /// Only present in Linux poll mode (linux_poll_mode == true). 
- wait_readable: if (linux_poll_mode) *const fn (handler: *Handler) error{HandlerFailed}!ReadableStatus else void, - }; - - fn change(handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void { - return handler.vtable.change(handler, path, event_type, object_type); - } - - fn rename(handler: *Handler, src_path: []const u8, dst_path: []const u8, object_type: ObjectType) error{HandlerFailed}!void { - return handler.vtable.rename(handler, src_path, dst_path, object_type); - } - - fn wait_readable(handler: *Handler) error{HandlerFailed}!ReadableStatus { - if (comptime linux_poll_mode) { - return handler.vtable.wait_readable(handler); - } else { - unreachable; - } - } -}; - -pub const ReadableStatus = enum { - // TODO: is_readable, // backend may now read from fd (blocking mode) - will_notify, // backend must wait for a handle_read_ready call +const Backend = switch (builtin.os.tag) { + .linux => @import("backend/INotify.zig"), + .macos => if (build_options.macos_fsevents) @import("backend/FSEvents.zig") else if (build_options.kqueue_dir_only) @import("backend/KQueueDir.zig") else @import("backend/KQueue.zig"), + .freebsd, .openbsd, .netbsd, .dragonfly => if (build_options.kqueue_dir_only) @import("backend/KQueueDir.zig") else @import("backend/KQueue.zig"), + .windows => @import("backend/Windows.zig"), + else => @compileError("file_watcher: unsupported OS"), }; allocator: std.mem.Allocator, @@ -181,1507 +137,3 @@ fn recurse_watch(backend: *Backend, allocator: std.mem.Allocator, dir_path: []co recurse_watch(backend, allocator, sub); } } - -const Backend = switch (builtin.os.tag) { - .linux => INotifyBackend, - .macos => if (build_options.macos_fsevents) FSEventsBackend else if (build_options.kqueue_dir_only) KQueueDirBackend else KQueueBackend, - .freebsd, .openbsd, .netbsd, .dragonfly => if (build_options.kqueue_dir_only) KQueueDirBackend else KQueueBackend, - .windows => WindowsBackend, - else => 
@compileError("file_watcher: unsupported OS"), -}; - -const INotifyBackend = struct { - const watches_recursively = false; - const detects_file_modifications = true; - - const PendingRename = struct { - cookie: u32, - path: []u8, // owned - object_type: ObjectType, - }; - - handler: *Handler, - inotify_fd: std.posix.fd_t, - watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path - pending_renames: std.ArrayListUnmanaged(PendingRename), - // Used only in linux_read_thread mode: - stop_pipe: if (build_options.linux_read_thread) [2]std.posix.fd_t else void, - thread: if (build_options.linux_read_thread) ?std.Thread else void, - - const IN = std.os.linux.IN; - - const watch_mask: u32 = IN.CREATE | IN.DELETE | IN.MODIFY | - IN.MOVED_FROM | IN.MOVED_TO | IN.DELETE_SELF | - IN.MOVE_SELF | IN.CLOSE_WRITE; - - const in_flags: std.os.linux.O = .{ .NONBLOCK = true }; - - fn init(handler: *Handler) !@This() { - const inotify_fd = try std.posix.inotify_init1(@bitCast(in_flags)); - errdefer std.posix.close(inotify_fd); - if (comptime build_options.linux_read_thread) { - const stop_pipe = try std.posix.pipe(); - return .{ - .handler = handler, - .inotify_fd = inotify_fd, - .watches = .empty, - .pending_renames = .empty, - .stop_pipe = stop_pipe, - .thread = null, - }; - } else { - return .{ - .handler = handler, - .inotify_fd = inotify_fd, - .watches = .empty, - .pending_renames = .empty, - .stop_pipe = {}, - .thread = {}, - }; - } - } - - fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - if (comptime build_options.linux_read_thread) { - // Signal thread to stop and wait for it to exit. 
- _ = std.posix.write(self.stop_pipe[1], "x") catch {}; - if (self.thread) |t| t.join(); - std.posix.close(self.stop_pipe[0]); - std.posix.close(self.stop_pipe[1]); - } - var it = self.watches.iterator(); - while (it.next()) |entry| allocator.free(entry.value_ptr.*); - self.watches.deinit(allocator); - for (self.pending_renames.items) |r| allocator.free(r.path); - self.pending_renames.deinit(allocator); - std.posix.close(self.inotify_fd); - } - - fn arm(self: *@This(), allocator: std.mem.Allocator) error{HandlerFailed}!void { - if (comptime build_options.linux_read_thread) { - if (self.thread != null) return; // already running - self.thread = std.Thread.spawn(.{}, thread_fn, .{ self, allocator }) catch return error.HandlerFailed; - } else { - return switch (self.handler.wait_readable() catch |e| switch (e) { - error.HandlerFailed => |e_| return e_, - }) { - .will_notify => {}, - }; - } - } - - fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void { - var pfds = [_]std.posix.pollfd{ - .{ .fd = self.inotify_fd, .events = std.posix.POLL.IN, .revents = 0 }, - .{ .fd = self.stop_pipe[0], .events = std.posix.POLL.IN, .revents = 0 }, - }; - while (true) { - _ = std.posix.poll(&pfds, -1) catch return; - if (pfds[1].revents & std.posix.POLL.IN != 0) return; // stop signal - if (pfds[0].revents & std.posix.POLL.IN != 0) { - self.handle_read_ready(allocator) catch return; - } - } - } - - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void { - const path_z = try allocator.dupeZ(u8, path); - defer allocator.free(path_z); - const wd = std.os.linux.inotify_add_watch(self.inotify_fd, path_z, watch_mask); - switch (std.posix.errno(wd)) { - .SUCCESS => {}, - else => |e| { - std.log.err("nightwatch.add_watch failed: {t}", .{e}); - return error.WatchFailed; - }, - } - const owned_path = try allocator.dupe(u8, path); - errdefer allocator.free(owned_path); - const result = try self.watches.getOrPut(allocator, 
@intCast(wd)); - if (result.found_existing) allocator.free(result.value_ptr.*); - result.value_ptr.* = owned_path; - } - - fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { - var it = self.watches.iterator(); - while (it.next()) |entry| { - if (!std.mem.eql(u8, entry.value_ptr.*, path)) continue; - _ = std.os.linux.inotify_rm_watch(self.inotify_fd, entry.key_ptr.*); - allocator.free(entry.value_ptr.*); - self.watches.removeByPtr(entry.key_ptr); - return; - } - } - - fn has_watch_for_path(self: *const @This(), path: []const u8) bool { - var it = self.watches.iterator(); - while (it.next()) |entry| { - if (std.mem.eql(u8, entry.value_ptr.*, path)) return true; - } - return false; - } - - fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator) (std.posix.ReadError || error{ NoSpaceLeft, OutOfMemory, HandlerFailed })!void { - const InotifyEvent = extern struct { - wd: i32, - mask: u32, - cookie: u32, - len: u32, - }; - - var buf: [65536]u8 align(@alignOf(InotifyEvent)) = undefined; - defer { - // Any unpaired MOVED_FROM means the file was moved out of the watched tree. 
- for (self.pending_renames.items) |r| { - self.handler.change(r.path, EventType.deleted, r.object_type) catch {}; - allocator.free(r.path); - } - self.pending_renames.clearRetainingCapacity(); - } - - while (true) { - const n = std.posix.read(self.inotify_fd, &buf) catch |e| switch (e) { - error.WouldBlock => { - // re-arm the file_discriptor - try self.arm(allocator); - break; - }, - else => return e, - }; - var offset: usize = 0; - while (offset + @sizeOf(InotifyEvent) <= n) { - const ev: *const InotifyEvent = @ptrCast(@alignCast(buf[offset..].ptr)); - const name_offset = offset + @sizeOf(InotifyEvent); - offset = name_offset + ev.len; - const watched_path = self.watches.get(ev.wd) orelse continue; - const name: []const u8 = if (ev.len > 0) - std.mem.sliceTo(buf[name_offset..][0..ev.len], 0) - else - ""; - - var full_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path: []const u8 = if (name.len > 0) - try std.fmt.bufPrint(&full_buf, "{s}/{s}", .{ watched_path, name }) - else - watched_path; - - if (ev.mask & IN.MOVED_FROM != 0) { - // Park it, we may receive a paired MOVED_TO with the same cookie. - try self.pending_renames.append(allocator, .{ - .cookie = ev.cookie, - .path = try allocator.dupe(u8, full_path), - .object_type = if (ev.mask & IN.ISDIR != 0) .dir else .file, - }); - } else if (ev.mask & IN.MOVED_TO != 0) { - // Look for a paired MOVED_FROM. - var found: ?usize = null; - for (self.pending_renames.items, 0..) |r, i| { - if (r.cookie == ev.cookie) { - found = i; - break; - } - } - if (found) |i| { - // Complete rename pair: emit a single atomic rename message. - const r = self.pending_renames.swapRemove(i); - defer allocator.free(r.path); - try self.handler.rename(r.path, full_path, r.object_type); - } else { - // No paired MOVED_FROM, file was moved in from outside the watched tree. 
- const ot: ObjectType = if (ev.mask & IN.ISDIR != 0) .dir else .file; - try self.handler.change(full_path, EventType.created, ot); - } - } else if (ev.mask & IN.MOVE_SELF != 0) { - // The watched directory itself was renamed/moved away. - try self.handler.change(full_path, EventType.deleted, .dir); - } else { - const is_dir = ev.mask & IN.ISDIR != 0; - const object_type: ObjectType = if (is_dir) .dir else .file; - const event_type: EventType = if (ev.mask & IN.CREATE != 0) - .created - else if (ev.mask & (IN.DELETE | IN.DELETE_SELF) != 0) blk: { - // Suppress IN_DELETE|IN_ISDIR for subdirs that have their - // own watch: IN_DELETE_SELF on that watch will fire the - // same path without duplication. - if (is_dir and self.has_watch_for_path(full_path)) - continue; - break :blk .deleted; - } else if (ev.mask & (IN.MODIFY | IN.CLOSE_WRITE) != 0) - .modified - else - continue; - try self.handler.change(full_path, event_type, object_type); - } - } - } - } -}; - -const FSEventsBackend = struct { - const watches_recursively = true; // FSEventStreamCreate watches the entire subtree - const detects_file_modifications = true; - - handler: *Handler, - stream: ?*anyopaque, // FSEventStreamRef - queue: ?*anyopaque, // dispatch_queue_t - ctx: ?*CallbackContext, // heap-allocated, freed after stream is stopped - watches: std.StringArrayHashMapUnmanaged(void), // owned paths - - const threaded = false; // callback fires on GCD thread; no FW_event needed - - const kFSEventStreamCreateFlagNoDefer: u32 = 0x00000002; - const kFSEventStreamCreateFlagFileEvents: u32 = 0x00000010; - const kFSEventStreamEventFlagItemCreated: u32 = 0x00000100; - const kFSEventStreamEventFlagItemRemoved: u32 = 0x00000200; - const kFSEventStreamEventFlagItemRenamed: u32 = 0x00000800; - const kFSEventStreamEventFlagItemModified: u32 = 0x00001000; - const kFSEventStreamEventFlagItemIsDir: u32 = 0x00020000; - const kFSEventStreamEventIdSinceNow: u64 = 0xFFFFFFFFFFFFFFFF; - const kCFStringEncodingUTF8: u32 = 
0x08000100; - - // Mirror of FSEventStreamContext (Apple SDK struct; version must be 0). - const FSEventStreamContext = extern struct { - version: isize = 0, - info: ?*anyopaque = null, - retain: ?*anyopaque = null, - release: ?*anyopaque = null, - copy_description: ?*anyopaque = null, - }; - - const cf = struct { - pub extern "c" fn CFStringCreateWithBytesNoCopy( - alloc: ?*anyopaque, - bytes: [*]const u8, - numBytes: isize, - encoding: u32, - isExternalRepresentation: u8, - contentsDeallocator: ?*anyopaque, - ) ?*anyopaque; - pub extern "c" fn CFArrayCreate( - allocator: ?*anyopaque, - values: [*]const ?*anyopaque, - numValues: isize, - callBacks: ?*anyopaque, - ) ?*anyopaque; - pub extern "c" fn CFRelease(cf: *anyopaque) void; - pub extern "c" fn FSEventStreamCreate( - allocator: ?*anyopaque, - callback: *const anyopaque, - context: *const FSEventStreamContext, - pathsToWatch: *anyopaque, - sinceWhen: u64, - latency: f64, - flags: u32, - ) ?*anyopaque; - pub extern "c" fn FSEventStreamSetDispatchQueue(stream: *anyopaque, queue: *anyopaque) void; - pub extern "c" fn FSEventStreamStart(stream: *anyopaque) u8; - pub extern "c" fn FSEventStreamStop(stream: *anyopaque) void; - pub extern "c" fn FSEventStreamInvalidate(stream: *anyopaque) void; - pub extern "c" fn FSEventStreamRelease(stream: *anyopaque) void; - pub extern "c" fn dispatch_queue_create(label: [*:0]const u8, attr: ?*anyopaque) *anyopaque; - pub extern "c" fn dispatch_release(obj: *anyopaque) void; - pub extern "c" var kCFAllocatorNull: *anyopaque; - }; - - const CallbackContext = struct { - handler: *Handler, - // Snapshot of the watched root paths at arm() time, used to filter out - // spurious events for the root directories themselves that FSEvents - // sometimes delivers as historical events at stream start. 
- watched_roots: []const []const u8, // owned slice of owned strings - }; - - fn init(handler: *Handler) error{}!@This() { - return .{ - .handler = handler, - .stream = null, - .queue = null, - .ctx = null, - .watches = .empty, - }; - } - - fn stop_stream(self: *@This(), allocator: std.mem.Allocator) void { - if (self.stream) |s| { - cf.FSEventStreamStop(s); - cf.FSEventStreamInvalidate(s); - cf.FSEventStreamRelease(s); - self.stream = null; - } - if (self.queue) |q| { - cf.dispatch_release(q); - self.queue = null; - } - if (self.ctx) |c| { - for (c.watched_roots) |r| allocator.free(r); - allocator.free(c.watched_roots); - allocator.destroy(c); - self.ctx = null; - } - } - - fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - self.stop_stream(allocator); - var it = self.watches.iterator(); - while (it.next()) |entry| allocator.free(entry.key_ptr.*); - self.watches.deinit(allocator); - } - - fn arm(self: *@This(), allocator: std.mem.Allocator) error{ OutOfMemory, ArmFailed }!void { - if (self.stream != null) return; - if (self.watches.count() == 0) return; // no paths yet; will arm on first add_watch - - var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty; - defer cf_strings.deinit(allocator); - var it = self.watches.iterator(); - while (it.next()) |entry| { - const path = entry.key_ptr.*; - const s = cf.CFStringCreateWithBytesNoCopy( - null, - path.ptr, - @intCast(path.len), - kCFStringEncodingUTF8, - 0, - cf.kCFAllocatorNull, - ) orelse continue; - cf_strings.append(allocator, s) catch { - cf.CFRelease(s); - break; - }; - } - defer for (cf_strings.items) |s| cf.CFRelease(s.?); - - const paths_array = cf.CFArrayCreate( - null, - cf_strings.items.ptr, - @intCast(cf_strings.items.len), - null, - ) orelse return error.ArmFailed; - defer cf.CFRelease(paths_array); - - // Snapshot watched root paths so the callback can filter them out. 
- const roots = try allocator.alloc([]const u8, self.watches.count()); - errdefer allocator.free(roots); - var ri: usize = 0; - errdefer for (roots[0..ri]) |r| allocator.free(r); - var wit2 = self.watches.iterator(); - while (wit2.next()) |entry| { - roots[ri] = try allocator.dupe(u8, entry.key_ptr.*); - ri += 1; - } - - const ctx = try allocator.create(CallbackContext); - errdefer allocator.destroy(ctx); - ctx.* = .{ .handler = self.handler, .watched_roots = roots }; - - // FSEventStreamCreate copies the context struct; stack allocation is fine. - const stream_ctx = FSEventStreamContext{ .version = 0, .info = ctx }; - const stream = cf.FSEventStreamCreate( - null, - @ptrCast(&callback), - &stream_ctx, - paths_array, - kFSEventStreamEventIdSinceNow, - 0.1, - kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents, - ) orelse return error.ArmFailed; - errdefer cf.FSEventStreamRelease(stream); - - const queue = cf.dispatch_queue_create("flow.file_watcher", null); - cf.FSEventStreamSetDispatchQueue(stream, queue); - _ = cf.FSEventStreamStart(stream); - - self.stream = stream; - self.queue = queue; - self.ctx = ctx; - } - - fn callback( - _: *anyopaque, - info: ?*anyopaque, - num_events: usize, - event_paths: *anyopaque, - event_flags: [*]const u32, - _: [*]const u64, - ) callconv(.c) void { - const ctx: *CallbackContext = @ptrCast(@alignCast(info orelse return)); - const paths: [*][*:0]const u8 = @ptrCast(@alignCast(event_paths)); - outer: for (0..num_events) |i| { - const path = std.mem.sliceTo(paths[i], 0); - const flags = event_flags[i]; - - // Skip events for the watched root dirs themselves; FSEvents often - // delivers spurious historical events for them at stream start. - for (ctx.watched_roots) |root| { - if (std.mem.eql(u8, path, root)) continue :outer; - } - - // FSEvents coalesces operations, so multiple flags may be set on - // a single event. Emit one change call per applicable flag so - // callers see all relevant event types (e.g. 
created + modified). - const ot: ObjectType = if (flags & kFSEventStreamEventFlagItemIsDir != 0) .dir else .file; - if (flags & kFSEventStreamEventFlagItemCreated != 0) { - ctx.handler.change(path, .created, ot) catch {}; - } - if (flags & kFSEventStreamEventFlagItemRemoved != 0) { - ctx.handler.change(path, .deleted, ot) catch {}; - } - if (flags & kFSEventStreamEventFlagItemRenamed != 0) { - ctx.handler.change(path, .renamed, ot) catch {}; - } - if (flags & kFSEventStreamEventFlagItemModified != 0) { - ctx.handler.change(path, .modified, ot) catch {}; - } - } - } - - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{OutOfMemory}!void { - if (self.watches.contains(path)) return; - const owned = try allocator.dupe(u8, path); - errdefer allocator.free(owned); - try self.watches.put(allocator, owned, {}); - self.stop_stream(allocator); - self.arm(allocator) catch {}; - } - - fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { - if (self.watches.fetchSwapRemove(path)) |entry| allocator.free(entry.key); - self.stop_stream(allocator); - self.arm(allocator) catch {}; - } -}; - -const KQueueBackend = struct { - const watches_recursively = false; - const detects_file_modifications = true; - - handler: *Handler, - kq: std.posix.fd_t, - shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread - thread: ?std.Thread, - watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned dir path -> fd - watches_mutex: std.Thread.Mutex, - file_watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned file path -> fd - file_watches_mutex: std.Thread.Mutex, - // Per-directory snapshots of filenames, used to diff on NOTE_WRITE. - // Key: owned dir path (same as watches key), value: set of owned filenames. - // Accessed from both the main thread (add_watch) and the background thread (scan_dir). 
- snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), - snapshots_mutex: std.Thread.Mutex, - - const EVFILT_VNODE: i16 = -4; - const EVFILT_READ: i16 = -1; - const EV_ADD: u16 = 0x0001; - const EV_ENABLE: u16 = 0x0004; - const EV_CLEAR: u16 = 0x0020; - const EV_DELETE: u16 = 0x0002; - const NOTE_DELETE: u32 = 0x00000001; - const NOTE_WRITE: u32 = 0x00000002; - const NOTE_EXTEND: u32 = 0x00000004; - const NOTE_ATTRIB: u32 = 0x00000008; - const NOTE_RENAME: u32 = 0x00000020; - - fn init(handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { - // Per-file kqueue watches require one open fd per watched file. Bump - // the soft NOFILE limit to the hard limit so large directory trees don't - // exhaust the default quota (256 on macOS, 1024 on many FreeBSD installs). - if (std.posix.getrlimit(.NOFILE)) |rl| { - if (rl.cur < rl.max) - std.posix.setrlimit(.NOFILE, .{ .cur = rl.max, .max = rl.max }) catch {}; - } else |_| {} - const kq = try std.posix.kqueue(); - errdefer std.posix.close(kq); - const pipe = try std.posix.pipe(); - errdefer { - std.posix.close(pipe[0]); - std.posix.close(pipe[1]); - } - // Register the read end of the shutdown pipe with kqueue so the thread - // wakes up when we want to shut down. - const shutdown_kev = std.posix.Kevent{ - .ident = @intCast(pipe[0]), - .filter = EVFILT_READ, - .flags = EV_ADD | EV_ENABLE, - .fflags = 0, - .data = 0, - .udata = 0, - }; - _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); - return .{ - .handler = handler, - .kq = kq, - .shutdown_pipe = pipe, - .thread = null, - .watches = .empty, - .watches_mutex = .{}, - .file_watches = .empty, - .file_watches_mutex = .{}, - .snapshots = .empty, - .snapshots_mutex = .{}, - }; - } - - fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - // Signal the thread to exit by writing to the shutdown pipe. 
- _ = std.posix.write(self.shutdown_pipe[1], &[_]u8{0}) catch {}; - if (self.thread) |t| t.join(); - std.posix.close(self.shutdown_pipe[0]); - std.posix.close(self.shutdown_pipe[1]); - var it = self.watches.iterator(); - while (it.next()) |entry| { - std.posix.close(entry.value_ptr.*); - allocator.free(entry.key_ptr.*); - } - self.watches.deinit(allocator); - var fit = self.file_watches.iterator(); - while (fit.next()) |entry| { - std.posix.close(entry.value_ptr.*); - allocator.free(entry.key_ptr.*); - } - self.file_watches.deinit(allocator); - var sit = self.snapshots.iterator(); - while (sit.next()) |entry| { - // Keys are borrowed from self.watches and freed in the watches loop above. - var names = entry.value_ptr.*; - var nit = names.iterator(); - while (nit.next()) |ne| allocator.free(ne.key_ptr.*); - names.deinit(allocator); - } - self.snapshots.deinit(allocator); - std.posix.close(self.kq); - } - - fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { - if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, allocator }); - } - - fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void { - var events: [64]std.posix.Kevent = undefined; - while (true) { - // Block indefinitely until kqueue has events. - const n = std.posix.kevent(self.kq, &.{}, &events, null) catch break; - for (events[0..n]) |ev| { - if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit - if (ev.filter != EVFILT_VNODE) continue; - const fd: std.posix.fd_t = @intCast(ev.ident); - - // Check if this is a file watch: NOTE_WRITE/NOTE_EXTEND → modified. 
- self.file_watches_mutex.lock(); - var fwit = self.file_watches.iterator(); - const file_path: ?[]const u8 = while (fwit.next()) |entry| { - if (entry.value_ptr.* == fd) break entry.key_ptr.*; - } else null; - self.file_watches_mutex.unlock(); - if (file_path) |fp| { - if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND) != 0) - self.handler.change(fp, EventType.modified, .file) catch return; - continue; - } - - // Otherwise look up the directory path for this fd. - self.watches_mutex.lock(); - var wit = self.watches.iterator(); - const dir_path: ?[]const u8 = while (wit.next()) |entry| { - if (entry.value_ptr.* == fd) break entry.key_ptr.*; - } else null; - self.watches_mutex.unlock(); - if (dir_path == null) continue; - if (ev.fflags & NOTE_DELETE != 0) { - self.handler.change(dir_path.?, EventType.deleted, .dir) catch return; - } else if (ev.fflags & NOTE_RENAME != 0) { - self.handler.change(dir_path.?, EventType.renamed, .dir) catch return; - } else if (ev.fflags & NOTE_WRITE != 0) { - self.scan_dir(allocator, dir_path.?) catch {}; - } - } - } - } - - // Scan a directory and diff against the snapshot, emitting created/deleted events. - fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { - var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; - defer dir.close(); - - // Arena for all temporaries — freed in one shot at the end. - var arena = std.heap.ArenaAllocator.init(allocator); - defer arena.deinit(); - const tmp = arena.allocator(); - - // Collect current files and subdirectories (no lock, reading filesystem only). 
- var current_files: std.StringHashMapUnmanaged(void) = .empty; - var current_dirs: std.ArrayListUnmanaged([]u8) = .empty; - var iter = dir.iterate(); - while (try iter.next()) |entry| { - switch (entry.kind) { - .file => { - const name = try tmp.dupe(u8, entry.name); - try current_files.put(tmp, name, {}); - }, - .directory => { - const name = try tmp.dupe(u8, entry.name); - try current_dirs.append(tmp, name); - }, - else => {}, - } - } - - // Diff against snapshot under the lock; collect events to emit after releasing it. - // to_create / to_delete hold borrowed pointers into the snapshot (which uses - // allocator, not tmp); only the list metadata itself uses tmp. - var to_create: std.ArrayListUnmanaged([]const u8) = .empty; - var to_delete: std.ArrayListUnmanaged([]const u8) = .empty; - var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty; - - self.snapshots_mutex.lock(); - { - for (current_dirs.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - if (!self.snapshots.contains(full_path)) { - const owned = tmp.dupe(u8, full_path) catch continue; - new_dirs.append(tmp, owned) catch continue; - } - } - - const gop = self.snapshots.getOrPut(allocator, dir_path) catch |e| { - self.snapshots_mutex.unlock(); - return e; - }; - if (!gop.found_existing) gop.value_ptr.* = .empty; - const snapshot = gop.value_ptr; - - var cit = current_files.iterator(); - while (cit.next()) |entry| { - if (snapshot.contains(entry.key_ptr.*)) continue; - const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| { - self.snapshots_mutex.unlock(); - return e; - }; - snapshot.put(allocator, owned, {}) catch |e| { - allocator.free(owned); - self.snapshots_mutex.unlock(); - return e; - }; - try to_create.append(tmp, owned); - } - - var sit = snapshot.iterator(); - while (sit.next()) |entry| { - if (current_files.contains(entry.key_ptr.*)) continue; - try 
to_delete.append(tmp, entry.key_ptr.*); - } - for (to_delete.items) |name| _ = snapshot.fetchRemove(name); - } - self.snapshots_mutex.unlock(); - - // Emit all events outside the lock so handlers may safely call watch()/unwatch(). - // Emit created dirs, then deletions, then creations. Deletions first ensures that - // a rename (old disappears, new appears) reports the source path before the dest. - for (new_dirs.items) |full_path| - try self.handler.change(full_path, EventType.created, .dir); - for (to_delete.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch { - allocator.free(name); - continue; - }; - self.deregister_file_watch(allocator, full_path); - try self.handler.change(full_path, EventType.deleted, .file); - allocator.free(name); // snapshot key, owned by allocator - } - for (to_create.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - self.register_file_watch(allocator, full_path); - try self.handler.change(full_path, EventType.created, .file); - } - // arena.deinit() frees current_files, current_dirs, new_dirs, and list metadata - } - - fn register_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { - self.file_watches_mutex.lock(); - const already = self.file_watches.contains(path); - self.file_watches_mutex.unlock(); - if (already) return; - const fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch return; - const kev = std.posix.Kevent{ - .ident = @intCast(fd), - .filter = EVFILT_VNODE, - .flags = EV_ADD | EV_ENABLE | EV_CLEAR, - .fflags = NOTE_WRITE | NOTE_EXTEND, - .data = 0, - .udata = 0, - }; - _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch { - std.posix.close(fd); - return; - }; - const owned = allocator.dupe(u8, path) catch { - std.posix.close(fd); - return; - }; - 
self.file_watches_mutex.lock(); - self.file_watches.put(allocator, owned, fd) catch { - self.file_watches_mutex.unlock(); - std.posix.close(fd); - allocator.free(owned); - return; - }; - self.file_watches_mutex.unlock(); - } - - fn deregister_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { - self.file_watches_mutex.lock(); - const kv = self.file_watches.fetchRemove(path); - self.file_watches_mutex.unlock(); - if (kv) |entry| { - std.posix.close(entry.value); - allocator.free(entry.key); - } - } - - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void { - self.watches_mutex.lock(); - const already = self.watches.contains(path); - self.watches_mutex.unlock(); - if (already) return; - const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { - error.AccessDenied, - error.PermissionDenied, - error.PathAlreadyExists, - error.SymLinkLoop, - error.NameTooLong, - error.FileNotFound, - error.SystemResources, - error.NoSpaceLeft, - error.NotDir, - error.InvalidUtf8, - error.InvalidWtf8, - error.BadPathName, - error.NoDevice, - error.NetworkNotFound, - error.Unexpected, - error.ProcessFdQuotaExceeded, - error.SystemFdQuotaExceeded, - error.ProcessNotFound, - error.FileTooBig, - error.IsDir, - error.DeviceBusy, - error.FileLocksNotSupported, - error.FileBusy, - error.WouldBlock, - => |e_| { - std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); - return error.WatchFailed; - }, - }; - errdefer std.posix.close(path_fd); - const kev = std.posix.Kevent{ - .ident = @intCast(path_fd), - .filter = EVFILT_VNODE, - .flags = EV_ADD | EV_ENABLE | EV_CLEAR, - .fflags = NOTE_WRITE | NOTE_DELETE | NOTE_RENAME | NOTE_ATTRIB | NOTE_EXTEND, - .data = 0, - .udata = 0, - }; - _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch |e| switch (e) { - error.AccessDenied, - error.SystemResources, - error.EventNotFound, - error.ProcessNotFound, - error.Overflow, - => 
|e_| { - std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); - return error.WatchFailed; - }, - }; - const owned_path = try allocator.dupe(u8, path); - self.watches_mutex.lock(); - self.watches.put(allocator, owned_path, path_fd) catch |e| { - self.watches_mutex.unlock(); - allocator.free(owned_path); - return e; - }; - self.watches_mutex.unlock(); - // Take initial snapshot so first NOTE_WRITE has a baseline to diff against. - self.take_snapshot(allocator, owned_path) catch |e| switch (e) { - error.AccessDenied, - error.PermissionDenied, - error.SystemResources, - error.InvalidUtf8, - error.Unexpected, - => |e_| { - std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); - return error.WatchFailed; - }, - error.OutOfMemory => return error.OutOfMemory, - }; - } - - fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { - var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; - defer dir.close(); - // Collect file names first so we can register file watches without holding the lock. - var names: std.ArrayListUnmanaged([]u8) = .empty; - defer { - for (names.items) |n| allocator.free(n); - names.deinit(allocator); - } - var iter = dir.iterate(); - while (try iter.next()) |entry| { - if (entry.kind != .file) continue; - try names.append(allocator, try allocator.dupe(u8, entry.name)); - } - self.snapshots_mutex.lock(); - const gop = try self.snapshots.getOrPut(allocator, dir_path); - if (!gop.found_existing) gop.value_ptr.* = .empty; - var snapshot = gop.value_ptr; - for (names.items) |name| { - if (snapshot.contains(name)) continue; - const owned = try allocator.dupe(u8, name); - try snapshot.put(allocator, owned, {}); - } - self.snapshots_mutex.unlock(); - // Register a kqueue watch for each existing file so writes are detected. 
- for (names.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - self.register_file_watch(allocator, full_path); - } - } - - fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { - self.watches_mutex.lock(); - const watches_entry = self.watches.fetchRemove(path); - self.watches_mutex.unlock(); - if (watches_entry) |entry| { - std.posix.close(entry.value); - allocator.free(entry.key); - } - if (self.snapshots.fetchRemove(path)) |entry| { - var names = entry.value; - var it = names.iterator(); - while (it.next()) |ne| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ path, ne.key_ptr.* }) catch { - allocator.free(ne.key_ptr.*); - continue; - }; - self.deregister_file_watch(allocator, full_path); - allocator.free(ne.key_ptr.*); - } - names.deinit(allocator); - } - } -}; - -const KQueueDirBackend = struct { - const watches_recursively = false; - const detects_file_modifications = false; - const WatchEntry = struct { fd: std.posix.fd_t, is_file: bool }; - - handler: *Handler, - kq: std.posix.fd_t, - shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread - thread: ?std.Thread, - watches: std.StringHashMapUnmanaged(WatchEntry), // owned path -> {fd, is_file} - watches_mutex: std.Thread.Mutex, - // Per-directory snapshots: owned filename -> mtime_ns. - // Used to diff on NOTE_WRITE: detects creates, deletes, and (opportunistically) - // modifications when the same directory fires another event later. - // Key: owned dir path (same as watches key), value: map of owned filename -> mtime_ns. 
- snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(i128)), - snapshots_mutex: std.Thread.Mutex, - - const EVFILT_VNODE: i16 = -4; - const EVFILT_READ: i16 = -1; - const EV_ADD: u16 = 0x0001; - const EV_ENABLE: u16 = 0x0004; - const EV_CLEAR: u16 = 0x0020; - const EV_DELETE: u16 = 0x0002; - const NOTE_DELETE: u32 = 0x00000001; - const NOTE_WRITE: u32 = 0x00000002; - const NOTE_EXTEND: u32 = 0x00000004; - const NOTE_ATTRIB: u32 = 0x00000008; - const NOTE_RENAME: u32 = 0x00000020; - - fn init(handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { - const kq = try std.posix.kqueue(); - errdefer std.posix.close(kq); - const pipe = try std.posix.pipe(); - errdefer { - std.posix.close(pipe[0]); - std.posix.close(pipe[1]); - } - // Register the read end of the shutdown pipe with kqueue so the thread - // wakes up when we want to shut down. - const shutdown_kev = std.posix.Kevent{ - .ident = @intCast(pipe[0]), - .filter = EVFILT_READ, - .flags = EV_ADD | EV_ENABLE, - .fflags = 0, - .data = 0, - .udata = 0, - }; - _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); - return .{ - .handler = handler, - .kq = kq, - .shutdown_pipe = pipe, - .thread = null, - .watches = .empty, - .watches_mutex = .{}, - .snapshots = .empty, - .snapshots_mutex = .{}, - }; - } - - fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - // Signal the thread to exit by writing to the shutdown pipe. - _ = std.posix.write(self.shutdown_pipe[1], &[_]u8{0}) catch {}; - if (self.thread) |t| t.join(); - std.posix.close(self.shutdown_pipe[0]); - std.posix.close(self.shutdown_pipe[1]); - var it = self.watches.iterator(); - while (it.next()) |entry| { - std.posix.close(entry.value_ptr.*.fd); - allocator.free(entry.key_ptr.*); - } - self.watches.deinit(allocator); - var sit = self.snapshots.iterator(); - while (sit.next()) |entry| { - // Keys are borrowed from self.watches and freed in the watches loop above. 
- var snap = entry.value_ptr.*; - var nit = snap.iterator(); - while (nit.next()) |ne| allocator.free(ne.key_ptr.*); - snap.deinit(allocator); - } - self.snapshots.deinit(allocator); - std.posix.close(self.kq); - } - - fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { - if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, allocator }); - } - - fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void { - var events: [64]std.posix.Kevent = undefined; - while (true) { - // Block indefinitely until kqueue has events. - const n = std.posix.kevent(self.kq, &.{}, &events, null) catch break; - for (events[0..n]) |ev| { - if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit - if (ev.filter != EVFILT_VNODE) continue; - const fd: std.posix.fd_t = @intCast(ev.ident); - - self.watches_mutex.lock(); - var wit = self.watches.iterator(); - var watch_path: ?[]const u8 = null; - var is_file: bool = false; - while (wit.next()) |entry| { - if (entry.value_ptr.*.fd == fd) { - watch_path = entry.key_ptr.*; - is_file = entry.value_ptr.*.is_file; - break; - } - } - self.watches_mutex.unlock(); - if (watch_path == null) continue; - if (is_file) { - // Explicit file watch: emit events with .file type directly. 
- if (ev.fflags & NOTE_DELETE != 0) { - self.handler.change(watch_path.?, EventType.deleted, .file) catch return; - } else if (ev.fflags & NOTE_RENAME != 0) { - self.handler.change(watch_path.?, EventType.renamed, .file) catch return; - } else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND) != 0) { - self.handler.change(watch_path.?, EventType.modified, .file) catch return; - } - } else { - if (ev.fflags & NOTE_DELETE != 0) { - self.handler.change(watch_path.?, EventType.deleted, .dir) catch return; - } else if (ev.fflags & NOTE_RENAME != 0) { - self.handler.change(watch_path.?, EventType.renamed, .dir) catch return; - } else if (ev.fflags & NOTE_WRITE != 0) { - self.scan_dir(allocator, watch_path.?) catch {}; - } - } - } - } - } - - // Scan a directory and diff against the snapshot, emitting created/deleted/modified events. - // File modifications are detected opportunistically via mtime changes: if a file was - // written before a NOTE_WRITE fires for another reason (create/delete/rename of a sibling), - // the mtime diff will catch it. - fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { - var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; - defer dir.close(); - - // Arena for all temporaries — freed in one shot at the end. - var arena = std.heap.ArenaAllocator.init(allocator); - defer arena.deinit(); - const tmp = arena.allocator(); - - // Collect current files (name → mtime_ns) and subdirectories. - // No lock held while doing filesystem I/O. 
- var current_files: std.StringHashMapUnmanaged(i128) = .empty; - var current_dirs: std.ArrayListUnmanaged([]u8) = .empty; - var iter = dir.iterate(); - while (try iter.next()) |entry| { - switch (entry.kind) { - .file => { - const mtime = (dir.statFile(entry.name) catch continue).mtime; - const name = try tmp.dupe(u8, entry.name); - try current_files.put(tmp, name, mtime); - }, - .directory => { - const name = try tmp.dupe(u8, entry.name); - try current_dirs.append(tmp, name); - }, - else => {}, - } - } - - // Diff against snapshot under the lock; collect events to emit after releasing it. - // to_create / to_delete / to_modify borrow pointers from the snapshot (allocator), - // only list metadata uses tmp. - var to_create: std.ArrayListUnmanaged([]const u8) = .empty; - var to_delete: std.ArrayListUnmanaged([]const u8) = .empty; - var to_modify: std.ArrayListUnmanaged([]const u8) = .empty; - var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty; - - self.snapshots_mutex.lock(); - { - for (current_dirs.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - if (!self.snapshots.contains(full_path)) { - const owned = tmp.dupe(u8, full_path) catch continue; - new_dirs.append(tmp, owned) catch continue; - } - } - - const gop = self.snapshots.getOrPut(allocator, dir_path) catch |e| { - self.snapshots_mutex.unlock(); - return e; - }; - if (!gop.found_existing) gop.value_ptr.* = .empty; - const snapshot = gop.value_ptr; - - var cit = current_files.iterator(); - while (cit.next()) |entry| { - if (snapshot.getPtr(entry.key_ptr.*)) |stored_mtime| { - // File exists in both — check for modification via mtime change. - if (stored_mtime.* != entry.value_ptr.*) { - stored_mtime.* = entry.value_ptr.*; - try to_modify.append(tmp, entry.key_ptr.*); // borrow from current (tmp) - } - } else { - // New file — add to snapshot and to_create list. 
- const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| { - self.snapshots_mutex.unlock(); - return e; - }; - snapshot.put(allocator, owned, entry.value_ptr.*) catch |e| { - allocator.free(owned); - self.snapshots_mutex.unlock(); - return e; - }; - try to_create.append(tmp, owned); // borrow from snapshot - } - } - - var sit = snapshot.iterator(); - while (sit.next()) |entry| { - if (current_files.contains(entry.key_ptr.*)) continue; - try to_delete.append(tmp, entry.key_ptr.*); // borrow from snapshot - } - for (to_delete.items) |name| _ = snapshot.fetchRemove(name); - } - self.snapshots_mutex.unlock(); - - // Emit all events outside the lock so handlers may safely call watch()/unwatch(). - // Order: new dirs, deletions (source before dest for renames), creations, modifications. - for (new_dirs.items) |full_path| - try self.handler.change(full_path, EventType.created, .dir); - for (to_delete.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch { - allocator.free(name); - continue; - }; - try self.handler.change(full_path, EventType.deleted, .file); - allocator.free(name); // snapshot key, owned by allocator - } - for (to_create.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - try self.handler.change(full_path, EventType.created, .file); - } - for (to_modify.items) |name| { - var path_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - try self.handler.change(full_path, EventType.modified, .file); - } - // arena.deinit() frees current_files, current_dirs, new_dirs, and list metadata - } - - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void { - self.watches_mutex.lock(); - const already 
= self.watches.contains(path); - self.watches_mutex.unlock(); - if (already) return; - const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { - error.AccessDenied, - error.PermissionDenied, - error.PathAlreadyExists, - error.SymLinkLoop, - error.NameTooLong, - error.FileNotFound, - error.SystemResources, - error.NoSpaceLeft, - error.NotDir, - error.InvalidUtf8, - error.InvalidWtf8, - error.BadPathName, - error.NoDevice, - error.NetworkNotFound, - error.Unexpected, - error.ProcessFdQuotaExceeded, - error.SystemFdQuotaExceeded, - error.ProcessNotFound, - error.FileTooBig, - error.IsDir, - error.DeviceBusy, - error.FileLocksNotSupported, - error.FileBusy, - error.WouldBlock, - => |e_| { - std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); - return error.WatchFailed; - }, - }; - errdefer std.posix.close(path_fd); - const kev = std.posix.Kevent{ - .ident = @intCast(path_fd), - .filter = EVFILT_VNODE, - .flags = EV_ADD | EV_ENABLE | EV_CLEAR, - .fflags = NOTE_WRITE | NOTE_DELETE | NOTE_RENAME | NOTE_ATTRIB | NOTE_EXTEND, - .data = 0, - .udata = 0, - }; - _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch |e| switch (e) { - error.AccessDenied, - error.SystemResources, - error.EventNotFound, - error.ProcessNotFound, - error.Overflow, - => |e_| { - std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); - return error.WatchFailed; - }, - }; - // Determine if the path is a regular file or a directory. - const stat = std.posix.fstat(path_fd) catch null; - const is_file = if (stat) |s| std.posix.S.ISREG(s.mode) else false; - const owned_path = try allocator.dupe(u8, path); - self.watches_mutex.lock(); - self.watches.put(allocator, owned_path, .{ .fd = path_fd, .is_file = is_file }) catch |e| { - self.watches_mutex.unlock(); - allocator.free(owned_path); - return e; - }; - self.watches_mutex.unlock(); - // For directory watches only: take initial snapshot so first NOTE_WRITE has a baseline. 
- if (!is_file) { - self.take_snapshot(allocator, owned_path) catch return error.OutOfMemory; - } - } - - fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { - var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; - defer dir.close(); - self.snapshots_mutex.lock(); - const gop = try self.snapshots.getOrPut(allocator, dir_path); - if (!gop.found_existing) gop.value_ptr.* = .empty; - const snapshot = gop.value_ptr; - var iter = dir.iterate(); - while (iter.next() catch null) |entry| { - if (entry.kind != .file) continue; - if (snapshot.contains(entry.name)) continue; - const mtime = (dir.statFile(entry.name) catch continue).mtime; - const owned = allocator.dupe(u8, entry.name) catch continue; - snapshot.put(allocator, owned, mtime) catch allocator.free(owned); - } - self.snapshots_mutex.unlock(); - } - - fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { - self.watches_mutex.lock(); - const watches_entry = self.watches.fetchRemove(path); - self.watches_mutex.unlock(); - if (watches_entry) |entry| { - std.posix.close(entry.value.fd); - allocator.free(entry.key); - } - if (self.snapshots.fetchRemove(path)) |entry| { - var snap = entry.value; - var it = snap.iterator(); - while (it.next()) |ne| allocator.free(ne.key_ptr.*); - snap.deinit(allocator); - } - } -}; - -const WindowsBackend = struct { - const watches_recursively = true; // ReadDirectoryChangesW with bWatchSubtree=1 - const detects_file_modifications = true; - - const windows = std.os.windows; - - const win32 = struct { - pub extern "kernel32" fn CloseHandle(hObject: windows.HANDLE) callconv(.winapi) windows.BOOL; - pub extern "kernel32" fn ReadDirectoryChangesW( - hDirectory: windows.HANDLE, - lpBuffer: *anyopaque, - nBufferLength: windows.DWORD, - bWatchSubtree: windows.BOOL, - dwNotifyFilter: windows.DWORD, - lpBytesReturned: ?*windows.DWORD, - lpOverlapped: ?*windows.OVERLAPPED, - lpCompletionRoutine: 
?*anyopaque, - ) callconv(.winapi) windows.BOOL; - pub extern "kernel32" fn GetQueuedCompletionStatus( - CompletionPort: windows.HANDLE, - lpNumberOfBytesTransferred: *windows.DWORD, - lpCompletionKey: *windows.ULONG_PTR, - lpOverlapped: *?*windows.OVERLAPPED, - dwMilliseconds: windows.DWORD, - ) callconv(.winapi) windows.BOOL; - pub extern "kernel32" fn CreateFileW( - lpFileName: [*:0]const windows.WCHAR, - dwDesiredAccess: windows.DWORD, - dwShareMode: windows.DWORD, - lpSecurityAttributes: ?*anyopaque, - dwCreationDisposition: windows.DWORD, - dwFlagsAndAttributes: windows.DWORD, - hTemplateFile: ?windows.HANDLE, - ) callconv(.winapi) windows.HANDLE; - pub extern "kernel32" fn PostQueuedCompletionStatus( - CompletionPort: windows.HANDLE, - dwNumberOfBytesTransferred: windows.DWORD, - dwCompletionKey: windows.ULONG_PTR, - lpOverlapped: ?*windows.OVERLAPPED, - ) callconv(.winapi) windows.BOOL; - pub extern "kernel32" fn GetFileAttributesW(lpFileName: [*:0]const windows.WCHAR) callconv(.winapi) windows.DWORD; - }; - - handler: *Handler, - iocp: windows.HANDLE, - thread: ?std.Thread, - watches: std.StringHashMapUnmanaged(Watch), - watches_mutex: std.Thread.Mutex, - path_types: std.StringHashMapUnmanaged(ObjectType), - - // A completion key of zero is used to signal the background thread to exit. 
- const SHUTDOWN_KEY: windows.ULONG_PTR = 0; - - const Watch = struct { - handle: windows.HANDLE, - buf: Buf, - overlapped: windows.OVERLAPPED, - path: []u8, // owned - }; - - const buf_size = 65536; - const Buf = []align(4) u8; - - const FILE_NOTIFY_INFORMATION = extern struct { - NextEntryOffset: windows.DWORD, - Action: windows.DWORD, - FileNameLength: windows.DWORD, - FileName: [1]windows.WCHAR, - }; - - const FILE_ACTION_ADDED: windows.DWORD = 1; - const FILE_ACTION_REMOVED: windows.DWORD = 2; - const FILE_ACTION_MODIFIED: windows.DWORD = 3; - const FILE_ACTION_RENAMED_OLD_NAME: windows.DWORD = 4; - const FILE_ACTION_RENAMED_NEW_NAME: windows.DWORD = 5; - - const notify_filter: windows.DWORD = - 0x00000001 | // FILE_NOTIFY_CHANGE_FILE_NAME - 0x00000002 | // FILE_NOTIFY_CHANGE_DIR_NAME - 0x00000008 | // FILE_NOTIFY_CHANGE_SIZE - 0x00000010 | // FILE_NOTIFY_CHANGE_LAST_WRITE - 0x00000040; // FILE_NOTIFY_CHANGE_CREATION - - fn init(handler: *Handler) windows.CreateIoCompletionPortError!@This() { - const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1); - return .{ .handler = handler, .iocp = iocp, .thread = null, .watches = .empty, .watches_mutex = .{}, .path_types = .empty }; - } - - fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - // Wake the background thread with a shutdown key, then wait for it. 
- _ = win32.PostQueuedCompletionStatus(self.iocp, 0, SHUTDOWN_KEY, null); - if (self.thread) |t| t.join(); - var it = self.watches.iterator(); - while (it.next()) |entry| { - _ = win32.CloseHandle(entry.value_ptr.*.handle); - allocator.free(entry.value_ptr.*.path); - allocator.free(entry.value_ptr.*.buf); - } - self.watches.deinit(allocator); - var pt_it = self.path_types.iterator(); - while (pt_it.next()) |entry| allocator.free(entry.key_ptr.*); - self.path_types.deinit(allocator); - _ = win32.CloseHandle(self.iocp); - } - - fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { - if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ allocator, self.iocp, &self.watches, &self.watches_mutex, &self.path_types, self.handler }); - } - - fn thread_fn( - allocator: std.mem.Allocator, - iocp: windows.HANDLE, - watches: *std.StringHashMapUnmanaged(Watch), - watches_mutex: *std.Thread.Mutex, - path_types: *std.StringHashMapUnmanaged(ObjectType), - handler: *Handler, - ) void { - var bytes: windows.DWORD = 0; - var key: windows.ULONG_PTR = 0; - var overlapped_ptr: ?*windows.OVERLAPPED = null; - while (true) { - // Block indefinitely until IOCP has a completion or shutdown signal. - const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE); - if (ok == 0 or key == SHUTDOWN_KEY) return; - const triggered_handle: windows.HANDLE = @ptrFromInt(key); - watches_mutex.lock(); - var it = watches.iterator(); - while (it.next()) |entry| { - const w = entry.value_ptr; - if (w.handle != triggered_handle) continue; - if (bytes > 0) { - var offset: usize = 0; - while (offset < bytes) { - const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr)); - const name_wchars = (&info.FileName).ptr[0 .. 
info.FileNameLength / 2]; - var name_buf: [std.fs.max_path_bytes]u8 = undefined; - const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0; - const event_type: EventType = switch (info.Action) { - FILE_ACTION_ADDED => .created, - FILE_ACTION_REMOVED => .deleted, - FILE_ACTION_MODIFIED => .modified, - FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed, - else => { - if (info.NextEntryOffset == 0) break; - offset += info.NextEntryOffset; - continue; - }, - }; - var full_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch { - if (info.NextEntryOffset == 0) break; - offset += info.NextEntryOffset; - continue; - }; - // Determine object_type: try GetFileAttributesW; cache result. - const object_type: ObjectType = if (event_type == .deleted) blk: { - // Path no longer exists; use cached type if available. - const cached = path_types.fetchRemove(full_path); - break :blk if (cached) |kv| blk2: { - allocator.free(kv.key); - break :blk2 kv.value; - } else .unknown; - } else blk: { - var full_path_w: [std.fs.max_path_bytes]windows.WCHAR = undefined; - const len = std.unicode.utf8ToUtf16Le(&full_path_w, full_path) catch break :blk .unknown; - full_path_w[len] = 0; - const attrs = win32.GetFileAttributesW(full_path_w[0..len :0]); - const INVALID: windows.DWORD = 0xFFFFFFFF; - const FILE_ATTRIBUTE_DIRECTORY: windows.DWORD = 0x10; - const ot: ObjectType = if (attrs == INVALID) .unknown else if (attrs & FILE_ATTRIBUTE_DIRECTORY != 0) .dir else .file; - // Cache the determined type. 
- if (ot != .unknown) { - const gop = path_types.getOrPut(allocator, full_path) catch break :blk ot; - if (!gop.found_existing) { - gop.key_ptr.* = allocator.dupe(u8, full_path) catch { - _ = path_types.remove(full_path); - break :blk ot; - }; - } - gop.value_ptr.* = ot; - } - break :blk ot; - }; - // Capture next_entry_offset before releasing the mutex: after unlock, - // the main thread may call remove_watch() which frees w.buf, making - // the `info` pointer (which points into w.buf) a dangling reference. - const next_entry_offset = info.NextEntryOffset; - watches_mutex.unlock(); - handler.change(full_path, event_type, object_type) catch { - watches_mutex.lock(); - break; - }; - watches_mutex.lock(); - if (next_entry_offset == 0) break; - offset += next_entry_offset; - } - } - // Re-arm ReadDirectoryChangesW for the next batch. - w.overlapped = std.mem.zeroes(windows.OVERLAPPED); - _ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null); - break; - } - watches_mutex.unlock(); - } - } - - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void { - self.watches_mutex.lock(); - defer self.watches_mutex.unlock(); - if (self.watches.contains(path)) return; - const path_w = std.unicode.utf8ToUtf16LeAllocZ(allocator, path) catch return error.WatchFailed; - defer allocator.free(path_w); - const handle = win32.CreateFileW( - path_w, - windows.GENERIC_READ, - windows.FILE_SHARE_READ | windows.FILE_SHARE_WRITE | windows.FILE_SHARE_DELETE, - null, - windows.OPEN_EXISTING, - 0x02000000 | 0x40000000, // FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED - null, - ); - if (handle == windows.INVALID_HANDLE_VALUE) return error.WatchFailed; - errdefer _ = win32.CloseHandle(handle); - _ = windows.CreateIoCompletionPort(handle, self.iocp, @intFromPtr(handle), 0) catch return error.WatchFailed; - const buf = try allocator.alignedAlloc(u8, .fromByteUnits(4), buf_size); - 
errdefer allocator.free(buf); - const owned_path = try allocator.dupe(u8, path); - errdefer allocator.free(owned_path); - var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED); - if (win32.ReadDirectoryChangesW(handle, buf.ptr, buf_size, 1, notify_filter, null, &overlapped, null) == 0) - return error.WatchFailed; - try self.watches.put(allocator, owned_path, .{ - .handle = handle, - .buf = buf, - .overlapped = overlapped, - .path = owned_path, - }); - } - - fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { - self.watches_mutex.lock(); - defer self.watches_mutex.unlock(); - if (self.watches.fetchRemove(path)) |entry| { - _ = win32.CloseHandle(entry.value.handle); - allocator.free(entry.value.path); - allocator.free(entry.value.buf); - } - } -}; diff --git a/src/types.zig b/src/types.zig new file mode 100644 index 0000000..0a07d3b --- /dev/null +++ b/src/types.zig @@ -0,0 +1,63 @@ +const std = @import("std"); +const builtin = @import("builtin"); +const build_options = @import("build_options"); + +pub const EventType = enum { + created, + modified, + deleted, + /// Only produced on macOS and Windows where the OS gives no pairing info. + /// On Linux, paired renames are emitted as a rename event with both paths instead. + renamed, +}; + +pub const ObjectType = enum { + file, + dir, + /// The object type could not be determined (e.g. a deleted file on Windows + /// where the path no longer exists to query). + unknown, +}; + +pub const Error = error{ + HandlerFailed, + OutOfMemory, + WatchFailed, +}; + +/// True when the Linux inotify backend runs in poll mode (caller drives the +/// event loop via poll_fd / handle_read_ready). False on all other platforms +/// and on Linux when the `linux_read_thread` build option is set. 
+pub const linux_poll_mode = builtin.os.tag == .linux and !build_options.linux_read_thread; + +pub const ReadableStatus = enum { + // TODO: is_readable, // backend may now read from fd (blocking mode) + will_notify, // backend must wait for a handle_read_ready call +}; + +pub const Handler = struct { + vtable: *const VTable, + + pub const VTable = struct { + change: *const fn (handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void, + rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8, object_type: ObjectType) error{HandlerFailed}!void, + /// Only present in Linux poll mode (linux_poll_mode == true). + wait_readable: if (linux_poll_mode) *const fn (handler: *Handler) error{HandlerFailed}!ReadableStatus else void, + }; + + pub fn change(handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void { + return handler.vtable.change(handler, path, event_type, object_type); + } + + pub fn rename(handler: *Handler, src_path: []const u8, dst_path: []const u8, object_type: ObjectType) error{HandlerFailed}!void { + return handler.vtable.rename(handler, src_path, dst_path, object_type); + } + + pub fn wait_readable(handler: *Handler) error{HandlerFailed}!ReadableStatus { + if (comptime linux_poll_mode) { + return handler.vtable.wait_readable(handler); + } else { + unreachable; + } + } +};