build: allow selecting a backend at comptime instead of via a build flag

This commit is contained in:
CJ van den Berg 2026-03-09 10:46:29 +01:00
parent 6930adae7f
commit aa4e2920dd
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9
6 changed files with 457 additions and 372 deletions

View file

@ -8,26 +8,12 @@ pub fn build(b: *std.Build) void {
break :blk b.option( break :blk b.option(
bool, bool,
"macos_fsevents", "macos_fsevents",
"Use the FSEvents backend on macOS instead of kqueue (requires Xcode frameworks)", "Add the FSEvents backend on macOS (requires Xcode frameworks)",
) orelse false; ) orelse false;
} else false; } else false;
const linux_read_thread = b.option(
bool,
"linux_read_thread",
"Use a background thread on Linux (like macOS/Windows) instead of requiring the caller to drive the event loop via poll_fd/handle_read_ready",
) orelse true;
const kqueue_dir_only = b.option(
bool,
"kqueue_dir_only",
"Use directory-only kqueue watches (lower fd usage, no real-time file modification detection). Default: false",
) orelse false;
const options = b.addOptions(); const options = b.addOptions();
options.addOption(bool, "macos_fsevents", macos_fsevents); options.addOption(bool, "macos_fsevents", macos_fsevents);
options.addOption(bool, "linux_read_thread", linux_read_thread);
options.addOption(bool, "kqueue_dir_only", kqueue_dir_only);
const options_mod = options.createModule(); const options_mod = options.createModule();
const mod = b.addModule("nightwatch", .{ const mod = b.addModule("nightwatch", .{

View file

@ -1,12 +1,8 @@
const std = @import("std"); const std = @import("std");
const build_options = @import("build_options");
const types = @import("../types.zig"); const types = @import("../types.zig");
const Handler = types.Handler;
const EventType = types.EventType; const EventType = types.EventType;
const ObjectType = types.ObjectType; const ObjectType = types.ObjectType;
const InterfaceType = types.InterfaceType;
pub const watches_recursively = false;
pub const detects_file_modifications = true;
const PendingRename = struct { const PendingRename = struct {
cookie: u32, cookie: u32,
@ -14,218 +10,241 @@ const PendingRename = struct {
object_type: ObjectType, object_type: ObjectType,
}; };
handler: *Handler, pub fn Create(comptime variant: InterfaceType) type {
inotify_fd: std.posix.fd_t, return struct {
watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path handler: *Handler,
pending_renames: std.ArrayListUnmanaged(PendingRename), inotify_fd: std.posix.fd_t,
// Used only in linux_read_thread mode: watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path
stop_pipe: if (build_options.linux_read_thread) [2]std.posix.fd_t else void, pending_renames: std.ArrayListUnmanaged(PendingRename),
thread: if (build_options.linux_read_thread) ?std.Thread else void, stop_pipe: switch (variant) {
.threaded => [2]std.posix.fd_t,
const IN = std.os.linux.IN; .polling => void,
},
const watch_mask: u32 = IN.CREATE | IN.DELETE | IN.MODIFY | thread: switch (variant) {
IN.MOVED_FROM | IN.MOVED_TO | IN.DELETE_SELF | .threaded => ?std.Thread,
IN.MOVE_SELF | IN.CLOSE_WRITE; .polling => void,
const in_flags: std.os.linux.O = .{ .NONBLOCK = true };
pub fn init(handler: *Handler) !@This() {
const inotify_fd = try std.posix.inotify_init1(@bitCast(in_flags));
errdefer std.posix.close(inotify_fd);
if (comptime build_options.linux_read_thread) {
const stop_pipe = try std.posix.pipe();
return .{
.handler = handler,
.inotify_fd = inotify_fd,
.watches = .empty,
.pending_renames = .empty,
.stop_pipe = stop_pipe,
.thread = null,
};
} else {
return .{
.handler = handler,
.inotify_fd = inotify_fd,
.watches = .empty,
.pending_renames = .empty,
.stop_pipe = {},
.thread = {},
};
}
}
pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
if (comptime build_options.linux_read_thread) {
// Signal thread to stop and wait for it to exit.
_ = std.posix.write(self.stop_pipe[1], "x") catch {};
if (self.thread) |t| t.join();
std.posix.close(self.stop_pipe[0]);
std.posix.close(self.stop_pipe[1]);
}
var it = self.watches.iterator();
while (it.next()) |entry| allocator.free(entry.value_ptr.*);
self.watches.deinit(allocator);
for (self.pending_renames.items) |r| allocator.free(r.path);
self.pending_renames.deinit(allocator);
std.posix.close(self.inotify_fd);
}
pub fn arm(self: *@This(), allocator: std.mem.Allocator) error{HandlerFailed}!void {
if (comptime build_options.linux_read_thread) {
if (self.thread != null) return; // already running
self.thread = std.Thread.spawn(.{}, thread_fn, .{ self, allocator }) catch return error.HandlerFailed;
} else {
return switch (self.handler.wait_readable() catch |e| switch (e) {
error.HandlerFailed => |e_| return e_,
}) {
.will_notify => {},
};
}
}
fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void {
var pfds = [_]std.posix.pollfd{
.{ .fd = self.inotify_fd, .events = std.posix.POLL.IN, .revents = 0 },
.{ .fd = self.stop_pipe[0], .events = std.posix.POLL.IN, .revents = 0 },
};
while (true) {
_ = std.posix.poll(&pfds, -1) catch return;
if (pfds[1].revents & std.posix.POLL.IN != 0) return; // stop signal
if (pfds[0].revents & std.posix.POLL.IN != 0) {
self.handle_read_ready(allocator) catch return;
}
}
}
pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void {
const path_z = try allocator.dupeZ(u8, path);
defer allocator.free(path_z);
const wd = std.os.linux.inotify_add_watch(self.inotify_fd, path_z, watch_mask);
switch (std.posix.errno(wd)) {
.SUCCESS => {},
else => |e| {
std.log.err("nightwatch.add_watch failed: {t}", .{e});
return error.WatchFailed;
}, },
}
const owned_path = try allocator.dupe(u8, path);
errdefer allocator.free(owned_path);
const result = try self.watches.getOrPut(allocator, @intCast(wd));
if (result.found_existing) allocator.free(result.value_ptr.*);
result.value_ptr.* = owned_path;
}
pub fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { pub const watches_recursively = false;
var it = self.watches.iterator(); pub const detects_file_modifications = true;
while (it.next()) |entry| {
if (!std.mem.eql(u8, entry.value_ptr.*, path)) continue;
_ = std.os.linux.inotify_rm_watch(self.inotify_fd, entry.key_ptr.*);
allocator.free(entry.value_ptr.*);
self.watches.removeByPtr(entry.key_ptr);
return;
}
}
fn has_watch_for_path(self: *const @This(), path: []const u8) bool { const Handler = switch (variant) {
var it = self.watches.iterator(); .threaded => types.Handler,
while (it.next()) |entry| { .polling => types.PollingHandler,
if (std.mem.eql(u8, entry.value_ptr.*, path)) return true;
}
return false;
}
pub fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator) (std.posix.ReadError || error{ NoSpaceLeft, OutOfMemory, HandlerFailed })!void {
const InotifyEvent = extern struct {
wd: i32,
mask: u32,
cookie: u32,
len: u32,
};
var buf: [65536]u8 align(@alignOf(InotifyEvent)) = undefined;
defer {
// Any unpaired MOVED_FROM means the file was moved out of the watched tree.
for (self.pending_renames.items) |r| {
self.handler.change(r.path, EventType.deleted, r.object_type) catch {};
allocator.free(r.path);
}
self.pending_renames.clearRetainingCapacity();
}
while (true) {
const n = std.posix.read(self.inotify_fd, &buf) catch |e| switch (e) {
error.WouldBlock => {
// re-arm the file_discriptor
try self.arm(allocator);
break;
},
else => return e,
}; };
var offset: usize = 0;
while (offset + @sizeOf(InotifyEvent) <= n) {
const ev: *const InotifyEvent = @ptrCast(@alignCast(buf[offset..].ptr));
const name_offset = offset + @sizeOf(InotifyEvent);
offset = name_offset + ev.len;
const watched_path = self.watches.get(ev.wd) orelse continue;
const name: []const u8 = if (ev.len > 0)
std.mem.sliceTo(buf[name_offset..][0..ev.len], 0)
else
"";
var full_buf: [std.fs.max_path_bytes]u8 = undefined; const IN = std.os.linux.IN;
const full_path: []const u8 = if (name.len > 0)
try std.fmt.bufPrint(&full_buf, "{s}/{s}", .{ watched_path, name })
else
watched_path;
if (ev.mask & IN.MOVED_FROM != 0) { const watch_mask: u32 = IN.CREATE | IN.DELETE | IN.MODIFY |
// Park it, we may receive a paired MOVED_TO with the same cookie. IN.MOVED_FROM | IN.MOVED_TO | IN.DELETE_SELF |
try self.pending_renames.append(allocator, .{ IN.MOVE_SELF | IN.CLOSE_WRITE;
.cookie = ev.cookie,
.path = try allocator.dupe(u8, full_path), const in_flags: std.os.linux.O = .{ .NONBLOCK = true };
.object_type = if (ev.mask & IN.ISDIR != 0) .dir else .file,
}); pub fn init(handler: *Handler) !@This() {
} else if (ev.mask & IN.MOVED_TO != 0) { const inotify_fd = try std.posix.inotify_init1(@bitCast(in_flags));
// Look for a paired MOVED_FROM. errdefer std.posix.close(inotify_fd);
var found: ?usize = null; switch (variant) {
for (self.pending_renames.items, 0..) |r, i| { .threaded => {
if (r.cookie == ev.cookie) { const stop_pipe = try std.posix.pipe();
found = i; return .{
break; .handler = handler,
} .inotify_fd = inotify_fd,
} .watches = .empty,
if (found) |i| { .pending_renames = .empty,
// Complete rename pair: emit a single atomic rename message. .stop_pipe = stop_pipe,
const r = self.pending_renames.swapRemove(i); .thread = null,
defer allocator.free(r.path); };
try self.handler.rename(r.path, full_path, r.object_type); },
} else { .polling => {
// No paired MOVED_FROM, file was moved in from outside the watched tree. return .{
const ot: ObjectType = if (ev.mask & IN.ISDIR != 0) .dir else .file; .handler = handler,
try self.handler.change(full_path, EventType.created, ot); .inotify_fd = inotify_fd,
} .watches = .empty,
} else if (ev.mask & IN.MOVE_SELF != 0) { .pending_renames = .empty,
// The watched directory itself was renamed/moved away. .stop_pipe = {},
try self.handler.change(full_path, EventType.deleted, .dir); .thread = {},
} else { };
const is_dir = ev.mask & IN.ISDIR != 0; },
const object_type: ObjectType = if (is_dir) .dir else .file;
const event_type: EventType = if (ev.mask & IN.CREATE != 0)
.created
else if (ev.mask & (IN.DELETE | IN.DELETE_SELF) != 0) blk: {
// Suppress IN_DELETE|IN_ISDIR for subdirs that have their
// own watch: IN_DELETE_SELF on that watch will fire the
// same path without duplication.
if (is_dir and self.has_watch_for_path(full_path))
continue;
break :blk .deleted;
} else if (ev.mask & (IN.MODIFY | IN.CLOSE_WRITE) != 0)
.modified
else
continue;
try self.handler.change(full_path, event_type, object_type);
} }
} }
}
pub fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
if (comptime variant == .threaded) {
// Signal thread to stop and wait for it to exit.
_ = std.posix.write(self.stop_pipe[1], "x") catch {};
if (self.thread) |t| t.join();
std.posix.close(self.stop_pipe[0]);
std.posix.close(self.stop_pipe[1]);
}
var it = self.watches.iterator();
while (it.next()) |entry| allocator.free(entry.value_ptr.*);
self.watches.deinit(allocator);
for (self.pending_renames.items) |r| allocator.free(r.path);
self.pending_renames.deinit(allocator);
std.posix.close(self.inotify_fd);
}
pub fn arm(self: *@This(), allocator: std.mem.Allocator) error{HandlerFailed}!void {
switch (variant) {
.threaded => {
if (self.thread != null) return; // already running
self.thread = std.Thread.spawn(.{}, thread_fn, .{ self, allocator }) catch return error.HandlerFailed;
},
.polling => {
return switch (self.handler.wait_readable() catch |e| switch (e) {
error.HandlerFailed => |e_| return e_,
}) {
.will_notify => {},
};
},
}
}
fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void {
var pfds = [_]std.posix.pollfd{
.{ .fd = self.inotify_fd, .events = std.posix.POLL.IN, .revents = 0 },
.{ .fd = self.stop_pipe[0], .events = std.posix.POLL.IN, .revents = 0 },
};
while (true) {
_ = std.posix.poll(&pfds, -1) catch return;
if (pfds[1].revents & std.posix.POLL.IN != 0) return; // stop signal
if (pfds[0].revents & std.posix.POLL.IN != 0) {
self.handle_read_ready(allocator) catch return;
}
}
}
pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void {
const path_z = try allocator.dupeZ(u8, path);
defer allocator.free(path_z);
const wd = std.os.linux.inotify_add_watch(self.inotify_fd, path_z, watch_mask);
switch (std.posix.errno(wd)) {
.SUCCESS => {},
else => |e| {
std.log.err("nightwatch.add_watch failed: {t}", .{e});
return error.WatchFailed;
},
}
const owned_path = try allocator.dupe(u8, path);
errdefer allocator.free(owned_path);
const result = try self.watches.getOrPut(allocator, @intCast(wd));
if (result.found_existing) allocator.free(result.value_ptr.*);
result.value_ptr.* = owned_path;
}
pub fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
var it = self.watches.iterator();
while (it.next()) |entry| {
if (!std.mem.eql(u8, entry.value_ptr.*, path)) continue;
_ = std.os.linux.inotify_rm_watch(self.inotify_fd, entry.key_ptr.*);
allocator.free(entry.value_ptr.*);
self.watches.removeByPtr(entry.key_ptr);
return;
}
}
fn has_watch_for_path(self: *const @This(), path: []const u8) bool {
var it = self.watches.iterator();
while (it.next()) |entry| {
if (std.mem.eql(u8, entry.value_ptr.*, path)) return true;
}
return false;
}
pub fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator) (std.posix.ReadError || error{ NoSpaceLeft, OutOfMemory, HandlerFailed })!void {
const InotifyEvent = extern struct {
wd: i32,
mask: u32,
cookie: u32,
len: u32,
};
var buf: [65536]u8 align(@alignOf(InotifyEvent)) = undefined;
defer {
// Any unpaired MOVED_FROM means the file was moved out of the watched tree.
for (self.pending_renames.items) |r| {
self.handler.change(r.path, EventType.deleted, r.object_type) catch {};
allocator.free(r.path);
}
self.pending_renames.clearRetainingCapacity();
}
while (true) {
const n = std.posix.read(self.inotify_fd, &buf) catch |e| switch (e) {
error.WouldBlock => {
// re-arm the file_discriptor
try self.arm(allocator);
break;
},
else => return e,
};
var offset: usize = 0;
while (offset + @sizeOf(InotifyEvent) <= n) {
const ev: *const InotifyEvent = @ptrCast(@alignCast(buf[offset..].ptr));
const name_offset = offset + @sizeOf(InotifyEvent);
offset = name_offset + ev.len;
const watched_path = self.watches.get(ev.wd) orelse continue;
const name: []const u8 = if (ev.len > 0)
std.mem.sliceTo(buf[name_offset..][0..ev.len], 0)
else
"";
var full_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path: []const u8 = if (name.len > 0)
try std.fmt.bufPrint(&full_buf, "{s}/{s}", .{ watched_path, name })
else
watched_path;
if (ev.mask & IN.MOVED_FROM != 0) {
// Park it, we may receive a paired MOVED_TO with the same cookie.
try self.pending_renames.append(allocator, .{
.cookie = ev.cookie,
.path = try allocator.dupe(u8, full_path),
.object_type = if (ev.mask & IN.ISDIR != 0) .dir else .file,
});
} else if (ev.mask & IN.MOVED_TO != 0) {
// Look for a paired MOVED_FROM.
var found: ?usize = null;
for (self.pending_renames.items, 0..) |r, i| {
if (r.cookie == ev.cookie) {
found = i;
break;
}
}
if (found) |i| {
// Complete rename pair: emit a single atomic rename message.
const r = self.pending_renames.swapRemove(i);
defer allocator.free(r.path);
try self.handler.rename(r.path, full_path, r.object_type);
} else {
// No paired MOVED_FROM, file was moved in from outside the watched tree.
const ot: ObjectType = if (ev.mask & IN.ISDIR != 0) .dir else .file;
try self.handler.change(full_path, EventType.created, ot);
}
} else if (ev.mask & IN.MOVE_SELF != 0) {
// The watched directory itself was renamed/moved away.
try self.handler.change(full_path, EventType.deleted, .dir);
} else {
const is_dir = ev.mask & IN.ISDIR != 0;
const object_type: ObjectType = if (is_dir) .dir else .file;
const event_type: EventType = if (ev.mask & IN.CREATE != 0)
.created
else if (ev.mask & (IN.DELETE | IN.DELETE_SELF) != 0) blk: {
// Suppress IN_DELETE|IN_ISDIR for subdirs that have their
// own watch: IN_DELETE_SELF on that watch will fire the
// same path without duplication.
if (is_dir and self.has_watch_for_path(full_path))
continue;
break :blk .deleted;
} else if (ev.mask & (IN.MODIFY | IN.CLOSE_WRITE) != 0)
.modified
else
continue;
try self.handler.change(full_path, event_type, object_type);
}
}
}
}
};
} }

