fix(kqueue): add per-file watches to detect modifications and fix rename ordering

kqueue's NOTE_WRITE on a directory fires only when directory entries are
added or removed, not when file contents change. This meant writes to
existing files were never reported as 'modified' events on macOS/FreeBSD.

Fix by maintaining a second set of kqueue watches on individual files
(file_watches), registered with NOTE_WRITE|NOTE_EXTEND. When either flag
fires on a file fd, a 'modified' event is emitted. File watches are
registered in take_snapshot (for files already present when watch() is
called) and in scan_dir (for newly created files), and deregistered when
files are deleted or the directory is unwatched.

Also fix two related bugs:
- NOTE_DELETE was incorrectly defined as 0x04 (NOTE_EXTEND); the correct
  value is 0x01. This could cause NOTE_EXTEND events on watched directories
  to be misreported as directory-deleted events.
- scan_dir emitted created events before deleted events, so a rename
  (old name disappears, new name appears) reported the destination before
  the source. Swapped the order so deletions are always emitted first.

Simplify thread_fn/arm to pass *KQueueBackend directly now that the backend
lives at a stable heap address inside the heap-allocated Interceptor.

All 10 integration tests now pass on FreeBSD.
This commit is contained in:
CJ van den Berg 2026-03-07 20:18:05 +01:00
parent ae4b56b62a
commit 3554794234
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9

View file

