Compare commits

...

9 commits

Author SHA1 Message Date
ae4b56b62a
fix: dangling interceptor crash 2026-03-07 20:02:40 +01:00
9b4d7c2121
feat: implement auto subdirectory watching for platforms that don't watch trees 2026-03-07 19:54:49 +01:00
9dda7efc25
refactor: add a little flair 2026-03-07 19:24:40 +01:00
4569a33382
fix: windows build 2026-03-07 19:20:30 +01:00
e4cc1b82fe
feat: add basic command line executable 2026-03-07 19:10:48 +01:00
f3463dd0dc
fix(kqueue): emit scan_dir events outside the snapshot lock
Handler callbacks invoked while holding snapshots_mutex could deadlock if
the handler called watch() or unwatch(), which also acquires that lock.
Refactor scan_dir to collect all pending events (dir_created, created,
deleted) into temporary lists under the lock, then emit them after
releasing it. Also consolidate the two directory iteration passes into one.
2026-03-07 18:19:00 +01:00
8dc759db61
fix: add a watches mutex to kqueue 2026-02-26 14:55:38 +01:00
9679b0cedf
fix: more test fixes 2026-02-26 14:47:38 +01:00
20c167b37d
build: install test executables
This makes cross compiled testing a little easier for now.
2026-02-26 14:47:07 +01:00
4 changed files with 400 additions and 88 deletions

View file