View file

@ -3,8 +3,9 @@ const builtin = @import("builtin");
const nightwatch = @import("nightwatch"); const nightwatch = @import("nightwatch");
const is_posix = switch (builtin.os.tag) { const is_posix = switch (builtin.os.tag) {
.linux, .macos, .freebsd => true, .linux, .macos, .freebsd, .openbsd, .netbsd, .dragonfly => true,
else => false, .windows => false,
else => @compileError("unsupported OS"),
}; };
// Self-pipe: signal handler writes a byte so poll() / read() unblocks cleanly. // Self-pipe: signal handler writes a byte so poll() / read() unblocks cleanly.
@ -22,7 +23,6 @@ const CliHandler = struct {
const vtable = nightwatch.Handler.VTable{ const vtable = nightwatch.Handler.VTable{
.change = change_cb, .change = change_cb,
.rename = rename_cb, .rename = rename_cb,
.wait_readable = if (nightwatch.linux_poll_mode) wait_readable_cb else {},
}; };
fn change_cb(h: *nightwatch.Handler, path: []const u8, event_type: nightwatch.EventType, object_type: nightwatch.ObjectType) error{HandlerFailed}!void { fn change_cb(h: *nightwatch.Handler, path: []const u8, event_type: nightwatch.EventType, object_type: nightwatch.ObjectType) error{HandlerFailed}!void {
@ -215,7 +215,7 @@ pub fn main() !void {
.ignore = ignore_list.items, .ignore = ignore_list.items,
}; };
var watcher = try nightwatch.init(allocator, &cli_handler.handler); var watcher = try nightwatch.Default.init(allocator, &cli_handler.handler);
defer watcher.deinit(); defer watcher.deinit();
for (watch_paths.items) |path| { for (watch_paths.items) |path| {

View file

@ -7,133 +7,202 @@ pub const EventType = types.EventType;
pub const ObjectType = types.ObjectType; pub const ObjectType = types.ObjectType;
pub const Error = types.Error; pub const Error = types.Error;
pub const ReadableStatus = types.ReadableStatus; pub const ReadableStatus = types.ReadableStatus;
pub const linux_poll_mode = types.linux_poll_mode; pub const InterfaceType = types.InterfaceType;
pub const Handler = types.Handler; pub const Handler = types.Handler;
pub const PollingHandler = types.PollingHandler;
/// True if the current backend detects file content modifications in real time. pub const Variant = switch (builtin.os.tag) {
/// False only when kqueue_dir_only=true, where directory-level watches are used .linux => InterfaceType,
/// and file writes do not trigger a directory NOTE_WRITE event. .macos => if (build_options.macos_fsevents) enum { fsevents, kqueue, kqueuedir } else enum { kqueue, kqueuedir },
pub const detects_file_modifications = Backend.detects_file_modifications; .freebsd, .openbsd, .netbsd, .dragonfly => enum { kqueue, kqueuedir },
.windows => enum { windows },
const Backend = switch (builtin.os.tag) { else => @compileError("unsupported OS"),
.linux => @import("backend/INotify.zig"),
.macos => if (build_options.macos_fsevents) @import("backend/FSEvents.zig") else if (build_options.kqueue_dir_only) @import("backend/KQueueDir.zig") else @import("backend/KQueue.zig"),
.freebsd, .openbsd, .netbsd, .dragonfly => if (build_options.kqueue_dir_only) @import("backend/KQueueDir.zig") else @import("backend/KQueue.zig"),
.windows => @import("backend/Windows.zig"),
else => @compileError("file_watcher: unsupported OS"),
}; };
allocator: std.mem.Allocator, pub const defaultVariant: Variant = switch (builtin.os.tag) {
interceptor: *Interceptor, .linux => .threaded,
.macos, .freebsd, .openbsd, .netbsd, .dragonfly => .kqueue,
.windows => .windows,
else => @compileError("unsupported OS"),
};
pub fn init(allocator: std.mem.Allocator, handler: *Handler) !@This() { pub const Default: type = Create(defaultVariant);
const ic = try allocator.create(Interceptor);
errdefer allocator.destroy(ic);
ic.* = .{
.handler = .{ .vtable = &Interceptor.vtable },
.user_handler = handler,
.allocator = allocator,
.backend = undefined,
};
ic.backend = try Backend.init(&ic.handler);
errdefer ic.backend.deinit(allocator);
try ic.backend.arm(allocator);
return .{ .allocator = allocator, .interceptor = ic };
}
pub fn deinit(self: *@This()) void { pub fn Create(comptime variant: Variant) type {
self.interceptor.backend.deinit(self.allocator); return struct {
self.allocator.destroy(self.interceptor); pub const Backend = switch (builtin.os.tag) {
} .linux => @import("backend/INotify.zig").Create(variant),
.macos => if (build_options.macos_fsevents) switch (variant) {
.fsevents => @import("backend/FSEvents.zig"),
.kqueue => @import("backend/KQueue.zig"),
.kqueuedir => @import("backend/KQueueDir.zig"),
} else switch (variant) {
.kqueue => @import("backend/KQueue.zig"),
.kqueuedir => @import("backend/KQueueDir.zig"),
},
.freebsd, .openbsd, .netbsd, .dragonfly => switch (variant) {
.kqueue => @import("backend/KQueue.zig"),
.kqueuedir => @import("backend/KQueueDir.zig"),
},
.windows => switch (variant) {
.windows => @import("backend/Windows.zig"),
},
else => @compileError("unsupported OS"),
};
pub const interfaceType: InterfaceType = switch (builtin.os.tag) {
.linux => variant,
else => .threaded,
};
/// Watch a path (file or directory) for changes. The handler will receive allocator: std.mem.Allocator,
/// `change` and (linux only) `rename` calls. When path is a directory, interceptor: *Interceptor,
/// all subdirectories are watched recursively and new directories created
/// inside are watched automatically.
pub fn watch(self: *@This(), path: []const u8) Error!void {
// Make the path absolute without resolving symlinks so that callers who
// pass "/tmp/foo" (where /tmp is a symlink) receive events with the same
// "/tmp/foo" prefix rather than the resolved "/private/tmp/foo" prefix.
var buf: [std.fs.max_path_bytes]u8 = undefined;
const abs_path: []const u8 = if (std.fs.path.isAbsolute(path))
path
else blk: {
var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
const cwd = std.fs.cwd().realpath(".", &cwd_buf) catch return error.WatchFailed;
break :blk std.fmt.bufPrint(&buf, "{s}/{s}", .{ cwd, path }) catch return error.WatchFailed;
};
try self.interceptor.backend.add_watch(self.allocator, abs_path);
if (!Backend.watches_recursively) {
recurse_watch(&self.interceptor.backend, self.allocator, abs_path);
}
}
/// Stop watching a previously watched path /// True if the current backend detects file content modifications in real time.
pub fn unwatch(self: *@This(), path: []const u8) void { /// False only when kqueue_dir_only=true, where directory-level watches are used
self.interceptor.backend.remove_watch(self.allocator, path); /// and file writes do not trigger a directory NOTE_WRITE event.
} pub const detects_file_modifications = Backend.detects_file_modifications;
/// Drive event delivery by reading from the inotify fd. pub fn init(allocator: std.mem.Allocator, handler: *Handler) !@This() {
/// Only available in Linux poll mode (linux_poll_mode == true). const ic = try allocator.create(Interceptor);
pub fn handle_read_ready(self: *@This()) !void { errdefer allocator.destroy(ic);
comptime if (!linux_poll_mode) @compileError("handle_read_ready is only available in Linux poll mode; use linux_read_thread=true for a background-thread model"); ic.* = .{
try self.interceptor.backend.handle_read_ready(self.allocator); .handler = .{ .vtable = &Interceptor.vtable },
} .user_handler = handler,
.allocator = allocator,
/// Returns the inotify file descriptor that should be polled for POLLIN .backend = undefined,
/// before calling handle_read_ready(). };
/// Only available in Linux poll mode (linux_poll_mode == true). ic.backend = try Backend.init(&ic.handler);
pub fn poll_fd(self: *const @This()) std.posix.fd_t { errdefer ic.backend.deinit(allocator);
comptime if (!linux_poll_mode) @compileError("poll_fd is only available in Linux poll mode; use linux_read_thread=true for a background-thread model"); try ic.backend.arm(allocator);
return self.interceptor.backend.inotify_fd; return .{ .allocator = allocator, .interceptor = ic };
}
// Wraps the user's handler to intercept dir_created events and auto-watch
// new directories before forwarding to the user.
// Heap-allocated so that &ic.handler stays valid regardless of how the
// nightwatch struct is moved after init() returns.
const Interceptor = struct {
handler: Handler,
user_handler: *Handler,
allocator: std.mem.Allocator,
backend: Backend,
const vtable = Handler.VTable{
.change = change_cb,
.rename = rename_cb,
.wait_readable = if (linux_poll_mode) wait_readable_cb else {},
};
fn change_cb(h: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void {
const self: *Interceptor = @fieldParentPtr("handler", h);
if (event_type == .created and object_type == .dir and !Backend.watches_recursively) {
self.backend.add_watch(self.allocator, path) catch {};
recurse_watch(&self.backend, self.allocator, path);
} }
return self.user_handler.change(path, event_type, object_type);
}
fn rename_cb(h: *Handler, src: []const u8, dst: []const u8, object_type: ObjectType) error{HandlerFailed}!void { pub fn deinit(self: *@This()) void {
const self: *Interceptor = @fieldParentPtr("handler", h); self.interceptor.backend.deinit(self.allocator);
return self.user_handler.rename(src, dst, object_type); self.allocator.destroy(self.interceptor);
} }
fn wait_readable_cb(h: *Handler) error{HandlerFailed}!ReadableStatus { /// Watch a path (file or directory) for changes. The handler will receive
const self: *Interceptor = @fieldParentPtr("handler", h); /// `change` and (linux only) `rename` calls. When path is a directory,
return self.user_handler.wait_readable(); /// all subdirectories are watched recursively and new directories created
} /// inside are watched automatically.
}; pub fn watch(self: *@This(), path: []const u8) Error!void {
// Make the path absolute without resolving symlinks so that callers who
// pass "/tmp/foo" (where /tmp is a symlink) receive events with the same
// "/tmp/foo" prefix rather than the resolved "/private/tmp/foo" prefix.
var buf: [std.fs.max_path_bytes]u8 = undefined;
const abs_path: []const u8 = if (std.fs.path.isAbsolute(path))
path
else blk: {
var cwd_buf: [std.fs.max_path_bytes]u8 = undefined;
const cwd = std.fs.cwd().realpath(".", &cwd_buf) catch return error.WatchFailed;
break :blk std.fmt.bufPrint(&buf, "{s}/{s}", .{ cwd, path }) catch return error.WatchFailed;
};
try self.interceptor.backend.add_watch(self.allocator, abs_path);
if (!Backend.watches_recursively) {
recurse_watch(&self.interceptor.backend, self.allocator, abs_path);
}
}
// Scans subdirectories of dir_path and adds a watch for each one, recursively. /// Stop watching a previously watched path
fn recurse_watch(backend: *Backend, allocator: std.mem.Allocator, dir_path: []const u8) void { pub fn unwatch(self: *@This(), path: []const u8) void {
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; self.interceptor.backend.remove_watch(self.allocator, path);
defer dir.close(); }
var it = dir.iterate();
while (it.next() catch return) |entry| { /// Drive event delivery by reading from the inotify fd.
if (entry.kind != .directory) continue; /// Only available in Linux poll mode (linux_poll_mode == true).
var buf: [std.fs.max_path_bytes]u8 = undefined; pub fn handle_read_ready(self: *@This()) !void {
const sub = std.fmt.bufPrint(&buf, "{s}/{s}", .{ dir_path, entry.name }) catch continue; comptime if (@hasDecl(Backend, "polling") and Backend.polling) @compileError("handle_read_ready is only available in polling backends");
backend.add_watch(allocator, sub) catch {}; try self.interceptor.backend.handle_read_ready(self.allocator);
recurse_watch(backend, allocator, sub); }
}
/// Returns the inotify file descriptor that should be polled for POLLIN
/// before calling handle_read_ready().
/// Only available in Linux poll mode (linux_poll_mode == true).
pub fn poll_fd(self: *const @This()) std.posix.fd_t {
comptime if (@hasDecl(Backend, "polling") and Backend.polling) @compileError("poll_fd is only available in polling backends");
return self.interceptor.backend.inotify_fd;
}
// Wraps the user's handler to intercept dir_created events and auto-watch
// new directories before forwarding to the user.
// Heap-allocated so that &ic.handler stays valid regardless of how the
// nightwatch struct is moved after init() returns.
const Interceptor = struct {
handler: Handler,
user_handler: *Handler,
allocator: std.mem.Allocator,
backend: Backend,
const vtable = Handler.VTable{
.change = change_cb,
.rename = rename_cb,
};
fn change_cb(h: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void {
const self: *Interceptor = @fieldParentPtr("handler", h);
if (event_type == .created and object_type == .dir and !Backend.watches_recursively) {
self.backend.add_watch(self.allocator, path) catch {};
recurse_watch(&self.backend, self.allocator, path);
}
return self.user_handler.change(path, event_type, object_type);
}
fn rename_cb(h: *Handler, src: []const u8, dst: []const u8, object_type: ObjectType) error{HandlerFailed}!void {
const self: *Interceptor = @fieldParentPtr("handler", h);
return self.user_handler.rename(src, dst, object_type);
}
fn wait_readable_cb(h: *Handler) error{HandlerFailed}!ReadableStatus {
const self: *Interceptor = @fieldParentPtr("handler", h);
return self.user_handler.wait_readable();
}
};
const PollingInterceptor = struct {
handler: PollingHandler,
user_handler: *PollingHandler,
allocator: std.mem.Allocator,
backend: Backend,
const vtable = PollingHandler.VTable{
.change = change_cb,
.rename = rename_cb,
.wait_readable = wait_readable_cb,
};
fn change_cb(h: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void {
const self: *Interceptor = @fieldParentPtr("handler", h);
if (event_type == .created and object_type == .dir and !Backend.watches_recursively) {
self.backend.add_watch(self.allocator, path) catch {};
recurse_watch(&self.backend, self.allocator, path);
}
return self.user_handler.change(path, event_type, object_type);
}
fn rename_cb(h: *Handler, src: []const u8, dst: []const u8, object_type: ObjectType) error{HandlerFailed}!void {
const self: *Interceptor = @fieldParentPtr("handler", h);
return self.user_handler.rename(src, dst, object_type);
}
fn wait_readable_cb(h: *Handler) error{HandlerFailed}!ReadableStatus {
const self: *Interceptor = @fieldParentPtr("handler", h);
return self.user_handler.wait_readable();
}
};
// Scans subdirectories of dir_path and adds a watch for each one, recursively.
fn recurse_watch(backend: *Backend, allocator: std.mem.Allocator, dir_path: []const u8) void {
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
defer dir.close();
var it = dir.iterate();
while (it.next() catch return) |entry| {
if (entry.kind != .directory) continue;
var buf: [std.fs.max_path_bytes]u8 = undefined;
const sub = std.fmt.bufPrint(&buf, "{s}/{s}", .{ dir_path, entry.name }) catch continue;
backend.add_watch(allocator, sub) catch {};
recurse_watch(backend, allocator, sub);
}
}
};
} }

View file

@ -49,7 +49,6 @@ const TestHandler = struct {
const vtable = nw.Handler.VTable{ const vtable = nw.Handler.VTable{
.change = change_cb, .change = change_cb,
.rename = rename_cb, .rename = rename_cb,
.wait_readable = if (nw.linux_poll_mode) wait_readable_cb else {},
}; };
fn change_cb(handler: *nw.Handler, path: []const u8, event_type: nw.EventType, object_type: nw.ObjectType) error{HandlerFailed}!void { fn change_cb(handler: *nw.Handler, path: []const u8, event_type: nw.EventType, object_type: nw.ObjectType) error{HandlerFailed}!void {
@ -135,7 +134,7 @@ const TestHandler = struct {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Watcher type alias - nightwatch.zig is itself a struct type. // Watcher type alias - nightwatch.zig is itself a struct type.
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const Watcher = nw; const Watcher = nw.Default;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Test utilities // Test utilities
@ -178,13 +177,12 @@ fn removeTempDir(path: []const u8) void {
} }
/// Drive event delivery: /// Drive event delivery:
/// - Linux: call handle_read_ready() so inotify events are processed. /// - polling watchers: call handle_read_ready() so events are processed.
/// - Others: the backend uses its own thread/callback; sleep briefly. /// - threaded watchers: the backend uses its own thread/callback; sleep briefly.
fn drainEvents(watcher: *Watcher) !void { fn drainEvents(watcher: *Watcher) !void {
if (nw.linux_poll_mode) { switch (Watcher.interfaceType) {
try watcher.handle_read_ready(); .polling => try watcher.handle_read_ready(),
} else { .threaded => std.Thread.sleep(300 * std.time.ns_per_ms),
std.Thread.sleep(300 * std.time.ns_per_ms);
} }
} }
@ -223,7 +221,7 @@ test "creating a file emits a 'created' event" {
test "writing to a file emits a 'modified' event" { test "writing to a file emits a 'modified' event" {
// kqueue watches directories only; file writes don't trigger a directory event, // kqueue watches directories only; file writes don't trigger a directory event,
// so modifications are not reliably detectable in real time on this backend. // so modifications are not reliably detectable in real time on this backend.
if (comptime !nw.detects_file_modifications) return error.SkipZigTest; if (comptime !Watcher.detects_file_modifications) return error.SkipZigTest;
const allocator = std.testing.allocator; const allocator = std.testing.allocator;
@ -516,7 +514,7 @@ test "rename-then-modify: rename event precedes the subsequent modify event" {
// After renaming a file, a write to the new name should produce events in // After renaming a file, a write to the new name should produce events in
// the order [rename/old-name, rename/new-name, modify] so that a consumer // the order [rename/old-name, rename/new-name, modify] so that a consumer
// always knows the current identity of the file before seeing changes to it. // always knows the current identity of the file before seeing changes to it.
if (comptime !nw.detects_file_modifications) return error.SkipZigTest; if (comptime !Watcher.detects_file_modifications) return error.SkipZigTest;
const allocator = std.testing.allocator; const allocator = std.testing.allocator;

View file

@ -1,6 +1,5 @@
const std = @import("std"); const std = @import("std");
const builtin = @import("builtin"); const builtin = @import("builtin");
const build_options = @import("build_options");
pub const EventType = enum { pub const EventType = enum {
created, created,
@ -25,14 +24,9 @@ pub const Error = error{
WatchFailed, WatchFailed,
}; };
/// True when the Linux inotify backend runs in poll mode (caller drives the pub const InterfaceType = enum {
/// event loop via poll_fd / handle_read_ready). False on all other platforms polling,
/// and on Linux when the `linux_read_thread` build option is set. threaded,
pub const linux_poll_mode = builtin.os.tag == .linux and !build_options.linux_read_thread;
pub const ReadableStatus = enum {
// TODO: is_readable, // backend may now read from fd (blocking mode)
will_notify, // backend must wait for a handle_read_ready call
}; };
pub const Handler = struct { pub const Handler = struct {
@ -41,8 +35,31 @@ pub const Handler = struct {
pub const VTable = struct { pub const VTable = struct {
change: *const fn (handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void, change: *const fn (handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void,
rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8, object_type: ObjectType) error{HandlerFailed}!void, rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8, object_type: ObjectType) error{HandlerFailed}!void,
/// Only present in Linux poll mode (linux_poll_mode == true). };
wait_readable: if (linux_poll_mode) *const fn (handler: *Handler) error{HandlerFailed}!ReadableStatus else void,
pub fn change(handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void {
return handler.vtable.change(handler, path, event_type, object_type);
}
pub fn rename(handler: *Handler, src_path: []const u8, dst_path: []const u8, object_type: ObjectType) error{HandlerFailed}!void {
return handler.vtable.rename(handler, src_path, dst_path, object_type);
}
};
/// Used only by the inotify backend in poll mode (caller drives the event
/// loop via poll_fd / handle_read_ready)
pub const PollingHandler = struct {
vtable: *const VTable,
pub const ReadableStatus = enum {
// TODO: is_readable, // backend may now read from fd (blocking mode)
will_notify, // backend must wait for a handle_read_ready call
};
pub const VTable = struct {
change: *const fn (handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void,
rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8, object_type: ObjectType) error{HandlerFailed}!void,
wait_readable: *const fn (handler: *Handler) error{HandlerFailed}!ReadableStatus,
}; };
pub fn change(handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void { pub fn change(handler: *Handler, path: []const u8, event_type: EventType, object_type: ObjectType) error{HandlerFailed}!void {
@ -54,10 +71,6 @@ pub const Handler = struct {
} }
pub fn wait_readable(handler: *Handler) error{HandlerFailed}!ReadableStatus { pub fn wait_readable(handler: *Handler) error{HandlerFailed}!ReadableStatus {
if (comptime linux_poll_mode) { return handler.vtable.wait_readable(handler);
return handler.vtable.wait_readable(handler);
} else {
unreachable;
}
} }
}; };