build(zig-0.16): port kqueue backends to zig-0.16

This commit is contained in:
CJ van den Berg 2026-04-13 13:23:29 +02:00
parent 8203d7bdc7
commit 5c863fbb4a
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9
2 changed files with 72 additions and 108 deletions

View file

@ -38,7 +38,12 @@ const NOTE_EXTEND: u32 = 0x00000004;
const NOTE_ATTRIB: u32 = 0x00000008; const NOTE_ATTRIB: u32 = 0x00000008;
const NOTE_RENAME: u32 = 0x00000020; const NOTE_RENAME: u32 = 0x00000020;
pub fn init(io: std.Io, handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { fn kevent_add(kq: std.posix.fd_t, kev: *const std.posix.Kevent) bool {
var ev_out: [1]std.posix.Kevent = undefined;
return std.posix.system.kevent(kq, @ptrCast(kev), 1, &ev_out, 0, null) >= 0;
}
pub fn init(io: std.Io, handler: *Handler) !@This() {
// Per-file kqueue watches require one open fd per watched file. Bump // Per-file kqueue watches require one open fd per watched file. Bump
// the soft NOFILE limit to the hard limit so large directory trees don't // the soft NOFILE limit to the hard limit so large directory trees don't
// exhaust the default quota (256 on macOS, 1024 on many FreeBSD installs). // exhaust the default quota (256 on macOS, 1024 on many FreeBSD installs).
@ -46,7 +51,11 @@ pub fn init(io: std.Io, handler: *Handler) (std.posix.KQueueError || std.posix.K
if (rl.cur < rl.max) if (rl.cur < rl.max)
std.posix.setrlimit(.NOFILE, .{ .cur = rl.max, .max = rl.max }) catch {}; std.posix.setrlimit(.NOFILE, .{ .cur = rl.max, .max = rl.max }) catch {};
} else |_| {} } else |_| {}
const kq = try std.posix.kqueue(); const kq: std.posix.fd_t = blk: {
const fd = std.posix.system.kqueue();
if (fd < 0) return error.SystemResources;
break :blk @intCast(fd);
};
errdefer std.Io.Threaded.closeFd(kq); errdefer std.Io.Threaded.closeFd(kq);
const pipe = try std.Io.Threaded.pipe2(.{}); const pipe = try std.Io.Threaded.pipe2(.{});
errdefer { errdefer {
@ -63,7 +72,7 @@ pub fn init(io: std.Io, handler: *Handler) (std.posix.KQueueError || std.posix.K
.data = 0, .data = 0,
.udata = 0, .udata = 0,
}; };
_ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); if (!kevent_add(kq, &shutdown_kev)) return error.SystemResources;
return .{ return .{
.handler = handler, .handler = handler,
.kq = kq, .kq = kq,
@ -118,11 +127,12 @@ fn thread_fn(self: *@This(), io: std.Io, allocator: std.mem.Allocator) void {
var events: [64]std.posix.Kevent = undefined; var events: [64]std.posix.Kevent = undefined;
while (true) { while (true) {
// Block indefinitely until kqueue has events. // Block indefinitely until kqueue has events.
const n = std.posix.kevent(self.kq, &.{}, &events, null) catch |e| { const n_raw = std.posix.system.kevent(self.kq, @ptrCast(&events), 0, &events, @intCast(events.len), null);
std.log.err("nightwatch: kevent failed: {s}, stopping watch thread", .{@errorName(e)}); if (n_raw < 0) {
std.log.err("nightwatch: kevent failed, stopping watch thread", .{});
break; break;
}; }
for (events[0..n]) |ev| { for (events[0..@intCast(n_raw)]) |ev| {
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
if (ev.filter != EVFILT_VNODE) continue; if (ev.filter != EVFILT_VNODE) continue;
const fd: std.posix.fd_t = @intCast(ev.ident); const fd: std.posix.fd_t = @intCast(ev.ident);
@ -344,7 +354,12 @@ fn register_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []con
const already = self.file_watches.contains(path); const already = self.file_watches.contains(path);
self.file_watches_mutex.unlock(self.io); self.file_watches_mutex.unlock(self.io);
if (already) return; if (already) return;
const fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch return; const path_z = std.posix.toPosixPath(path) catch return;
const fd: std.posix.fd_t = blk: {
const raw = std.posix.system.open(&path_z, .{}, @as(c_uint, 0));
if (raw < 0) return;
break :blk @intCast(raw);
};
const kev = std.posix.Kevent{ const kev = std.posix.Kevent{
.ident = @intCast(fd), .ident = @intCast(fd),
.filter = EVFILT_VNODE, .filter = EVFILT_VNODE,
@ -353,10 +368,10 @@ fn register_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []con
.data = 0, .data = 0,
.udata = 0, .udata = 0,
}; };
_ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch { if (!kevent_add(self.kq, &kev)) {
std.Io.Threaded.closeFd(fd); std.Io.Threaded.closeFd(fd);
return; return;
}; }
const owned = allocator.dupe(u8, path) catch { const owned = allocator.dupe(u8, path) catch {
std.Io.Threaded.closeFd(fd); std.Io.Threaded.closeFd(fd);
return; return;
@ -392,35 +407,14 @@ pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8)
const already = self.watches.contains(path); const already = self.watches.contains(path);
self.watches_mutex.unlock(self.io); self.watches_mutex.unlock(self.io);
if (already) return; if (already) return;
const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { const path_z = std.posix.toPosixPath(path) catch return error.WatchFailed;
error.AccessDenied, const path_fd: std.posix.fd_t = blk: {
error.PermissionDenied, const raw = std.posix.system.open(&path_z, .{}, @as(c_uint, 0));
error.PathAlreadyExists, if (raw < 0) {
error.SymLinkLoop, std.log.err("{s} failed: open", .{@src().fn_name});
error.NameTooLong,
error.FileNotFound,
error.SystemResources,
error.NoSpaceLeft,
error.NotDir,
error.InvalidUtf8,
error.InvalidWtf8,
error.BadPathName,
error.NoDevice,
error.NetworkNotFound,
error.Unexpected,
error.ProcessFdQuotaExceeded,
error.SystemFdQuotaExceeded,
error.ProcessNotFound,
error.FileTooBig,
error.IsDir,
error.DeviceBusy,
error.FileLocksNotSupported,
error.FileBusy,
error.WouldBlock,
=> |e_| {
std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ });
return error.WatchFailed; return error.WatchFailed;
}, }
break :blk @intCast(raw);
}; };
errdefer std.Io.Threaded.closeFd(path_fd); errdefer std.Io.Threaded.closeFd(path_fd);
const kev = std.posix.Kevent{ const kev = std.posix.Kevent{
@ -431,17 +425,10 @@ pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8)
.data = 0, .data = 0,
.udata = 0, .udata = 0,
}; };
_ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch |e| switch (e) { if (!kevent_add(self.kq, &kev)) {
error.AccessDenied, std.log.err("{s} failed: kevent", .{@src().fn_name});
error.SystemResources, return error.WatchFailed;
error.EventNotFound, }
error.ProcessNotFound,
error.Overflow,
=> |e_| {
std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ });
return error.WatchFailed;
},
};
const owned_path = try allocator.dupe(u8, path); const owned_path = try allocator.dupe(u8, path);
self.watches_mutex.lockUncancelable(self.io); self.watches_mutex.lockUncancelable(self.io);
if (self.watches.contains(path)) { if (self.watches.contains(path)) {
@ -458,16 +445,11 @@ pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8)
self.watches_mutex.unlock(self.io); self.watches_mutex.unlock(self.io);
// Take initial snapshot so first NOTE_WRITE has a baseline to diff against. // Take initial snapshot so first NOTE_WRITE has a baseline to diff against.
self.take_snapshot(allocator, owned_path) catch |e| switch (e) { self.take_snapshot(allocator, owned_path) catch |e| switch (e) {
error.AccessDenied, error.OutOfMemory => return error.OutOfMemory,
error.PermissionDenied, else => |e_| {
error.SystemResources,
error.InvalidUtf8,
error.Unexpected,
=> |e_| {
std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ }); std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ });
return error.WatchFailed; return error.WatchFailed;
}, },
error.OutOfMemory => return error.OutOfMemory,
}; };
} }