@ -56,11 +56,13 @@ pub fn build(b: *std.Build) void {
run_cmd.addArgs(args); run_cmd.addArgs(args);
const mod_tests = b.addTest(.{ const mod_tests = b.addTest(.{
.name = "mod_tests",
.root_module = mod, .root_module = mod,
}); });
const run_mod_tests = b.addRunArtifact(mod_tests); const run_mod_tests = b.addRunArtifact(mod_tests);
const exe_tests = b.addTest(.{ const exe_tests = b.addTest(.{
.name = "exe_tests",
.root_module = b.createModule(.{ .root_module = b.createModule(.{
.root_source_file = b.path("src/main.zig"), .root_source_file = b.path("src/main.zig"),
.target = target, .target = target,
@ -76,6 +78,7 @@ pub fn build(b: *std.Build) void {
// Integration test suite: exercises the public API by performing real // Integration test suite: exercises the public API by performing real
// filesystem operations and verifying Handler callbacks via TestHandler. // filesystem operations and verifying Handler callbacks via TestHandler.
const integration_tests = b.addTest(.{ const integration_tests = b.addTest(.{
.name = "integration_tests",
.root_module = b.createModule(.{ .root_module = b.createModule(.{
.root_source_file = b.path("src/nightwatch_test.zig"), .root_source_file = b.path("src/nightwatch_test.zig"),
.target = target, .target = target,
@ -91,4 +94,8 @@ pub fn build(b: *std.Build) void {
test_step.dependOn(&run_mod_tests.step); test_step.dependOn(&run_mod_tests.step);
test_step.dependOn(&run_exe_tests.step); test_step.dependOn(&run_exe_tests.step);
test_step.dependOn(&run_integration_tests.step); test_step.dependOn(&run_integration_tests.step);
b.installArtifact(mod_tests);
b.installArtifact(exe_tests);
b.installArtifact(integration_tests);
} }

View file

@ -1,8 +1,178 @@
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin");
const nightwatch = @import("nightwatch"); const nightwatch = @import("nightwatch");
const is_posix = switch (builtin.os.tag) {
.linux, .macos, .freebsd => true,
else => false,
};
// Self-pipe: signal handler writes a byte so poll() / read() unblocks cleanly.
var sig_pipe: if (is_posix) [2]std.posix.fd_t else void = undefined;
fn posix_sighandler(_: c_int) callconv(.c) void {
_ = std.posix.write(sig_pipe[1], &[_]u8{0}) catch {};
}
const CliHandler = struct {
handler: nightwatch.Handler,
out: std.fs.File,
const vtable = nightwatch.Handler.VTable{
.change = change_cb,
.rename = rename_cb,
.wait_readable = if (builtin.os.tag == .linux) wait_readable_cb else {},
};
fn change_cb(h: *nightwatch.Handler, path: []const u8, event_type: nightwatch.EventType) error{HandlerFailed}!void {
const self: *CliHandler = @fieldParentPtr("handler", h);
var buf: [4096]u8 = undefined;
var stdout = self.out.writer(&buf);
defer stdout.interface.flush() catch {};
const label = switch (event_type) {
.created => "create ",
.modified => "modify ",
.deleted => "delete ",
.dir_created => "mkdir ",
.renamed => "rename ",
};
stdout.interface.print("{s} {s}\n", .{ label, path }) catch return error.HandlerFailed;
}
fn rename_cb(h: *nightwatch.Handler, src: []const u8, dst: []const u8) error{HandlerFailed}!void {
const self: *CliHandler = @fieldParentPtr("handler", h);
var buf: [4096]u8 = undefined;
var stdout = self.out.writer(&buf);
defer stdout.interface.flush() catch {};
stdout.interface.print("rename {s} -> {s}\n", .{ src, dst }) catch return error.HandlerFailed;
}
fn wait_readable_cb(_: *nightwatch.Handler) error{HandlerFailed}!nightwatch.ReadableStatus {
return .will_notify;
}
};
fn run_linux(watcher: *nightwatch) !void {
var fds = [_]std.posix.pollfd{
.{ .fd = watcher.poll_fd(), .events = std.posix.POLL.IN, .revents = 0 },
.{ .fd = sig_pipe[0], .events = std.posix.POLL.IN, .revents = 0 },
};
while (true) {
_ = try std.posix.poll(&fds, -1);
if (fds[1].revents & std.posix.POLL.IN != 0) return; // signal
if (fds[0].revents & std.posix.POLL.IN != 0) {
watcher.handle_read_ready() catch return;
}
}
}
fn run_posix() void {
// Backend (kqueue) drives its own thread; we just block until signal.
var buf: [1]u8 = undefined;
_ = std.posix.read(sig_pipe[0], &buf) catch {};
}
var win_shutdown = std.atomic.Value(bool).init(false);
fn win_ctrl_handler(ctrl_type: std.os.windows.DWORD) callconv(.winapi) std.os.windows.BOOL {
_ = ctrl_type;
win_shutdown.store(true, .release);
return std.os.windows.TRUE;
}
fn run_windows() void {
const SetConsoleCtrlHandler = struct {
extern "kernel32" fn SetConsoleCtrlHandler(
HandlerRoutine: ?*const fn (std.os.windows.DWORD) callconv(.winapi) std.os.windows.BOOL,
Add: std.os.windows.BOOL,
) callconv(.winapi) std.os.windows.BOOL;
}.SetConsoleCtrlHandler;
_ = SetConsoleCtrlHandler(win_ctrl_handler, std.os.windows.TRUE);
while (!win_shutdown.load(.acquire)) {
std.Thread.sleep(50 * std.time.ns_per_ms);
}
}
fn usage(out: std.fs.File) !void {
var buf: [4096]u8 = undefined;
var writer = out.writer(&buf);
try writer.interface.print(
\\Usage: nightwatch <path> [<path> ...]
\\
\\The Watch never sleeps.
\\
\\Events printed to stdout:
\\ create a file was created
\\ modify a file was modified
\\ delete a file or directory was deleted
\\ mkdir a directory was created
\\ rename a file or directory was renamed
\\
\\Stand down with Ctrl-C.
\\
, .{});
try writer.interface.flush();
}
pub fn main() !void { pub fn main() !void {
std.debug.print("FABRICATI DIEM, PVNC\n", .{}); var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
const allocator = gpa.allocator();
const args = try std.process.argsAlloc(allocator);
defer std.process.argsFree(allocator, args);
if (args.len < 2) {
try usage(std.fs.File.stderr());
std.process.exit(1);
}
if (std.mem.eql(u8, args[1], "-h") or std.mem.eql(u8, args[1], "--help")) {
try usage(std.fs.File.stdout());
return;
}
var buf: [4096]u8 = undefined;
var stderr = std.fs.File.stderr().writer(&buf);
defer stderr.interface.flush() catch {};
if (is_posix) {
sig_pipe = try std.posix.pipe();
const sa = std.posix.Sigaction{
.handler = .{ .handler = posix_sighandler },
.mask = std.posix.sigemptyset(),
.flags = 0,
};
std.posix.sigaction(std.posix.SIG.INT, &sa, null);
std.posix.sigaction(std.posix.SIG.TERM, &sa, null);
}
defer if (is_posix) {
std.posix.close(sig_pipe[0]);
std.posix.close(sig_pipe[1]);
};
var cli_handler = CliHandler{
.handler = .{ .vtable = &CliHandler.vtable },
.out = std.fs.File.stdout(),
};
var watcher = try nightwatch.init(allocator, &cli_handler.handler);
defer watcher.deinit();
for (args[1..]) |path| {
watcher.watch(path) catch |err| {
try stderr.interface.print("nightwatch: {s}: {s}\n", .{ path, @errorName(err) });
continue;
};
try stderr.interface.print("on watch: {s}\n", .{path});
}
if (builtin.os.tag == .linux) {
try run_linux(&watcher);
} else if (builtin.os.tag == .windows) {
run_windows();
} else if (is_posix) {
run_posix();
}
} }
test "simple test" {} test "simple test" {}

View file

@ -6,9 +6,8 @@ pub const EventType = enum {
created, created,
modified, modified,
deleted, deleted,
/// A new directory was created inside a watched directory. The /// A new directory was created inside a watched directory.
/// receiver should call watch() on the path to get events for files /// The library automatically begins watching it; no action is required.
/// created in it.
dir_created, dir_created,
/// Only produced on macOS and Windows where the OS gives no pairing info. /// Only produced on macOS and Windows where the OS gives no pairing info.
/// On Linux, paired renames are emitted as a { "FW", "rename", from, to } message instead. /// On Linux, paired renames are emitted as a { "FW", "rename", from, to } message instead.
@ -49,34 +48,103 @@ pub const ReadableStatus = enum {
}; };
allocator: std.mem.Allocator, allocator: std.mem.Allocator,
backend: Backend, interceptor: *Interceptor,
pub fn init(allocator: std.mem.Allocator, handler: *Handler) !@This() { pub fn init(allocator: std.mem.Allocator, handler: *Handler) !@This() {
var self: @This() = .{ const ic = try allocator.create(Interceptor);
errdefer allocator.destroy(ic);
ic.* = .{
.handler = .{ .vtable = &Interceptor.vtable },
.user_handler = handler,
.allocator = allocator, .allocator = allocator,
.backend = try Backend.init(handler), .backend = undefined,
}; };
try self.backend.arm(self.allocator); ic.backend = try Backend.init(&ic.handler);
return self; errdefer ic.backend.deinit(allocator);
try ic.backend.arm(allocator);
return .{ .allocator = allocator, .interceptor = ic };
} }
pub fn deinit(self: *@This()) void { pub fn deinit(self: *@This()) void {
self.backend.deinit(self.allocator); self.interceptor.backend.deinit(self.allocator);
self.allocator.destroy(self.interceptor);
} }
/// Watch a path (file or directory) for changes. The handler will receive /// Watch a path (file or directory) for changes. The handler will receive
/// `change` and (linux only) `rename` calls /// `change` and (linux only) `rename` calls. When path is a directory,
/// all subdirectories are watched recursively and new directories created
/// inside are watched automatically.
pub fn watch(self: *@This(), path: []const u8) Error!void { pub fn watch(self: *@This(), path: []const u8) Error!void {
return self.backend.add_watch(self.allocator, path); try self.interceptor.backend.add_watch(self.allocator, path);
if (!Backend.watches_recursively) {
recurse_watch(&self.interceptor.backend, self.allocator, path);
}
} }
/// Stop watching a previously watched path /// Stop watching a previously watched path
pub fn unwatch(self: *@This(), path: []const u8) void { pub fn unwatch(self: *@This(), path: []const u8) void {
self.backend.remove_watch(self.allocator, path); self.interceptor.backend.remove_watch(self.allocator, path);
} }
pub fn handle_read_ready(self: *@This()) !void { pub fn handle_read_ready(self: *@This()) !void {
try self.backend.handle_read_ready(self.allocator); try self.interceptor.backend.handle_read_ready(self.allocator);
}
/// Returns the inotify file descriptor that should be polled for POLLIN
/// before calling handle_read_ready(). Only available on Linux.
pub fn poll_fd(self: *const @This()) std.posix.fd_t {
comptime if (builtin.os.tag != .linux) @compileError("poll_fd is only available on Linux");
return self.interceptor.backend.inotify_fd;
}
// Wraps the user's handler to intercept dir_created events and auto-watch
// new directories before forwarding to the user.
// Heap-allocated so that &ic.handler stays valid regardless of how the
// nightwatch struct is moved after init() returns.
const Interceptor = struct {
handler: Handler,
user_handler: *Handler,
allocator: std.mem.Allocator,
backend: Backend,
const vtable = Handler.VTable{
.change = change_cb,
.rename = rename_cb,
.wait_readable = if (builtin.os.tag == .linux) wait_readable_cb else {},
};
fn change_cb(h: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void {
const self: *Interceptor = @fieldParentPtr("handler", h);
if (event_type == .dir_created and !Backend.watches_recursively) {
self.backend.add_watch(self.allocator, path) catch {};
recurse_watch(&self.backend, self.allocator, path);
}
return self.user_handler.change(path, event_type);
}
fn rename_cb(h: *Handler, src: []const u8, dst: []const u8) error{HandlerFailed}!void {
const self: *Interceptor = @fieldParentPtr("handler", h);
return self.user_handler.rename(src, dst);
}
fn wait_readable_cb(h: *Handler) error{HandlerFailed}!ReadableStatus {
const self: *Interceptor = @fieldParentPtr("handler", h);
return self.user_handler.wait_readable();
}
};
// Scans subdirectories of dir_path and adds a watch for each one, recursively.
fn recurse_watch(backend: *Backend, allocator: std.mem.Allocator, dir_path: []const u8) void {
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
defer dir.close();
var it = dir.iterate();
while (it.next() catch return) |entry| {
if (entry.kind != .directory) continue;
var buf: [std.fs.max_path_bytes]u8 = undefined;
const sub = std.fmt.bufPrint(&buf, "{s}/{s}", .{ dir_path, entry.name }) catch continue;
backend.add_watch(allocator, sub) catch {};
recurse_watch(backend, allocator, sub);
}
} }
const Backend = switch (builtin.os.tag) { const Backend = switch (builtin.os.tag) {
@ -88,6 +156,8 @@ const Backend = switch (builtin.os.tag) {
}; };
const INotifyBackend = struct { const INotifyBackend = struct {
const watches_recursively = false;
handler: *Handler, handler: *Handler,
inotify_fd: std.posix.fd_t, inotify_fd: std.posix.fd_t,
watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path
@ -247,6 +317,8 @@ const INotifyBackend = struct {
}; };
const FSEventsBackend = struct { const FSEventsBackend = struct {
const watches_recursively = true; // FSEventStreamCreate watches the entire subtree
handler: *Handler, handler: *Handler,
stream: ?*anyopaque, // FSEventStreamRef stream: ?*anyopaque, // FSEventStreamRef
queue: ?*anyopaque, // dispatch_queue_t queue: ?*anyopaque, // dispatch_queue_t
@ -435,11 +507,14 @@ const FSEventsBackend = struct {
}; };
const KQueueBackend = struct { const KQueueBackend = struct {
const watches_recursively = false;
handler: *Handler, handler: *Handler,
kq: std.posix.fd_t, kq: std.posix.fd_t,
shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
thread: ?std.Thread, thread: ?std.Thread,
watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd
watches_mutex: std.Thread.Mutex,
// Per-directory snapshots of filenames, used to diff on NOTE_WRITE. // Per-directory snapshots of filenames, used to diff on NOTE_WRITE.
// Key: owned dir path (same as watches key), value: set of owned filenames. // Key: owned dir path (same as watches key), value: set of owned filenames.
// Accessed from both the main thread (add_watch) and the background thread (scan_dir). // Accessed from both the main thread (add_watch) and the background thread (scan_dir).
@ -477,7 +552,16 @@ const KQueueBackend = struct {
.udata = 0, .udata = 0,
}; };
_ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null);
return .{ .handler = handler, .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty, .snapshots = .empty, .snapshots_mutex = .{} }; return .{
.handler = handler,
.kq = kq,
.shutdown_pipe = pipe,
.thread = null,
.watches = .empty,
.watches_mutex = .{},
.snapshots = .empty,
.snapshots_mutex = .{},
};
} }
fn deinit(self: *@This(), allocator: std.mem.Allocator) void { fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
@ -494,7 +578,7 @@ const KQueueBackend = struct {
self.watches.deinit(allocator); self.watches.deinit(allocator);
var sit = self.snapshots.iterator(); var sit = self.snapshots.iterator();
while (sit.next()) |entry| { while (sit.next()) |entry| {
allocator.free(entry.key_ptr.*); // Keys are borrowed from self.watches and freed in the watches loop above.
var names = entry.value_ptr.*; var names = entry.value_ptr.*;
var nit = names.iterator(); var nit = names.iterator();
while (nit.next()) |ne| allocator.free(ne.key_ptr.*); while (nit.next()) |ne| allocator.free(ne.key_ptr.*);
@ -506,12 +590,21 @@ const KQueueBackend = struct {
fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
if (self.thread != null) return error.AlreadyArmed; if (self.thread != null) return error.AlreadyArmed;
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, self.handler }); self.thread = try std.Thread.spawn(.{}, thread_fn, .{
self.kq,
&self.watches,
&self.watches_mutex,
&self.snapshots,
&self.snapshots_mutex,
allocator,
self.handler,
});
} }
fn thread_fn( fn thread_fn(
kq: std.posix.fd_t, kq: std.posix.fd_t,
watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), watches: *const std.StringHashMapUnmanaged(std.posix.fd_t),
watches_mutex: *std.Thread.Mutex,
snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
snapshots_mutex: *std.Thread.Mutex, snapshots_mutex: *std.Thread.Mutex,
allocator: std.mem.Allocator, allocator: std.mem.Allocator,
@ -525,18 +618,20 @@ const KQueueBackend = struct {
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
if (ev.filter != EVFILT_VNODE) continue; if (ev.filter != EVFILT_VNODE) continue;
// Find the directory path for this fd. // Find the directory path for this fd.
watches_mutex.lock();
var wit = watches.iterator(); var wit = watches.iterator();
while (wit.next()) |entry| { const dir_path: ?[]const u8 = while (wit.next()) |entry| {
if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; if (entry.value_ptr.* == @as(std.posix.fd_t, @intCast(ev.ident)))
const dir_path = entry.key_ptr.*; break entry.key_ptr.*;
if (ev.fflags & NOTE_DELETE != 0) { } else null;
handler.change(dir_path, EventType.deleted) catch return; watches_mutex.unlock();
} else if (ev.fflags & NOTE_RENAME != 0) { if (dir_path == null) continue;
handler.change(dir_path, EventType.renamed) catch return; if (ev.fflags & NOTE_DELETE != 0) {
} else if (ev.fflags & NOTE_WRITE != 0) { handler.change(dir_path.?, EventType.deleted) catch return;
scan_dir(dir_path, snapshots, snapshots_mutex, allocator, handler) catch {}; } else if (ev.fflags & NOTE_RENAME != 0) {
} handler.change(dir_path.?, EventType.renamed) catch return;
break; } else if (ev.fflags & NOTE_WRITE != 0) {
scan_dir(dir_path.?, snapshots, snapshots_mutex, allocator, handler) catch {};
} }
} }
} }
@ -553,71 +648,102 @@ const KQueueBackend = struct {
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
defer dir.close(); defer dir.close();
// Collect current filenames (no lock needed, reading filesystem only). // Collect current files and subdirectories (no lock, reading filesystem only).
var current: std.StringHashMapUnmanaged(void) = .empty; var current_files: std.StringHashMapUnmanaged(void) = .empty;
defer { defer {
var it = current.iterator(); var it = current_files.iterator();
while (it.next()) |e| allocator.free(e.key_ptr.*); while (it.next()) |e| allocator.free(e.key_ptr.*);
current.deinit(allocator); current_files.deinit(allocator);
}
var current_dirs: std.ArrayListUnmanaged([]u8) = .empty;
defer {
for (current_dirs.items) |d| allocator.free(d);
current_dirs.deinit(allocator);
} }
var iter = dir.iterate(); var iter = dir.iterate();
while (try iter.next()) |entry| { while (try iter.next()) |entry| {
if (entry.kind != .file) continue; switch (entry.kind) {
const name = try allocator.dupe(u8, entry.name); .file => {
try current.put(allocator, name, {}); const name = try allocator.dupe(u8, entry.name);
try current_files.put(allocator, name, {});
},
.directory => {
const name = try allocator.dupe(u8, entry.name);
try current_dirs.append(allocator, name);
},
else => {},
}
} }
// Emit dir_created for new subdirectories outside the lock (no snapshot involvement). // Diff against snapshot under the lock; collect events to emit after releasing it.
var dir2 = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; var to_create: std.ArrayListUnmanaged([]const u8) = .empty;
defer dir2.close(); defer to_create.deinit(allocator);
var dir_iter = dir2.iterate();
while (try dir_iter.next()) |entry| {
if (entry.kind != .directory) continue;
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.name }) catch continue;
// Only emit if not already watched.
if (!snapshots.contains(full_path))
try handler.change(full_path, EventType.dir_created);
}
snapshots_mutex.lock();
defer snapshots_mutex.unlock();
// Get or create the snapshot for this directory.
const gop = try snapshots.getOrPut(allocator, dir_path);
if (!gop.found_existing) gop.value_ptr.* = .empty;
const snapshot = gop.value_ptr;
// Emit created events for files in current but not in snapshot.
var cit = current.iterator();
while (cit.next()) |entry| {
if (snapshot.contains(entry.key_ptr.*)) continue;
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.key_ptr.* }) catch continue;
try handler.change(full_path, EventType.created);
const owned = try allocator.dupe(u8, entry.key_ptr.*);
try snapshot.put(allocator, owned, {});
}
// Emit deleted events for files in snapshot but not in current.
var to_delete: std.ArrayListUnmanaged([]const u8) = .empty; var to_delete: std.ArrayListUnmanaged([]const u8) = .empty;
defer to_delete.deinit(allocator); defer to_delete.deinit(allocator);
var sit = snapshot.iterator(); var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty;
while (sit.next()) |entry| { defer new_dirs.deinit(allocator);
if (current.contains(entry.key_ptr.*)) continue;
try to_delete.append(allocator, entry.key_ptr.*); snapshots_mutex.lock();
{
for (current_dirs.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
if (!snapshots.contains(full_path))
try new_dirs.append(allocator, full_path);
}
const gop = snapshots.getOrPut(allocator, dir_path) catch |e| {
snapshots_mutex.unlock();
return e;
};
if (!gop.found_existing) gop.value_ptr.* = .empty;
const snapshot = gop.value_ptr;
var cit = current_files.iterator();
while (cit.next()) |entry| {
if (snapshot.contains(entry.key_ptr.*)) continue;
const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| {
snapshots_mutex.unlock();
return e;
};
snapshot.put(allocator, owned, {}) catch |e| {
allocator.free(owned);
snapshots_mutex.unlock();
return e;
};
try to_create.append(allocator, owned);
}
var sit = snapshot.iterator();
while (sit.next()) |entry| {
if (current_files.contains(entry.key_ptr.*)) continue;
try to_delete.append(allocator, entry.key_ptr.*);
}
for (to_delete.items) |name| _ = snapshot.fetchRemove(name);
}
snapshots_mutex.unlock();
// Emit all events outside the lock so handlers may safely call watch()/unwatch().
for (new_dirs.items) |full_path|
try handler.change(full_path, EventType.dir_created);
for (to_create.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
try handler.change(full_path, EventType.created);
} }
for (to_delete.items) |name| { for (to_delete.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined; var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
try handler.change(full_path, EventType.deleted); try handler.change(full_path, EventType.deleted);
_ = snapshot.fetchRemove(name);
allocator.free(name); allocator.free(name);
} }
} }
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void { fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ WatchFailed, OutOfMemory }!void {
if (self.watches.contains(path)) return; self.watches_mutex.lock();
const already = self.watches.contains(path);
self.watches_mutex.unlock();
if (already) return;
const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) {
error.AccessDenied, error.AccessDenied,
error.PermissionDenied, error.PermissionDenied,
@ -669,8 +795,13 @@ const KQueueBackend = struct {
}, },
}; };
const owned_path = try allocator.dupe(u8, path); const owned_path = try allocator.dupe(u8, path);
errdefer allocator.free(owned_path); self.watches_mutex.lock();
try self.watches.put(allocator, owned_path, path_fd); self.watches.put(allocator, owned_path, path_fd) catch |e| {
self.watches_mutex.unlock();
allocator.free(owned_path);
return e;
};
self.watches_mutex.unlock();
// Take initial snapshot so first NOTE_WRITE has a baseline to diff against. // Take initial snapshot so first NOTE_WRITE has a baseline to diff against.
self.take_snapshot(allocator, owned_path) catch |e| switch (e) { self.take_snapshot(allocator, owned_path) catch |e| switch (e) {
error.AccessDenied, error.AccessDenied,
@ -704,7 +835,10 @@ const KQueueBackend = struct {
} }
fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
if (self.watches.fetchRemove(path)) |entry| { self.watches_mutex.lock();
const watches_entry = self.watches.fetchRemove(path);
self.watches_mutex.unlock();
if (watches_entry) |entry| {
std.posix.close(entry.value); std.posix.close(entry.value);
allocator.free(entry.key); allocator.free(entry.key);
} }
@ -718,6 +852,8 @@ const KQueueBackend = struct {
}; };
const WindowsBackend = struct { const WindowsBackend = struct {
const watches_recursively = true; // ReadDirectoryChangesW with bWatchSubtree=1
const windows = std.os.windows; const windows = std.os.windows;
const win32 = struct { const win32 = struct {
@ -904,16 +1040,11 @@ const WindowsBackend = struct {
} }
} }
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) (windows.CreateIoCompletionPortError || error{ fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void {
InvalidUtf8,
OutOfMemory,
FileWatcherInvalidHandle,
FileWatcherReadDirectoryChangesFailed,
})!void {
self.watches_mutex.lock(); self.watches_mutex.lock();
defer self.watches_mutex.unlock(); defer self.watches_mutex.unlock();
if (self.watches.contains(path)) return; if (self.watches.contains(path)) return;
const path_w = try std.unicode.utf8ToUtf16LeAllocZ(allocator, path); const path_w = std.unicode.utf8ToUtf16LeAllocZ(allocator, path) catch return error.WatchFailed;
defer allocator.free(path_w); defer allocator.free(path_w);
const handle = win32.CreateFileW( const handle = win32.CreateFileW(
path_w, path_w,
@ -924,16 +1055,16 @@ const WindowsBackend = struct {
0x02000000 | 0x40000000, // FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED 0x02000000 | 0x40000000, // FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED
null, null,
); );
if (handle == windows.INVALID_HANDLE_VALUE) return error.FileWatcherInvalidHandle; if (handle == windows.INVALID_HANDLE_VALUE) return error.WatchFailed;
errdefer _ = win32.CloseHandle(handle); errdefer _ = win32.CloseHandle(handle);
_ = try windows.CreateIoCompletionPort(handle, self.iocp, @intFromPtr(handle), 0); _ = windows.CreateIoCompletionPort(handle, self.iocp, @intFromPtr(handle), 0) catch return error.WatchFailed;
const buf = try allocator.alignedAlloc(u8, .fromByteUnits(4), buf_size); const buf = try allocator.alignedAlloc(u8, .fromByteUnits(4), buf_size);
errdefer allocator.free(buf); errdefer allocator.free(buf);
const owned_path = try allocator.dupe(u8, path); const owned_path = try allocator.dupe(u8, path);
errdefer allocator.free(owned_path); errdefer allocator.free(owned_path);
var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED); var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED);
if (win32.ReadDirectoryChangesW(handle, buf.ptr, buf_size, 1, notify_filter, null, &overlapped, null) == 0) if (win32.ReadDirectoryChangesW(handle, buf.ptr, buf_size, 1, notify_filter, null, &overlapped, null) == 0)
return error.FileWatcherReadDirectoryChangesFailed; return error.WatchFailed;
try self.watches.put(allocator, owned_path, .{ try self.watches.put(allocator, owned_path, .{
.handle = handle, .handle = handle,
.buf = buf, .buf = buf,

View file

@ -148,7 +148,11 @@ fn makeTempDir(allocator: std.mem.Allocator) ![]u8 {
const name = try std.fmt.allocPrint( const name = try std.fmt.allocPrint(
allocator, allocator,
"/tmp/nightwatch_test_{d}_{d}", "/tmp/nightwatch_test_{d}_{d}",
.{ std.os.linux.getpid(), n }, .{ switch (builtin.os.tag) {
.linux => std.os.linux.getpid(),
.windows => std.os.windows.GetCurrentProcessId(),
else => std.c.getpid(),
}, n },
); );
errdefer allocator.free(name); errdefer allocator.free(name);
try std.fs.makeDirAbsolute(name); try std.fs.makeDirAbsolute(name);