feat: add support for a background read thread in linux too

This commit is contained in:
CJ van den Berg 2026-03-08 09:49:03 +01:00
parent 1cbbdbd3cb
commit 5502f8940a
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9
4 changed files with 87 additions and 20 deletions

View file

@ -12,8 +12,17 @@ pub fn build(b: *std.Build) void {
) orelse false; ) orelse false;
} else false; } else false;
const linux_read_thread = if (target.result.os.tag == .linux) blk: {
break :blk b.option(
bool,
"linux_read_thread",
"Use a background thread on Linux (like macOS/Windows) instead of requiring the caller to drive the event loop via poll_fd/handle_read_ready",
) orelse false;
} else false;
const options = b.addOptions(); const options = b.addOptions();
options.addOption(bool, "use_fsevents", use_fsevents); options.addOption(bool, "use_fsevents", use_fsevents);
options.addOption(bool, "linux_read_thread", linux_read_thread);
const options_mod = options.createModule(); const options_mod = options.createModule();
const mod = b.addModule("nightwatch", .{ const mod = b.addModule("nightwatch", .{

View file

@ -21,7 +21,7 @@ const CliHandler = struct {
const vtable = nightwatch.Handler.VTable{ const vtable = nightwatch.Handler.VTable{
.change = change_cb, .change = change_cb,
.rename = rename_cb, .rename = rename_cb,
.wait_readable = if (builtin.os.tag == .linux) wait_readable_cb else {}, .wait_readable = if (nightwatch.linux_poll_mode) wait_readable_cb else {},
}; };
fn change_cb(h: *nightwatch.Handler, path: []const u8, event_type: nightwatch.EventType) error{HandlerFailed}!void { fn change_cb(h: *nightwatch.Handler, path: []const u8, event_type: nightwatch.EventType) error{HandlerFailed}!void {
@ -166,7 +166,7 @@ pub fn main() !void {
try stderr.interface.print("on watch: {s}\n", .{path}); try stderr.interface.print("on watch: {s}\n", .{path});
} }
if (builtin.os.tag == .linux) { if (nightwatch.linux_poll_mode) {
try run_linux(&watcher); try run_linux(&watcher);
} else if (builtin.os.tag == .windows) { } else if (builtin.os.tag == .windows) {
run_windows(); run_windows();

View file

@ -20,13 +20,19 @@ pub const Error = error{
WatchFailed, WatchFailed,
}; };
/// True when the Linux inotify backend runs in poll mode (caller drives the
/// event loop via poll_fd / handle_read_ready). False on all other platforms
/// and on Linux when the `linux_read_thread` build option is set.
pub const linux_poll_mode = builtin.os.tag == .linux and !build_options.linux_read_thread;
pub const Handler = struct { pub const Handler = struct {
vtable: *const VTable, vtable: *const VTable,
pub const VTable = struct { pub const VTable = struct {
change: *const fn (handler: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void, change: *const fn (handler: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void,
rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8) error{HandlerFailed}!void, rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8) error{HandlerFailed}!void,
wait_readable: if (builtin.os.tag == .linux) *const fn (handler: *Handler) error{HandlerFailed}!ReadableStatus else void, /// Only present in Linux poll mode (linux_poll_mode == true).
wait_readable: if (linux_poll_mode) *const fn (handler: *Handler) error{HandlerFailed}!ReadableStatus else void,
}; };
fn change(handler: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void { fn change(handler: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void {
@ -38,7 +44,11 @@ pub const Handler = struct {
} }
fn wait_readable(handler: *Handler) error{HandlerFailed}!ReadableStatus { fn wait_readable(handler: *Handler) error{HandlerFailed}!ReadableStatus {
if (comptime linux_poll_mode) {
return handler.vtable.wait_readable(handler); return handler.vtable.wait_readable(handler);
} else {
unreachable;
}
} }
}; };
@ -97,14 +107,18 @@ pub fn unwatch(self: *@This(), path: []const u8) void {
self.interceptor.backend.remove_watch(self.allocator, path); self.interceptor.backend.remove_watch(self.allocator, path);
} }
/// Drive event delivery by reading from the inotify fd.
/// Only available in Linux poll mode (linux_poll_mode == true).
pub fn handle_read_ready(self: *@This()) !void { pub fn handle_read_ready(self: *@This()) !void {
comptime if (!linux_poll_mode) @compileError("handle_read_ready is only available in Linux poll mode; use linux_read_thread=true for a background-thread model");
try self.interceptor.backend.handle_read_ready(self.allocator); try self.interceptor.backend.handle_read_ready(self.allocator);
} }
/// Returns the inotify file descriptor that should be polled for POLLIN /// Returns the inotify file descriptor that should be polled for POLLIN
/// before calling handle_read_ready(). Only available on Linux. /// before calling handle_read_ready().
/// Only available in Linux poll mode (linux_poll_mode == true).
pub fn poll_fd(self: *const @This()) std.posix.fd_t { pub fn poll_fd(self: *const @This()) std.posix.fd_t {
comptime if (builtin.os.tag != .linux) @compileError("poll_fd is only available on Linux"); comptime if (!linux_poll_mode) @compileError("poll_fd is only available in Linux poll mode; use linux_read_thread=true for a background-thread model");
return self.interceptor.backend.inotify_fd; return self.interceptor.backend.inotify_fd;
} }
@ -121,7 +135,7 @@ const Interceptor = struct {
const vtable = Handler.VTable{ const vtable = Handler.VTable{
.change = change_cb, .change = change_cb,
.rename = rename_cb, .rename = rename_cb,
.wait_readable = if (builtin.os.tag == .linux) wait_readable_cb else {}, .wait_readable = if (linux_poll_mode) wait_readable_cb else {},
}; };
fn change_cb(h: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void { fn change_cb(h: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void {
@ -172,6 +186,9 @@ const INotifyBackend = struct {
handler: *Handler, handler: *Handler,
inotify_fd: std.posix.fd_t, inotify_fd: std.posix.fd_t,
watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path
// Used only in linux_read_thread mode:
stop_pipe: if (build_options.linux_read_thread) [2]std.posix.fd_t else void,
thread: if (build_options.linux_read_thread) ?std.Thread else void,
const IN = std.os.linux.IN; const IN = std.os.linux.IN;
@ -181,28 +198,69 @@ const INotifyBackend = struct {
const in_flags: std.os.linux.O = .{ .NONBLOCK = true }; const in_flags: std.os.linux.O = .{ .NONBLOCK = true };
fn init(handler: *Handler) error{ ProcessFdQuotaExceeded, SystemFdQuotaExceeded, SystemResources, Unexpected }!@This() { fn init(handler: *Handler) !@This() {
const inotify_fd = try std.posix.inotify_init1(@bitCast(in_flags));
errdefer std.posix.close(inotify_fd);
if (comptime build_options.linux_read_thread) {
const stop_pipe = try std.posix.pipe();
return .{ return .{
.handler = handler, .handler = handler,
.inotify_fd = try std.posix.inotify_init1(@bitCast(in_flags)), .inotify_fd = inotify_fd,
.watches = .empty, .watches = .empty,
.stop_pipe = stop_pipe,
.thread = null,
}; };
} else {
return .{
.handler = handler,
.inotify_fd = inotify_fd,
.watches = .empty,
.stop_pipe = {},
.thread = {},
};
}
} }
fn deinit(self: *@This(), allocator: std.mem.Allocator) void { fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
if (comptime build_options.linux_read_thread) {
// Signal thread to stop and wait for it to exit.
_ = std.posix.write(self.stop_pipe[1], "x") catch {};
if (self.thread) |t| t.join();
std.posix.close(self.stop_pipe[0]);
std.posix.close(self.stop_pipe[1]);
}
var it = self.watches.iterator(); var it = self.watches.iterator();
while (it.next()) |entry| allocator.free(entry.value_ptr.*); while (it.next()) |entry| allocator.free(entry.value_ptr.*);
self.watches.deinit(allocator); self.watches.deinit(allocator);
std.posix.close(self.inotify_fd); std.posix.close(self.inotify_fd);
} }
fn arm(self: *@This(), _: std.mem.Allocator) error{HandlerFailed}!void { fn arm(self: *@This(), allocator: std.mem.Allocator) error{HandlerFailed}!void {
if (comptime build_options.linux_read_thread) {
if (self.thread != null) return; // already running
self.thread = std.Thread.spawn(.{}, thread_fn, .{ self, allocator }) catch return error.HandlerFailed;
} else {
return switch (self.handler.wait_readable() catch |e| switch (e) { return switch (self.handler.wait_readable() catch |e| switch (e) {
error.HandlerFailed => |e_| return e_, error.HandlerFailed => |e_| return e_,
}) { }) {
.will_notify => {}, .will_notify => {},
}; };
} }
}
fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void {
var pfds = [_]std.posix.pollfd{
.{ .fd = self.inotify_fd, .events = std.posix.POLL.IN, .revents = 0 },
.{ .fd = self.stop_pipe[0], .events = std.posix.POLL.IN, .revents = 0 },
};
while (true) {
_ = std.posix.poll(&pfds, -1) catch return;
if (pfds[1].revents & std.posix.POLL.IN != 0) return; // stop signal
if (pfds[0].revents & std.posix.POLL.IN != 0) {
self.handle_read_ready(allocator) catch return;
}
}
}
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void { fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{ OutOfMemory, WatchFailed }!void {
const path_z = try allocator.dupeZ(u8, path); const path_z = try allocator.dupeZ(u8, path);

View file

@ -49,7 +49,7 @@ const TestHandler = struct {
const vtable = nw.Handler.VTable{ const vtable = nw.Handler.VTable{
.change = change_cb, .change = change_cb,
.rename = rename_cb, .rename = rename_cb,
.wait_readable = if (builtin.os.tag == .linux) wait_readable_cb else {}, .wait_readable = if (nw.linux_poll_mode) wait_readable_cb else {},
}; };
fn change_cb(handler: *nw.Handler, path: []const u8, event_type: nw.EventType) error{HandlerFailed}!void { fn change_cb(handler: *nw.Handler, path: []const u8, event_type: nw.EventType) error{HandlerFailed}!void {
@ -170,7 +170,7 @@ fn removeTempDir(path: []const u8) void {
/// - Linux: call handle_read_ready() so inotify events are processed. /// - Linux: call handle_read_ready() so inotify events are processed.
/// - Others: the backend uses its own thread/callback; sleep briefly. /// - Others: the backend uses its own thread/callback; sleep briefly.
fn drainEvents(watcher: *Watcher) !void { fn drainEvents(watcher: *Watcher) !void {
if (builtin.os.tag == .linux) { if (nw.linux_poll_mode) {
try watcher.handle_read_ready(); try watcher.handle_read_ready();
} else { } else {
std.Thread.sleep(300 * std.time.ns_per_ms); std.Thread.sleep(300 * std.time.ns_per_ms);