View file

@ -22,7 +22,7 @@ watches_mutex: std.Io.Mutex,
// Used to diff on NOTE_WRITE: detects creates, deletes, and (opportunistically) // Used to diff on NOTE_WRITE: detects creates, deletes, and (opportunistically)
// modifications when the same directory fires another event later. // modifications when the same directory fires another event later.
// Key: independently owned dir path, value: map of owned filename -> mtime_ns. // Key: independently owned dir path, value: map of owned filename -> mtime_ns.
snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(i128)), snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(std.Io.Timestamp)),
snapshots_mutex: std.Io.Mutex, snapshots_mutex: std.Io.Mutex,
io: std.Io, io: std.Io,
@ -38,8 +38,17 @@ const NOTE_EXTEND: u32 = 0x00000004;
const NOTE_ATTRIB: u32 = 0x00000008; const NOTE_ATTRIB: u32 = 0x00000008;
const NOTE_RENAME: u32 = 0x00000020; const NOTE_RENAME: u32 = 0x00000020;
pub fn init(io: std.Io, handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { fn kevent_add(kq: std.posix.fd_t, kev: *const std.posix.Kevent) bool {
const kq = try std.posix.kqueue(); var ev_out: [1]std.posix.Kevent = undefined;
return std.posix.system.kevent(kq, @ptrCast(kev), 1, &ev_out, 0, null) >= 0;
}
pub fn init(io: std.Io, handler: *Handler) !@This() {
const kq: std.posix.fd_t = blk: {
const fd = std.posix.system.kqueue();
if (fd < 0) return error.SystemResources;
break :blk @intCast(fd);
};
errdefer std.Io.Threaded.closeFd(kq); errdefer std.Io.Threaded.closeFd(kq);
const pipe = try std.Io.Threaded.pipe2(.{}); const pipe = try std.Io.Threaded.pipe2(.{});
errdefer { errdefer {
@ -56,7 +65,7 @@ pub fn init(io: std.Io, handler: *Handler) (std.posix.KQueueError || std.posix.K
.data = 0, .data = 0,
.udata = 0, .udata = 0,
}; };
_ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null); if (!kevent_add(kq, &shutdown_kev)) return error.SystemResources;
return .{ return .{
.handler = handler, .handler = handler,
.kq = kq, .kq = kq,
@ -103,11 +112,12 @@ fn thread_fn(self: *@This(), io: std.Io, allocator: std.mem.Allocator) void {
var events: [64]std.posix.Kevent = undefined; var events: [64]std.posix.Kevent = undefined;
while (true) { while (true) {
// Block indefinitely until kqueue has events. // Block indefinitely until kqueue has events.
const n = std.posix.kevent(self.kq, &.{}, &events, null) catch |e| { const n_raw = std.posix.system.kevent(self.kq, @ptrCast(&events), 0, &events, @intCast(events.len), null);
std.log.err("nightwatch: kevent failed: {s}, stopping watch thread", .{@errorName(e)}); if (n_raw < 0) {
std.log.err("nightwatch: kevent failed, stopping watch thread", .{});
break; break;
}; }
for (events[0..n]) |ev| { for (events[0..@intCast(n_raw)]) |ev| {
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
if (ev.filter != EVFILT_VNODE) continue; if (ev.filter != EVFILT_VNODE) continue;
const fd: std.posix.fd_t = @intCast(ev.ident); const fd: std.posix.fd_t = @intCast(ev.ident);
@ -186,7 +196,7 @@ fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8)
// Collect current files (name mtime_ns) and subdirectories. // Collect current files (name mtime_ns) and subdirectories.
// No lock held while doing filesystem I/O. // No lock held while doing filesystem I/O.
var current_files: std.StringHashMapUnmanaged(i128) = .empty; var current_files: std.StringHashMapUnmanaged(std.Io.Timestamp) = .empty;
var current_dirs: std.ArrayListUnmanaged([]u8) = .empty; var current_dirs: std.ArrayListUnmanaged([]u8) = .empty;
var iter = dir.iterate(); var iter = dir.iterate();
while (try iter.next(self.io)) |entry| { while (try iter.next(self.io)) |entry| {
@ -249,7 +259,7 @@ fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8)
while (cit.next()) |entry| { while (cit.next()) |entry| {
if (snapshot.getPtr(entry.key_ptr.*)) |stored_mtime| { if (snapshot.getPtr(entry.key_ptr.*)) |stored_mtime| {
// File exists in both - check for modification via mtime change. // File exists in both - check for modification via mtime change.
if (stored_mtime.* != entry.value_ptr.*) { if (!std.meta.eql(stored_mtime.*, entry.value_ptr.*)) {
stored_mtime.* = entry.value_ptr.*; stored_mtime.* = entry.value_ptr.*;
try to_modify.append(tmp, entry.key_ptr.*); // from current_files (tmp) try to_modify.append(tmp, entry.key_ptr.*); // from current_files (tmp)
} }
@ -338,35 +348,14 @@ pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8)
const already = self.watches.contains(path); const already = self.watches.contains(path);
self.watches_mutex.unlock(self.io); self.watches_mutex.unlock(self.io);
if (already) return; if (already) return;
const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch |e| switch (e) { const path_z = std.posix.toPosixPath(path) catch return error.WatchFailed;
error.AccessDenied, const path_fd: std.posix.fd_t = blk: {
error.PermissionDenied, const raw = std.posix.system.open(&path_z, .{}, @as(c_uint, 0));
error.PathAlreadyExists, if (raw < 0) {
error.SymLinkLoop, std.log.err("{s} failed: open", .{@src().fn_name});
error.NameTooLong,
error.FileNotFound,
error.SystemResources,
error.NoSpaceLeft,
error.NotDir,
error.InvalidUtf8,
error.InvalidWtf8,
error.BadPathName,
error.NoDevice,
error.NetworkNotFound,
error.Unexpected,
error.ProcessFdQuotaExceeded,
error.SystemFdQuotaExceeded,
error.ProcessNotFound,
error.FileTooBig,
error.IsDir,
error.DeviceBusy,
error.FileLocksNotSupported,
error.FileBusy,
error.WouldBlock,
=> |e_| {
std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ });
return error.WatchFailed; return error.WatchFailed;
}, }
break :blk @intCast(raw);
}; };
errdefer std.Io.Threaded.closeFd(path_fd); errdefer std.Io.Threaded.closeFd(path_fd);
const kev = std.posix.Kevent{ const kev = std.posix.Kevent{
@ -377,20 +366,13 @@ pub fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8)
.data = 0, .data = 0,
.udata = 0, .udata = 0,
}; };
_ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch |e| switch (e) { if (!kevent_add(self.kq, &kev)) {
error.AccessDenied, std.log.err("{s} failed: kevent", .{@src().fn_name});
error.SystemResources, return error.WatchFailed;
error.EventNotFound, }
error.ProcessNotFound,
error.Overflow,
=> |e_| {
std.log.err("{s} failed: {t}", .{ @src().fn_name, e_ });
return error.WatchFailed;
},
};
// Determine if the path is a regular file or a directory. // Determine if the path is a regular file or a directory.
const stat = std.posix.fstat(path_fd) catch null; var stat_buf: std.posix.Stat = undefined;
const is_file = if (stat) |s| std.posix.S.ISREG(s.mode) else false; const is_file = std.posix.system.fstat(path_fd, &stat_buf) == 0 and std.posix.S.ISREG(stat_buf.mode);
const owned_path = try allocator.dupe(u8, path); const owned_path = try allocator.dupe(u8, path);
self.watches_mutex.lockUncancelable(self.io); self.watches_mutex.lockUncancelable(self.io);
if (self.watches.contains(path)) { if (self.watches.contains(path)) {
@ -420,7 +402,7 @@ fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const
var arena = std.heap.ArenaAllocator.init(allocator); var arena = std.heap.ArenaAllocator.init(allocator);
defer arena.deinit(); defer arena.deinit();
const tmp = arena.allocator(); const tmp = arena.allocator();
const Entry = struct { name: []u8, mtime: i128 }; const Entry = struct { name: []u8, mtime: std.Io.Timestamp };
var entries: std.ArrayListUnmanaged(Entry) = .empty; var entries: std.ArrayListUnmanaged(Entry) = .empty;
var iter = dir.iterate(); var iter = dir.iterate();
while (iter.next(self.io) catch null) |e| { while (iter.next(self.io) catch null) |e| {