diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..db3cb54
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/.zig-cache/
+/zig-out/
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d8b4369
--- /dev/null
+++ b/README.md
@@ -0,0 +1,122 @@
+```
+ _ _ _ _ _ _ _ _ _
+ | \ | (_) | | | | | \ / | | | | |
+ | \| |_ __ _| |__ | |_ \ \ _ / /_ _| |_ __| |___
+ | . ` | |/ _` | '_ \| __| \ ` ' / _` | __|/ _| '_ \
+ | |\ | | (_| | | | | |_ \ / (_| | |_| (_| | | |
+ |_| \_|_|\__, |_| |_|\__| \_|_/ \__,_|\__|\__|_| |_|
+ __/ |
+ |___/
+ T H E N I G H T W A T C H
+```
+![nightwatch](docs/nightwatch.png)
+
+> The city sleeps.
+> The files do not.
+
+"FABRICATI DIEM, PVNC"
+
+**The Night Watch** is a file change tracker for directory trees, written
+in **Zig**.
+
+It provides:
+
+- A standalone CLI for tracking filesystem changes
+- A module for embedding change-tracking into other Zig programs
+- Minimal dependencies and consistent, predictable, cross-platform behavior
+
+It does not interfere.
+It does not speculate.
+It simply keeps watch.
+
+------------------------------------------------------------------------
+
+## Features
+
+- Recursive directory tree tracking
+- Consistent multi-platform support (Linux, FreeBSD, macOS, Windows)
+- Lightweight and fast
+- Embeddable Zig module API
+- Standalone CLI executable
+
+------------------------------------------------------------------------
+
+# Installation
+
+The Night Watch is written in **Zig** and built using the Zig build system.
+
+## Requirements
+
+- Zig 0.15.2 or later (latest stable recommended)
+
+## Build the CLI
+
+``` bash
+zig build
+```
+
+The executable will be located at:
+
+`zig-out/bin/nightwatch`
+
+## Install System-Wide
+
+By default the binary is installed into `zig-out/`. To install to a system or
+user prefix instead, pass `--prefix` (add `-Doptimize=ReleaseFast` for an
+optimized build):
+
+``` bash
+zig build install --prefix /usr/local
+```
+
+------------------------------------------------------------------------
+
+# Using as a Zig Module
+
+The Night Watch exposes a reusable module that can be imported into
+other Zig programs.
+
+In your `build.zig`:
+
+``` zig
+const nightwatch = b.dependency("nightwatch", .{
+    .target = target,
+    .optimize = optimize,
+});
+
+exe.root_module.addImport("nightwatch", nightwatch.module("nightwatch"));
+```
+
+In your Zig source:
+
+``` zig
+const nightwatch = @import("nightwatch");
+```
+
+You now have programmatic access to the tracking engine.
+
+------------------------------------------------------------------------
+
+# CLI Usage
+
+``` bash
+nightwatch [path ...]
+```
+
+Run:
+
+``` bash
+nightwatch --help
+```
+
+for full command documentation.
+
+------------------------------------------------------------------------
+
+# Philosophy
+
+Other tools watch files.
+
+The Night Watch keeps watch over the peace.
+
+It remembers what changed.
+It records what vanished.
+It notices what arrived at 2:14 AM.
+
+And it writes it down.
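The module section above assumes the package is already declared in the consumer's `build.zig.zon`. A minimal sketch of adding it with `zig fetch` follows; the archive URL is a placeholder for wherever this repository is hosted, and the explicit `--save=nightwatch` name matters because the package's own `.name` in `build.zig.zon` is `.flow_changes`, so the default save key would not match `b.dependency("nightwatch", ...)`:

``` bash
# Sketch: the URL is a placeholder, not a real release location.
# --save=nightwatch forces the dependency key used by b.dependency("nightwatch", ...).
zig fetch --save=nightwatch https://example.com/nightwatch.tar.gz
```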
diff --git a/build.zig b/build.zig new file mode 100644 index 0000000..2cf4311 --- /dev/null +++ b/build.zig @@ -0,0 +1,46 @@ +const std = @import("std"); + +pub fn build(b: *std.Build) void { + const target = b.standardTargetOptions(.{}); + const optimize = b.standardOptimizeOption(.{}); + + const mod = b.addModule("nightwatch", .{ + .root_source_file = b.path("src/nightwatch.zig"), + .target = target, + }); + + const exe = b.addExecutable(.{ + .name = "nightwatch", + .root_module = b.createModule(.{ + .root_source_file = b.path("src/main.zig"), + .target = target, + .optimize = optimize, + .imports = &.{ + .{ .name = "nightwatch", .module = mod }, + }, + }), + }); + + b.installArtifact(exe); + + const run_step = b.step("run", "Run the app"); + const run_cmd = b.addRunArtifact(exe); + run_step.dependOn(&run_cmd.step); + run_cmd.step.dependOn(b.getInstallStep()); + if (b.args) |args| + run_cmd.addArgs(args); + + const mod_tests = b.addTest(.{ + .root_module = mod, + }); + const run_mod_tests = b.addRunArtifact(mod_tests); + + const exe_tests = b.addTest(.{ + .root_module = exe.root_module, + }); + const run_exe_tests = b.addRunArtifact(exe_tests); + + const test_step = b.step("test", "Run tests"); + test_step.dependOn(&run_mod_tests.step); + test_step.dependOn(&run_exe_tests.step); +} diff --git a/build.zig.zon b/build.zig.zon new file mode 100644 index 0000000..d4d3787 --- /dev/null +++ b/build.zig.zon @@ -0,0 +1,81 @@ +.{ + // This is the default name used by packages depending on this one. For + // example, when a user runs `zig fetch --save `, this field is used + // as the key in the `dependencies` table. Although the user can choose a + // different name, most users will stick with this provided value. + // + // It is redundant to include "zig" in this name because it is already + // within the Zig package namespace. + .name = .flow_changes, + // This is a [Semantic Version](https://semver.org/). + // In a future version of Zig it will be used for package deduplication. + .version = "0.0.0", + // Together with name, this represents a globally unique package + // identifier. This field is generated by the Zig toolchain when the + // package is first created, and then *never changes*. This allows + // unambiguous detection of one package being an updated version of + // another. + // + // When forking a Zig project, this id should be regenerated (delete the + // field and run `zig build`) if the upstream project is still maintained. + // Otherwise, the fork is *hostile*, attempting to take control over the + // original project's identity. Thus it is recommended to leave the comment + // on the following line intact, so that it shows up in code reviews that + // modify the field. + .fingerprint = 0x63107cb039894510, // Changing this has security and trust implications. + // Tracks the earliest Zig version that the package considers to be a + // supported use case. + .minimum_zig_version = "0.15.2", + // This field is optional. + // Each dependency must either provide a `url` and `hash`, or a `path`. + // `zig build --fetch` can be used to fetch all dependencies of a package, recursively. + // Once all dependencies are fetched, `zig build` no longer requires + // internet connectivity. + .dependencies = .{ + // See `zig fetch --save ` for a command-line interface for adding dependencies. 
+ //.example = .{ + // // When updating this field to a new URL, be sure to delete the corresponding + // // `hash`, otherwise you are communicating that you expect to find the old hash at + // // the new URL. If the contents of a URL change this will result in a hash mismatch + // // which will prevent zig from using it. + // .url = "https://example.com/foo.tar.gz", + // + // // This is computed from the file contents of the directory of files that is + // // obtained after fetching `url` and applying the inclusion rules given by + // // `paths`. + // // + // // This field is the source of truth; packages do not come from a `url`; they + // // come from a `hash`. `url` is just one of many possible mirrors for how to + // // obtain a package matching this `hash`. + // // + // // Uses the [multihash](https://multiformats.io/multihash/) format. + // .hash = "...", + // + // // When this is provided, the package is found in a directory relative to the + // // build root. In this case the package's hash is irrelevant and therefore not + // // computed. This field and `url` are mutually exclusive. + // .path = "foo", + // + // // When this is set to `true`, a package is declared to be lazily + // // fetched. This makes the dependency only get fetched if it is + // // actually used. + // .lazy = false, + //}, + }, + // Specifies the set of files and directories that are included in this package. + // Only files and directories listed here are included in the `hash` that + // is computed for this package. Only files listed here will remain on disk + // when using the zig package manager. As a rule of thumb, one should list + // files required for compilation plus any license(s). + // Paths are relative to the build root. Use the empty string (`""`) to refer to + // the build root itself. + // A directory listed here means that all files within, recursively, are included. + .paths = .{ + "build.zig", + "build.zig.zon", + "src", + // For example... + //"LICENSE", + //"README.md", + }, +} diff --git a/docs/nightwatch.png b/docs/nightwatch.png new file mode 100644 index 0000000..fbc9566 Binary files /dev/null and b/docs/nightwatch.png differ diff --git a/src/main.zig b/src/main.zig new file mode 100644 index 0000000..c4cbfb0 --- /dev/null +++ b/src/main.zig @@ -0,0 +1,8 @@ +const std = @import("std"); +const nightwatch = @import("nightwatch"); + +pub fn main() !void { + std.debug.print("FABRICATI DIEM, PVNC\n", .{}); +} + +test "simple test" {} diff --git a/src/nightwatch.zig b/src/nightwatch.zig index 4b67406..0ef9e52 100644 --- a/src/nightwatch.zig +++ b/src/nightwatch.zig @@ -13,6 +13,9 @@ pub const EventType = enum { created, modified, deleted, + /// A new directory was created inside a watched directory. The receiver + /// should call watch() on the path to get events for files created in it. + dir_created, /// Only produced on macOS and Windows where the OS gives no pairing info. /// On Linux, paired renames are emitted as a { "FW", "rename", from, to } message instead. 
renamed, @@ -78,8 +81,6 @@ const INotifyBackend = struct { fd_watcher: tp.file_descriptor, watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path - const threaded = false; - const IN = std.os.linux.IN; const watch_mask: u32 = IN.CREATE | IN.DELETE | IN.MODIFY | @@ -103,7 +104,7 @@ const INotifyBackend = struct { std.posix.close(self.inotify_fd); } - fn arm(self: *@This(), parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void { + fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void { parent.deinit(); try self.fd_watcher.wait_read(); } @@ -131,7 +132,10 @@ const INotifyBackend = struct { } } - fn drain(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) (std.posix.ReadError || error{ NoSpaceLeft, OutOfMemory, Exit })!void { + fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) (std.posix.ReadError || error{ ThespianFileDescriptorWaitReadFailed, NoSpaceLeft, OutOfMemory, Exit })!void { + // re-arm the file_discriptor + try self.fd_watcher.wait_read(); + const InotifyEvent = extern struct { wd: i32, mask: u32, @@ -207,7 +211,7 @@ const INotifyBackend = struct { try parent.send(.{ "FW", "change", full_path, EventType.deleted }); } else { const event_type: EventType = if (ev.mask & IN.CREATE != 0) - .created + if (ev.mask & IN.ISDIR != 0) .dir_created else .created else if (ev.mask & (IN.DELETE | IN.DELETE_SELF) != 0) .deleted else if (ev.mask & (IN.MODIFY | IN.CLOSE_WRITE) != 0) @@ -222,28 +226,29 @@ const INotifyBackend = struct { }; const FSEventsBackend = struct { - thread: ?std.Thread, - run_loop: ?*anyopaque, // CFRunLoopRef, set by the background thread before blocking - stream: ?*anyopaque, // FSEventStreamRef, created on the background thread + stream: ?*anyopaque, // FSEventStreamRef + queue: ?*anyopaque, // dispatch_queue_t + ctx: ?*CallbackContext, // heap-allocated, freed after stream is stopped watches: std.StringArrayHashMapUnmanaged(void), // owned paths - mutex: std.Thread.Mutex, // protects run_loop - const threaded = false; // callback sends FW messages directly; no FW_event needed + const threaded = false; // callback fires on GCD thread; no FW_event needed + const kFSEventStreamCreateFlagNoDefer: u32 = 0x00000002; const kFSEventStreamCreateFlagFileEvents: u32 = 0x00000010; const kFSEventStreamEventFlagItemCreated: u32 = 0x00000100; const kFSEventStreamEventFlagItemRemoved: u32 = 0x00000200; const kFSEventStreamEventFlagItemRenamed: u32 = 0x00000800; const kFSEventStreamEventFlagItemModified: u32 = 0x00001000; + const kFSEventStreamEventFlagItemIsDir: u32 = 0x00020000; const kFSEventStreamEventIdSinceNow: u64 = 0xFFFFFFFFFFFFFFFF; + const kCFStringEncodingUTF8: u32 = 0x08000100; - // CoreFoundation / CoreServices extern declarations const cf = struct { pub extern "c" fn CFStringCreateWithBytesNoCopy( alloc: ?*anyopaque, bytes: [*]const u8, numBytes: isize, - encoding: u32, // kCFStringEncodingUTF8 = 0x08000100 + encoding: u32, isExternalRepresentation: u8, contentsDeallocator: ?*anyopaque, ) ?*anyopaque; @@ -254,10 +259,6 @@ const FSEventsBackend = struct { callBacks: ?*anyopaque, ) ?*anyopaque; pub extern "c" fn CFRelease(cf: *anyopaque) void; - pub extern "c" fn CFRunLoopGetCurrent() *anyopaque; - pub extern "c" fn CFRunLoopRun() void; - pub extern "c" fn CFRunLoopStop(rl: *anyopaque) void; - pub extern "c" fn CFRunLoopAddSource(rl: *anyopaque, source: *anyopaque, mode: *anyopaque) void; pub extern "c" fn FSEventStreamCreate( allocator: 
?*anyopaque, callback: *const anyopaque, @@ -267,61 +268,100 @@ const FSEventsBackend = struct { latency: f64, flags: u32, ) ?*anyopaque; - pub extern "c" fn FSEventStreamSchedule(stream: *anyopaque, runLoop: *anyopaque, runLoopMode: *anyopaque) void; + pub extern "c" fn FSEventStreamSetDispatchQueue(stream: *anyopaque, queue: *anyopaque) void; pub extern "c" fn FSEventStreamStart(stream: *anyopaque) u8; pub extern "c" fn FSEventStreamStop(stream: *anyopaque) void; pub extern "c" fn FSEventStreamInvalidate(stream: *anyopaque) void; pub extern "c" fn FSEventStreamRelease(stream: *anyopaque) void; - // kCFRunLoopDefaultMode, a well-known constant pointer exported by CoreFoundation - pub extern "c" var kCFRunLoopDefaultMode: *anyopaque; - pub extern "c" var kCFAllocatorDefault: *anyopaque; + pub extern "c" fn dispatch_queue_create(label: [*:0]const u8, attr: ?*anyopaque) *anyopaque; + pub extern "c" fn dispatch_release(obj: *anyopaque) void; pub extern "c" var kCFAllocatorNull: *anyopaque; }; - const kCFStringEncodingUTF8: u32 = 0x08000100; - - // Context passed to the FSEvents callback via the thread's stack. const CallbackContext = struct { parent: tp.pid, }; fn init() error{}!@This() { - return .{ - .thread = null, - .run_loop = null, - .stream = null, - .watches = .empty, - .mutex = .{}, - }; + return .{ .stream = null, .queue = null, .ctx = null, .watches = .empty }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - // Stop the run loop, which causes the thread to exit. - self.mutex.lock(); - const rl = self.run_loop; - self.mutex.unlock(); - if (rl) |r| cf.CFRunLoopStop(r); - if (self.thread) |t| t.join(); - // Stream is cleaned up by the thread before it exits. + if (self.stream) |s| { + cf.FSEventStreamStop(s); + cf.FSEventStreamInvalidate(s); + cf.FSEventStreamRelease(s); + self.stream = null; + } + if (self.queue) |q| { + cf.dispatch_release(q); + self.queue = null; + } + if (self.ctx) |c| { + c.parent.deinit(); + allocator.destroy(c); + self.ctx = null; + } var it = self.watches.iterator(); while (it.next()) |entry| allocator.free(entry.key_ptr.*); self.watches.deinit(allocator); } - fn arm(self: *@This(), parent: tp.pid) std.Thread.SpawnError!void { + fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) error{OutOfMemory}!void { errdefer parent.deinit(); - if (self.thread != null) return; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, parent }); - } + if (self.stream != null) return; - const FSEventStreamCallback = *const fn ( - stream: *anyopaque, - info: ?*anyopaque, - numEvents: usize, - eventPaths: *anyopaque, - eventFlags: [*]const u32, - eventIds: [*]const u64, - ) callconv(.c) void; + var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty; + defer cf_strings.deinit(allocator); + var it = self.watches.iterator(); + while (it.next()) |entry| { + const path = entry.key_ptr.*; + const s = cf.CFStringCreateWithBytesNoCopy( + null, + path.ptr, + @intCast(path.len), + kCFStringEncodingUTF8, + 0, + cf.kCFAllocatorNull, + ) orelse continue; + cf_strings.append(allocator, s) catch { + cf.CFRelease(s); + break; + }; + } + defer for (cf_strings.items) |s| cf.CFRelease(s.?); + + const paths_array = cf.CFArrayCreate( + null, + cf_strings.items.ptr, + @intCast(cf_strings.items.len), + null, + ) orelse return; + defer cf.CFRelease(paths_array); + + const ctx = try allocator.create(CallbackContext); + errdefer allocator.destroy(ctx); + ctx.* = .{ .parent = parent }; + + const stream = cf.FSEventStreamCreate( + null, + 
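+ // The callback runs on the dispatch queue attached below via
+ // FSEventStreamSetDispatchQueue, so it can send "FW" messages to the parent
+ // directly instead of signalling an FW_event.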
@ptrCast(&callback), + ctx, + paths_array, + kFSEventStreamEventIdSinceNow, + 0.1, + kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents, + ) orelse return; + errdefer cf.FSEventStreamRelease(stream); + + const queue = cf.dispatch_queue_create("flow.file_watcher", null); + cf.FSEventStreamSetDispatchQueue(stream, queue); + _ = cf.FSEventStreamStart(stream); + + self.stream = stream; + self.queue = queue; + self.ctx = ctx; + } fn callback( _: *anyopaque, @@ -339,7 +379,7 @@ const FSEventsBackend = struct { const event_type: EventType = if (flags & kFSEventStreamEventFlagItemRemoved != 0) .deleted else if (flags & kFSEventStreamEventFlagItemCreated != 0) - .created + if (flags & kFSEventStreamEventFlagItemIsDir != 0) .dir_created else .created else if (flags & kFSEventStreamEventFlagItemRenamed != 0) .renamed else if (flags & kFSEventStreamEventFlagItemModified != 0) @@ -350,68 +390,18 @@ const FSEventsBackend = struct { } } - fn thread_fn(self: *@This(), parent: tp.pid) void { - var ctx = CallbackContext{ .parent = parent }; - defer ctx.parent.deinit(); - - // Build the CFArray of paths to watch. - var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty; - defer cf_strings.deinit(std.heap.c_allocator); - var it = self.watches.iterator(); - while (it.next()) |entry| { - const path = entry.key_ptr.*; - const s = cf.CFStringCreateWithBytesNoCopy(null, path.ptr, @intCast(path.len), kCFStringEncodingUTF8, 0, cf.kCFAllocatorNull) orelse continue; - cf_strings.append(std.heap.c_allocator, s) catch { - cf.CFRelease(s); - break; - }; - } - defer for (cf_strings.items) |s| cf.CFRelease(s.?); - - const paths_array = cf.CFArrayCreate(null, cf_strings.items.ptr, @intCast(cf_strings.items.len), null) orelse return; - defer cf.CFRelease(paths_array); - - const stream = cf.FSEventStreamCreate( - null, - @ptrCast(&callback), - &ctx, - paths_array, - kFSEventStreamEventIdSinceNow, - 0.1, // 100ms latency - kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents, - ) orelse return; - defer { - cf.FSEventStreamStop(stream); - cf.FSEventStreamInvalidate(stream); - cf.FSEventStreamRelease(stream); - } - - const rl = cf.CFRunLoopGetCurrent(); - cf.FSEventStreamSchedule(stream, rl, cf.kCFRunLoopDefaultMode); - _ = cf.FSEventStreamStart(stream); - - // Publish the run loop reference so deinit() can stop it. - self.mutex.lock(); - self.run_loop = rl; - self.mutex.unlock(); - - cf.CFRunLoopRun(); // blocks until CFRunLoopStop is called - } - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{OutOfMemory}!void { if (self.watches.contains(path)) return; const owned = try allocator.dupe(u8, path); errdefer allocator.free(owned); try self.watches.put(allocator, owned, {}); - // Note: watches added after arm() take effect on the next restart. - // In practice, all watches are added before the first open() call. + // Watches added after arm() take effect on the next restart. + // In practice all watches are added before arm() is called. 
} fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { if (self.watches.fetchSwapRemove(path)) |entry| allocator.free(entry.key); } - - fn drain(_: *@This(), _: std.mem.Allocator, _: tp.pid_ref) tp.result {} }; const KQueueBackend = struct { @@ -419,8 +409,11 @@ const KQueueBackend = struct { shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread thread: ?std.Thread, watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd - - const threaded = true; + // Per-directory snapshots of filenames, used to diff on NOTE_WRITE. + // Key: owned dir path (same as watches key), value: set of owned filenames. + // Accessed from both the main thread (add_watch) and the background thread (scan_dir). + snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), + snapshots_mutex: std.Thread.Mutex, const EVFILT_VNODE: i16 = -4; const EVFILT_READ: i16 = -1; @@ -453,7 +446,7 @@ const KQueueBackend = struct { .udata = 0, }; _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); - return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty }; + return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty, .snapshots = .empty, .snapshots_mutex = .{} }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { @@ -468,32 +461,132 @@ const KQueueBackend = struct { allocator.free(entry.key_ptr.*); } self.watches.deinit(allocator); + var sit = self.snapshots.iterator(); + while (sit.next()) |entry| { + var names = entry.value_ptr.*; + var nit = names.iterator(); + while (nit.next()) |ne| allocator.free(ne.key_ptr.*); + names.deinit(allocator); + } + self.snapshots.deinit(allocator); std.posix.close(self.kq); } - fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { errdefer parent.deinit(); if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, parent }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, parent }); } - fn thread_fn(kq: std.posix.fd_t, parent: tp.pid) void { + fn thread_fn( + kq: std.posix.fd_t, + watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), + snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), + snapshots_mutex: *std.Thread.Mutex, + allocator: std.mem.Allocator, + parent: tp.pid, + ) void { defer parent.deinit(); var events: [64]std.posix.Kevent = undefined; while (true) { // Block indefinitely until kqueue has events. const n = std.posix.kevent(kq, &.{}, &events, null) catch break; - var has_vnode_events = false; for (events[0..n]) |ev| { if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit - if (ev.filter == EVFILT_VNODE) has_vnode_events = true; + if (ev.filter != EVFILT_VNODE) continue; + // Find the directory path for this fd. 
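+ // kevent only reports the raw fd in ev.ident, so scan the watch table to
+ // recover the path. `watches` is read here without a lock; entries added
+ // after arm() are not synchronized with this thread.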
+ var wit = watches.iterator(); + while (wit.next()) |entry| { + if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; + const dir_path = entry.key_ptr.*; + if (ev.fflags & NOTE_DELETE != 0) { + parent.send(.{ "FW", "change", dir_path, EventType.deleted }) catch return; + } else if (ev.fflags & NOTE_RENAME != 0) { + parent.send(.{ "FW", "change", dir_path, EventType.renamed }) catch return; + } else if (ev.fflags & NOTE_WRITE != 0) { + scan_dir(dir_path, snapshots, snapshots_mutex, allocator, parent) catch {}; + } + break; + } } - if (has_vnode_events) - parent.send(.{"FW_event"}) catch break; } } - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) (std.posix.OpenError || std.posix.KEventError || error{OutOfMemory})!void { + // Scan a directory and diff against the snapshot, emitting created/deleted events. + fn scan_dir( + dir_path: []const u8, + snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), + snapshots_mutex: *std.Thread.Mutex, + allocator: std.mem.Allocator, + parent: tp.pid, + ) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + + // Collect current filenames (no lock needed, reading filesystem only). + var current: std.StringHashMapUnmanaged(void) = .empty; + defer { + var it = current.iterator(); + while (it.next()) |e| allocator.free(e.key_ptr.*); + current.deinit(allocator); + } + var iter = dir.iterate(); + while (try iter.next()) |entry| { + if (entry.kind != .file) continue; + const name = try allocator.dupe(u8, entry.name); + try current.put(allocator, name, {}); + } + + // Emit dir_created for new subdirectories outside the lock (no snapshot involvement). + var dir2 = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir2.close(); + var dir_iter = dir2.iterate(); + while (try dir_iter.next()) |entry| { + if (entry.kind != .directory) continue; + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.name }) catch continue; + // Only emit if not already watched. + if (!snapshots.contains(full_path)) + try parent.send(.{ "FW", "change", full_path, EventType.dir_created }); + } + + snapshots_mutex.lock(); + defer snapshots_mutex.unlock(); + + // Get or create the snapshot for this directory. + const gop = try snapshots.getOrPut(allocator, dir_path); + if (!gop.found_existing) gop.value_ptr.* = .empty; + const snapshot = gop.value_ptr; + + // Emit created events for files in current but not in snapshot. + var cit = current.iterator(); + while (cit.next()) |entry| { + if (snapshot.contains(entry.key_ptr.*)) continue; + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.key_ptr.* }) catch continue; + try parent.send(.{ "FW", "change", full_path, EventType.created }); + const owned = try allocator.dupe(u8, entry.key_ptr.*); + try snapshot.put(allocator, owned, {}); + } + + // Emit deleted events for files in snapshot but not in current. 
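+ // (Collected into a list first: removing entries from the map while
+ // iterating it would invalidate the iterator.)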
+ var to_delete: std.ArrayListUnmanaged([]const u8) = .empty; + defer to_delete.deinit(allocator); + var sit = snapshot.iterator(); + while (sit.next()) |entry| { + if (current.contains(entry.key_ptr.*)) continue; + try to_delete.append(allocator, entry.key_ptr.*); + } + for (to_delete.items) |name| { + var path_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; + try parent.send(.{ "FW", "change", full_path, EventType.deleted }); + _ = snapshot.fetchRemove(name); + allocator.free(name); + } + } + + fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void { if (self.watches.contains(path)) return; const path_fd = try std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0); errdefer std.posix.close(path_fd); @@ -509,6 +602,25 @@ const KQueueBackend = struct { const owned_path = try allocator.dupe(u8, path); errdefer allocator.free(owned_path); try self.watches.put(allocator, owned_path, path_fd); + // Take initial snapshot so first NOTE_WRITE has a baseline to diff against. + try self.take_snapshot(allocator, owned_path); + } + + fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { + var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; + defer dir.close(); + self.snapshots_mutex.lock(); + defer self.snapshots_mutex.unlock(); + const gop = try self.snapshots.getOrPut(allocator, dir_path); + if (!gop.found_existing) gop.value_ptr.* = .empty; + var snapshot = gop.value_ptr; + var iter = dir.iterate(); + while (try iter.next()) |entry| { + if (entry.kind != .file) continue; + if (snapshot.contains(entry.name)) continue; + const owned = try allocator.dupe(u8, entry.name); + try snapshot.put(allocator, owned, {}); + } } fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { @@ -516,35 +628,16 @@ const KQueueBackend = struct { std.posix.close(entry.value); allocator.free(entry.key); } - } - - fn drain(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) tp.result { - _ = allocator; - var events: [64]std.posix.Kevent = undefined; - const immediate: std.posix.timespec = .{ .sec = 0, .nsec = 0 }; - const n = std.posix.kevent(self.kq, &.{}, &events, &immediate) catch return; - for (events[0..n]) |ev| { - if (ev.filter != EVFILT_VNODE) continue; - var it = self.watches.iterator(); - while (it.next()) |entry| { - if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; - const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0) - .deleted - else if (ev.fflags & NOTE_RENAME != 0) - .renamed - else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0) - .modified - else - continue; - try parent.send(.{ "FW", "change", entry.key_ptr.*, event_type }); - break; - } + if (self.snapshots.fetchRemove(path)) |entry| { + var names = entry.value; + var it = names.iterator(); + while (it.next()) |ne| allocator.free(ne.key_ptr.*); + names.deinit(allocator); } } }; const WindowsBackend = struct { - const threaded = true; const windows = std.os.windows; const win32 = struct { @@ -581,11 +674,13 @@ const WindowsBackend = struct { dwCompletionKey: windows.ULONG_PTR, lpOverlapped: ?*windows.OVERLAPPED, ) callconv(.winapi) windows.BOOL; + pub extern "kernel32" fn GetFileAttributesW(lpFileName: [*:0]const windows.WCHAR) callconv(.winapi) windows.DWORD; }; iocp: windows.HANDLE, thread: ?std.Thread, watches: std.StringHashMapUnmanaged(Watch), + watches_mutex: std.Thread.Mutex, 
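+ // Protects `watches`, which is now mutated by add_watch/remove_watch on the
+ // caller's thread and read by the IOCP thread in thread_fn.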
// A completion key of zero is used to signal the background thread to exit. const SHUTDOWN_KEY: windows.ULONG_PTR = 0; @@ -622,7 +717,7 @@ const WindowsBackend = struct { fn init() windows.CreateIoCompletionPortError!@This() { const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1); - return .{ .iocp = iocp, .thread = null, .watches = .empty }; + return .{ .iocp = iocp, .thread = null, .watches = .empty, .watches_mutex = .{} }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { @@ -639,13 +734,19 @@ const WindowsBackend = struct { _ = win32.CloseHandle(self.iocp); } - fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + _ = allocator; errdefer parent.deinit(); if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, &self.watches, &self.watches_mutex, parent }); } - fn thread_fn(iocp: windows.HANDLE, parent: tp.pid) void { + fn thread_fn( + iocp: windows.HANDLE, + watches: *std.StringHashMapUnmanaged(Watch), + watches_mutex: *std.Thread.Mutex, + parent: tp.pid, + ) void { defer parent.deinit(); var bytes: windows.DWORD = 0; var key: windows.ULONG_PTR = 0; @@ -654,7 +755,69 @@ const WindowsBackend = struct { // Block indefinitely until IOCP has a completion or shutdown signal. const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE); if (ok == 0 or key == SHUTDOWN_KEY) return; - parent.send(.{"FW_event"}) catch return; + const triggered_handle: windows.HANDLE = @ptrFromInt(key); + watches_mutex.lock(); + var it = watches.iterator(); + while (it.next()) |entry| { + const w = entry.value_ptr; + if (w.handle != triggered_handle) continue; + if (bytes > 0) { + var offset: usize = 0; + while (offset < bytes) { + const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr)); + const name_wchars = (&info.FileName).ptr[0 .. info.FileNameLength / 2]; + var name_buf: [std.fs.max_path_bytes]u8 = undefined; + const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0; + const event_type: EventType = switch (info.Action) { + FILE_ACTION_ADDED => .created, + FILE_ACTION_REMOVED => .deleted, + FILE_ACTION_MODIFIED => .modified, + FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed, + else => { + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + continue; + }, + }; + var full_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch { + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + continue; + }; + // Distinguish files from directories. 
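+ // FILE_NOTIFY_INFORMATION carries no file-vs-directory flag, so query
+ // GetFileAttributesW on the full path (for already-deleted entries the
+ // query fails and is_dir stays false).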
+ const is_dir = blk: { + var full_path_w: [std.fs.max_path_bytes]windows.WCHAR = undefined; + const len = std.unicode.utf8ToUtf16Le(&full_path_w, full_path) catch break :blk false; + full_path_w[len] = 0; + const attrs = win32.GetFileAttributesW(full_path_w[0..len :0]); + const INVALID: windows.DWORD = 0xFFFFFFFF; + const FILE_ATTRIBUTE_DIRECTORY: windows.DWORD = 0x10; + break :blk attrs != INVALID and (attrs & FILE_ATTRIBUTE_DIRECTORY) != 0; + }; + const adjusted_event_type: EventType = if (is_dir and event_type == .created) + .dir_created + else if (is_dir) { // Other directory events (modified, deleted, renamed), skip. + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + continue; + } else event_type; + watches_mutex.unlock(); + parent.send(.{ "FW", "change", full_path, adjusted_event_type }) catch { + watches_mutex.lock(); + break; + }; + watches_mutex.lock(); + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + } + } + // Re-arm ReadDirectoryChangesW for the next batch. + w.overlapped = std.mem.zeroes(windows.OVERLAPPED); + _ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null); + break; + } + watches_mutex.unlock(); } } @@ -664,6 +827,8 @@ const WindowsBackend = struct { FileWatcherInvalidHandle, FileWatcherReadDirectoryChangesFailed, })!void { + self.watches_mutex.lock(); + defer self.watches_mutex.unlock(); if (self.watches.contains(path)) return; const path_w = try std.unicode.utf8ToUtf16LeAllocZ(allocator, path); defer allocator.free(path_w); @@ -695,59 +860,14 @@ const WindowsBackend = struct { } fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.watches_mutex.lock(); + defer self.watches_mutex.unlock(); if (self.watches.fetchRemove(path)) |entry| { _ = win32.CloseHandle(entry.value.handle); allocator.free(entry.value.path); allocator.free(entry.value.buf); } } - - fn drain(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) !void { - _ = allocator; - var bytes: windows.DWORD = 0; - var key: windows.ULONG_PTR = 0; - var overlapped_ptr: ?*windows.OVERLAPPED = null; - while (true) { - // Non-blocking drain, the blocking wait is done in the background thread. - const ok = win32.GetQueuedCompletionStatus(self.iocp, &bytes, &key, &overlapped_ptr, 0); - if (ok == 0 or overlapped_ptr == null or key == SHUTDOWN_KEY) break; - const triggered_handle: windows.HANDLE = @ptrFromInt(key); - var it = self.watches.iterator(); - while (it.next()) |entry| { - const w = entry.value_ptr; - if (w.handle != triggered_handle) continue; - if (bytes > 0) { - var offset: usize = 0; - while (offset < bytes) { - const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr)); - const name_wchars = (&info.FileName).ptr[0 .. 
info.FileNameLength / 2]; - var name_buf: [std.fs.max_path_bytes]u8 = undefined; - const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0; - const event_type: EventType = switch (info.Action) { - FILE_ACTION_ADDED => .created, - FILE_ACTION_REMOVED => .deleted, - FILE_ACTION_MODIFIED => .modified, - FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed, - else => { - if (info.NextEntryOffset == 0) break; - offset += info.NextEntryOffset; - continue; - }, - }; - var full_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch continue; - try parent.send(.{ "FW", "change", full_path, event_type }); - if (info.NextEntryOffset == 0) break; - offset += info.NextEntryOffset; - } - } - // Re-arm ReadDirectoryChangesW for the next batch. - w.overlapped = std.mem.zeroes(windows.OVERLAPPED); - _ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null); - break; - } - } - } }; const Process = struct { @@ -784,7 +904,7 @@ const Process = struct { errdefer self.deinit(); _ = tp.set_trap(true); self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace()); - self.backend.arm(tp.self_pid().clone()) catch |e| return tp.exit_error(e, @errorReturnTrace()); + self.backend.arm(self.allocator, self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace()); tp.receive(&self.receiver); } @@ -806,12 +926,10 @@ const Process = struct { var err_code: i64 = 0; var err_msg: []const u8 = undefined; - if (!Backend.threaded and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) { - self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e); - } else if (!Backend.threaded and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) { + if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) { + self.backend.handle_read_ready(self.allocator, self.parent.ref()) catch |e| self.logger.err("handle_read_ready", e); + } else if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) { self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg }); - } else if (Backend.threaded and try cbor.match(m.buf, .{"FW_event"})) { - self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e); } else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) { self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e); } else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) {
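A possible consumer-side follow-up to the new `dir_created` event, matching the doc comment on `EventType.dir_created` (the receiver is expected to start watching the new directory). This is a sketch only: the `watcher` handle, the surrounding receive function, and the assumption that the event value arrives as an integer tag `tp.extract` can read are illustrative and not part of this change.

``` zig
// Hypothetical receiver (sketch). Assumes `watcher` is the file-watcher pid
// and that EventType crosses the message boundary as its integer tag.
var path: []const u8 = undefined;
var event_tag: u8 = undefined;
if (try cbor.match(m.buf, .{ "FW", "change", tp.extract(&path), tp.extract(&event_tag) })) {
    switch (@as(EventType, @enumFromInt(event_tag))) {
        // Per the doc comment on dir_created: watch the new directory so
        // files created inside it are reported too.
        .dir_created => try watcher.send(.{ "watch", path }),
        .created, .modified, .deleted, .renamed => {}, // handled elsewhere
    }
}
```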