@ -513,8 +513,10 @@ const KQueueBackend = struct {
kq: std.posix.fd_t, kq: std.posix.fd_t,
shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
thread: ?std.Thread, thread: ?std.Thread,
watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned dir path -> fd
watches_mutex: std.Thread.Mutex, watches_mutex: std.Thread.Mutex,
file_watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned file path -> fd
file_watches_mutex: std.Thread.Mutex,
// Per-directory snapshots of filenames, used to diff on NOTE_WRITE. // Per-directory snapshots of filenames, used to diff on NOTE_WRITE.
// Key: owned dir path (same as watches key), value: set of owned filenames. // Key: owned dir path (same as watches key), value: set of owned filenames.
// Accessed from both the main thread (add_watch) and the background thread (scan_dir). // Accessed from both the main thread (add_watch) and the background thread (scan_dir).
@ -527,11 +529,11 @@ const KQueueBackend = struct {
const EV_ENABLE: u16 = 0x0004; const EV_ENABLE: u16 = 0x0004;
const EV_CLEAR: u16 = 0x0020; const EV_CLEAR: u16 = 0x0020;
const EV_DELETE: u16 = 0x0002; const EV_DELETE: u16 = 0x0002;
const NOTE_DELETE: u32 = 0x00000001;
const NOTE_WRITE: u32 = 0x00000002; const NOTE_WRITE: u32 = 0x00000002;
const NOTE_DELETE: u32 = 0x00000004;
const NOTE_RENAME: u32 = 0x00000020;
const NOTE_ATTRIB: u32 = 0x00000008;
const NOTE_EXTEND: u32 = 0x00000004; const NOTE_EXTEND: u32 = 0x00000004;
const NOTE_ATTRIB: u32 = 0x00000008;
const NOTE_RENAME: u32 = 0x00000020;
fn init(handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() { fn init(handler: *Handler) (std.posix.KQueueError || std.posix.KEventError)!@This() {
const kq = try std.posix.kqueue(); const kq = try std.posix.kqueue();
@ -559,6 +561,8 @@ const KQueueBackend = struct {
.thread = null, .thread = null,
.watches = .empty, .watches = .empty,
.watches_mutex = .{}, .watches_mutex = .{},
.file_watches = .empty,
.file_watches_mutex = .{},
.snapshots = .empty, .snapshots = .empty,
.snapshots_mutex = .{}, .snapshots_mutex = .{},
}; };
@ -576,6 +580,12 @@ const KQueueBackend = struct {
allocator.free(entry.key_ptr.*); allocator.free(entry.key_ptr.*);
} }
self.watches.deinit(allocator); self.watches.deinit(allocator);
var fit = self.file_watches.iterator();
while (fit.next()) |entry| {
std.posix.close(entry.value_ptr.*);
allocator.free(entry.key_ptr.*);
}
self.file_watches.deinit(allocator);
var sit = self.snapshots.iterator(); var sit = self.snapshots.iterator();
while (sit.next()) |entry| { while (sit.next()) |entry| {
// Keys are borrowed from self.watches and freed in the watches loop above. // Keys are borrowed from self.watches and freed in the watches loop above.
@ -590,61 +600,53 @@ const KQueueBackend = struct {
fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
if (self.thread != null) return error.AlreadyArmed; if (self.thread != null) return error.AlreadyArmed;
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, allocator });
self.kq,
&self.watches,
&self.watches_mutex,
&self.snapshots,
&self.snapshots_mutex,
allocator,
self.handler,
});
} }
fn thread_fn( fn thread_fn(self: *@This(), allocator: std.mem.Allocator) void {
kq: std.posix.fd_t,
watches: *const std.StringHashMapUnmanaged(std.posix.fd_t),
watches_mutex: *std.Thread.Mutex,
snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
snapshots_mutex: *std.Thread.Mutex,
allocator: std.mem.Allocator,
handler: *Handler,
) void {
var events: [64]std.posix.Kevent = undefined; var events: [64]std.posix.Kevent = undefined;
while (true) { while (true) {
// Block indefinitely until kqueue has events. // Block indefinitely until kqueue has events.
const n = std.posix.kevent(kq, &.{}, &events, null) catch break; const n = std.posix.kevent(self.kq, &.{}, &events, null) catch break;
for (events[0..n]) |ev| { for (events[0..n]) |ev| {
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
if (ev.filter != EVFILT_VNODE) continue; if (ev.filter != EVFILT_VNODE) continue;
// Find the directory path for this fd. const fd: std.posix.fd_t = @intCast(ev.ident);
watches_mutex.lock();
var wit = watches.iterator(); // Check if this is a file watch: NOTE_WRITE/NOTE_EXTEND modified.
const dir_path: ?[]const u8 = while (wit.next()) |entry| { self.file_watches_mutex.lock();
if (entry.value_ptr.* == @as(std.posix.fd_t, @intCast(ev.ident))) var fwit = self.file_watches.iterator();
break entry.key_ptr.*; const file_path: ?[]const u8 = while (fwit.next()) |entry| {
if (entry.value_ptr.* == fd) break entry.key_ptr.*;
} else null; } else null;
watches_mutex.unlock(); self.file_watches_mutex.unlock();
if (file_path) |fp| {
if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND) != 0)
self.handler.change(fp, EventType.modified) catch return;
continue;
}
// Otherwise look up the directory path for this fd.
self.watches_mutex.lock();
var wit = self.watches.iterator();
const dir_path: ?[]const u8 = while (wit.next()) |entry| {
if (entry.value_ptr.* == fd) break entry.key_ptr.*;
} else null;
self.watches_mutex.unlock();
if (dir_path == null) continue; if (dir_path == null) continue;
if (ev.fflags & NOTE_DELETE != 0) { if (ev.fflags & NOTE_DELETE != 0) {
handler.change(dir_path.?, EventType.deleted) catch return; self.handler.change(dir_path.?, EventType.deleted) catch return;
} else if (ev.fflags & NOTE_RENAME != 0) { } else if (ev.fflags & NOTE_RENAME != 0) {
handler.change(dir_path.?, EventType.renamed) catch return; self.handler.change(dir_path.?, EventType.renamed) catch return;
} else if (ev.fflags & NOTE_WRITE != 0) { } else if (ev.fflags & NOTE_WRITE != 0) {
scan_dir(dir_path.?, snapshots, snapshots_mutex, allocator, handler) catch {}; self.scan_dir(allocator, dir_path.?) catch {};
} }
} }
} }
} }
// Scan a directory and diff against the snapshot, emitting created/deleted events. // Scan a directory and diff against the snapshot, emitting created/deleted events.
fn scan_dir( fn scan_dir(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void {
dir_path: []const u8,
snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
snapshots_mutex: *std.Thread.Mutex,
allocator: std.mem.Allocator,
handler: *Handler,
) !void {
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
defer dir.close(); defer dir.close();
@ -683,17 +685,17 @@ const KQueueBackend = struct {
var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty; var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty;
defer new_dirs.deinit(allocator); defer new_dirs.deinit(allocator);
snapshots_mutex.lock(); self.snapshots_mutex.lock();
{ {
for (current_dirs.items) |name| { for (current_dirs.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined; var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
if (!snapshots.contains(full_path)) if (!self.snapshots.contains(full_path))
try new_dirs.append(allocator, full_path); try new_dirs.append(allocator, full_path);
} }
const gop = snapshots.getOrPut(allocator, dir_path) catch |e| { const gop = self.snapshots.getOrPut(allocator, dir_path) catch |e| {
snapshots_mutex.unlock(); self.snapshots_mutex.unlock();
return e; return e;
}; };
if (!gop.found_existing) gop.value_ptr.* = .empty; if (!gop.found_existing) gop.value_ptr.* = .empty;
@ -703,12 +705,12 @@ const KQueueBackend = struct {
while (cit.next()) |entry| { while (cit.next()) |entry| {
if (snapshot.contains(entry.key_ptr.*)) continue; if (snapshot.contains(entry.key_ptr.*)) continue;
const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| { const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| {
snapshots_mutex.unlock(); self.snapshots_mutex.unlock();
return e; return e;
}; };
snapshot.put(allocator, owned, {}) catch |e| { snapshot.put(allocator, owned, {}) catch |e| {
allocator.free(owned); allocator.free(owned);
snapshots_mutex.unlock(); self.snapshots_mutex.unlock();
return e; return e;
}; };
try to_create.append(allocator, owned); try to_create.append(allocator, owned);
@ -721,21 +723,70 @@ const KQueueBackend = struct {
} }
for (to_delete.items) |name| _ = snapshot.fetchRemove(name); for (to_delete.items) |name| _ = snapshot.fetchRemove(name);
} }
snapshots_mutex.unlock(); self.snapshots_mutex.unlock();
// Emit all events outside the lock so handlers may safely call watch()/unwatch(). // Emit all events outside the lock so handlers may safely call watch()/unwatch().
// Emit dir_created, then deletions, then creations. Deletions first ensures that
// a rename (old disappears, new appears) reports the source path before the dest.
for (new_dirs.items) |full_path| for (new_dirs.items) |full_path|
try handler.change(full_path, EventType.dir_created); try self.handler.change(full_path, EventType.dir_created);
for (to_delete.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch {
allocator.free(name);
continue;
};
self.deregister_file_watch(allocator, full_path);
try self.handler.change(full_path, EventType.deleted);
allocator.free(name);
}
for (to_create.items) |name| { for (to_create.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined; var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
try handler.change(full_path, EventType.created); self.register_file_watch(allocator, full_path);
try self.handler.change(full_path, EventType.created);
} }
for (to_delete.items) |name| { }
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; fn register_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
try handler.change(full_path, EventType.deleted); self.file_watches_mutex.lock();
allocator.free(name); const already = self.file_watches.contains(path);
self.file_watches_mutex.unlock();
if (already) return;
const fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch return;
const kev = std.posix.Kevent{
.ident = @intCast(fd),
.filter = EVFILT_VNODE,
.flags = EV_ADD | EV_ENABLE | EV_CLEAR,
.fflags = NOTE_WRITE | NOTE_EXTEND,
.data = 0,
.udata = 0,
};
_ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch {
std.posix.close(fd);
return;
};
const owned = allocator.dupe(u8, path) catch {
std.posix.close(fd);
return;
};
self.file_watches_mutex.lock();
self.file_watches.put(allocator, owned, fd) catch {
self.file_watches_mutex.unlock();
std.posix.close(fd);
allocator.free(owned);
return;
};
self.file_watches_mutex.unlock();
}
fn deregister_file_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
self.file_watches_mutex.lock();
const kv = self.file_watches.fetchRemove(path);
self.file_watches_mutex.unlock();
if (kv) |entry| {
std.posix.close(entry.value);
allocator.free(entry.key);
} }
} }
@ -820,18 +871,33 @@ const KQueueBackend = struct {
fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void { fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void {
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
defer dir.close(); defer dir.close();
self.snapshots_mutex.lock(); // Collect file names first so we can register file watches without holding the lock.
defer self.snapshots_mutex.unlock(); var names: std.ArrayListUnmanaged([]u8) = .empty;
const gop = try self.snapshots.getOrPut(allocator, dir_path); defer {
if (!gop.found_existing) gop.value_ptr.* = .empty; for (names.items) |n| allocator.free(n);
var snapshot = gop.value_ptr; names.deinit(allocator);
}
var iter = dir.iterate(); var iter = dir.iterate();
while (try iter.next()) |entry| { while (try iter.next()) |entry| {
if (entry.kind != .file) continue; if (entry.kind != .file) continue;
if (snapshot.contains(entry.name)) continue; try names.append(allocator, try allocator.dupe(u8, entry.name));
const owned = try allocator.dupe(u8, entry.name); }
self.snapshots_mutex.lock();
const gop = try self.snapshots.getOrPut(allocator, dir_path);
if (!gop.found_existing) gop.value_ptr.* = .empty;
var snapshot = gop.value_ptr;
for (names.items) |name| {
if (snapshot.contains(name)) continue;
const owned = try allocator.dupe(u8, name);
try snapshot.put(allocator, owned, {}); try snapshot.put(allocator, owned, {});
} }
self.snapshots_mutex.unlock();
// Register a kqueue watch for each existing file so writes are detected.
for (names.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
self.register_file_watch(allocator, full_path);
}
} }
fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
@ -845,7 +911,15 @@ const KQueueBackend = struct {
if (self.snapshots.fetchRemove(path)) |entry| { if (self.snapshots.fetchRemove(path)) |entry| {
var names = entry.value; var names = entry.value;
var it = names.iterator(); var it = names.iterator();
while (it.next()) |ne| allocator.free(ne.key_ptr.*); while (it.next()) |ne| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ path, ne.key_ptr.* }) catch {
allocator.free(ne.key_ptr.*);
continue;
};
self.deregister_file_watch(allocator, full_path);
allocator.free(ne.key_ptr.*);
}
names.deinit(allocator); names.deinit(allocator);
} }
} }