refactor: add directory snapshotting and diffing to kqueue backend
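
On macOS and the BSDs, a kqueue EVFILT_VNODE watch on a directory fd only
reports that the directory changed (NOTE_WRITE); unlike inotify it does not
name the affected entry. To recover per-file created/deleted events, keep a
snapshot of each watched directory's filenames and diff it against a fresh
scan whenever NOTE_WRITE fires. add_watch takes the initial snapshot so the
first NOTE_WRITE has a baseline; the kqueue thread rescans and updates the
snapshot under a mutex. The arm() signature gains an allocator parameter on
all backends so the kqueue thread can allocate during rescans.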

CJ van den Berg 2026-02-20 20:06:12 +01:00
parent a6c6aeeb4b
commit b6d3780283
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9


@@ -101,7 +101,7 @@ const INotifyBackend = struct {
         std.posix.close(self.inotify_fd);
     }

-    fn arm(self: *@This(), parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void {
+    fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void {
         parent.deinit();
         try self.fd_watcher.wait_read();
     }
@@ -414,6 +414,11 @@ const KQueueBackend = struct {
    shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
    thread: ?std.Thread,
    watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd
+   // Per-directory snapshots of filenames, used to diff on NOTE_WRITE.
+   // Key: owned dir path (same as watches key), value: set of owned filenames.
+   // Accessed from both the main thread (add_watch) and the background thread (scan_dir).
+   snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
+   snapshots_mutex: std.Thread.Mutex,

    const EVFILT_VNODE: i16 = -4;
    const EVFILT_READ: i16 = -1;
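
Aside (not part of the commit): std.StringHashMapUnmanaged(void) with a {}
value is the usual standard-library idiom for a set of strings in Zig, which
is how the inner snapshot maps are used here. A minimal self-contained sketch
of the ownership pattern the comments above describe:

    const std = @import("std");

    test "void-valued string map as an owned string set" {
        const allocator = std.testing.allocator;
        var set: std.StringHashMapUnmanaged(void) = .empty;
        defer {
            // Keys are owned heap copies, so free them before the map itself.
            var it = set.iterator();
            while (it.next()) |e| allocator.free(e.key_ptr.*);
            set.deinit(allocator);
        }

        // Dupe before inserting; the set owns the copy from then on.
        const name = try allocator.dupe(u8, "main.zig");
        try set.put(allocator, name, {});

        try std.testing.expect(set.contains("main.zig"));
        try std.testing.expect(!set.contains("build.zig"));
    }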
@@ -446,7 +451,7 @@ const KQueueBackend = struct {
            .udata = 0,
        };
        _ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null);
-       return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty };
+       return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty, .snapshots = .empty, .snapshots_mutex = .{} };
    }

    fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
@@ -461,16 +466,31 @@ const KQueueBackend = struct {
            allocator.free(entry.key_ptr.*);
        }
        self.watches.deinit(allocator);
+       var sit = self.snapshots.iterator();
+       while (sit.next()) |entry| {
+           var names = entry.value_ptr.*;
+           var nit = names.iterator();
+           while (nit.next()) |ne| allocator.free(ne.key_ptr.*);
+           names.deinit(allocator);
+       }
+       self.snapshots.deinit(allocator);
        std.posix.close(self.kq);
    }

-   fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
+   fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
        errdefer parent.deinit();
        if (self.thread != null) return error.AlreadyArmed;
-       self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, parent });
+       self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, parent });
    }

-   fn thread_fn(kq: std.posix.fd_t, watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), parent: tp.pid) void {
+   fn thread_fn(
+       kq: std.posix.fd_t,
+       watches: *const std.StringHashMapUnmanaged(std.posix.fd_t),
+       snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
+       snapshots_mutex: *std.Thread.Mutex,
+       allocator: std.mem.Allocator,
+       parent: tp.pid,
+   ) void {
        defer parent.deinit();
        var events: [64]std.posix.Kevent = undefined;
        while (true) {
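
Aside (not part of the commit): thread_fn now receives pointers into the
backend struct (&self.watches, &self.snapshots, &self.snapshots_mutex), so
the backend must stay at a stable address while the thread is running. The
allocator is also shared across threads: add_watch allocates on the caller's
thread while scan_dir allocates on the kqueue thread, so it needs to be a
thread-safe allocator. The snapshots_mutex guards the maps, not the allocator.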
@@ -479,25 +499,86 @@ const KQueueBackend = struct {
            for (events[0..n]) |ev| {
                if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
                if (ev.filter != EVFILT_VNODE) continue;
-               var it = watches.iterator();
-               while (it.next()) |entry| {
+               // Find the directory path for this fd.
+               var wit = watches.iterator();
+               while (wit.next()) |entry| {
                    if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue;
-                   const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0)
-                       .deleted
-                   else if (ev.fflags & NOTE_RENAME != 0)
-                       .renamed
-                   else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0)
-                       .modified
-                   else
-                       break;
-                   parent.send(.{ "FW", "change", entry.key_ptr.*, event_type }) catch return;
+                   const dir_path = entry.key_ptr.*;
+                   if (ev.fflags & NOTE_DELETE != 0) {
+                       parent.send(.{ "FW", "change", dir_path, EventType.deleted }) catch return;
+                   } else if (ev.fflags & NOTE_RENAME != 0) {
+                       parent.send(.{ "FW", "change", dir_path, EventType.renamed }) catch return;
+                   } else if (ev.fflags & NOTE_WRITE != 0) {
+                       scan_dir(dir_path, snapshots, snapshots_mutex, allocator, parent) catch {};
+                   }
                    break;
                }
            }
        }
    }

-   fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) (std.posix.OpenError || std.posix.KEventError || error{OutOfMemory})!void {
+   // Scan a directory and diff against the snapshot, emitting created/deleted events.
+   fn scan_dir(
+       dir_path: []const u8,
+       snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
+       snapshots_mutex: *std.Thread.Mutex,
+       allocator: std.mem.Allocator,
+       parent: tp.pid,
+   ) !void {
+       var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
+       defer dir.close();
+
+       // Collect current filenames (no lock needed, reading filesystem only).
+       var current: std.StringHashMapUnmanaged(void) = .empty;
+       defer {
+           var it = current.iterator();
+           while (it.next()) |e| allocator.free(e.key_ptr.*);
+           current.deinit(allocator);
+       }
+       var iter = dir.iterate();
+       while (try iter.next()) |entry| {
+           if (entry.kind != .file) continue;
+           const name = try allocator.dupe(u8, entry.name);
+           try current.put(allocator, name, {});
+       }
+
+       snapshots_mutex.lock();
+       defer snapshots_mutex.unlock();
+
+       // Get or create the snapshot for this directory.
+       const gop = try snapshots.getOrPut(allocator, dir_path);
+       if (!gop.found_existing) gop.value_ptr.* = .empty;
+       const snapshot = gop.value_ptr;
+
+       // Emit created events for files in current but not in snapshot.
+       var cit = current.iterator();
+       while (cit.next()) |entry| {
+           if (snapshot.contains(entry.key_ptr.*)) continue;
+           var path_buf: [std.fs.max_path_bytes]u8 = undefined;
+           const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.key_ptr.* }) catch continue;
+           try parent.send(.{ "FW", "change", full_path, EventType.created });
+           const owned = try allocator.dupe(u8, entry.key_ptr.*);
+           try snapshot.put(allocator, owned, {});
+       }
+
+       // Emit deleted events for files in snapshot but not in current.
+       var to_delete: std.ArrayListUnmanaged([]const u8) = .empty;
+       defer to_delete.deinit(allocator);
+       var sit = snapshot.iterator();
+       while (sit.next()) |entry| {
+           if (current.contains(entry.key_ptr.*)) continue;
+           try to_delete.append(allocator, entry.key_ptr.*);
+       }
+       for (to_delete.items) |name| {
+           var path_buf: [std.fs.max_path_bytes]u8 = undefined;
+           const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
+           try parent.send(.{ "FW", "change", full_path, EventType.deleted });
+           _ = snapshot.fetchRemove(name);
+           allocator.free(name);
+       }
+   }
+
+   fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void {
        if (self.watches.contains(path)) return;
        const path_fd = try std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0);
        errdefer std.posix.close(path_fd);
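
Aside (not part of the commit): the heart of scan_dir above is a two-way set
diff that also brings the snapshot up to date. A self-contained sketch of just
that step, with hypothetical names (diff_step is not part of the commit):

    const std = @import("std");

    // Update `snapshot` in place to match `current`, reporting what changed.
    // Names appended to `created` are owned by `snapshot`; names appended to
    // `deleted` are handed back to the caller to free (fetchRemove does not
    // free keys).
    fn diff_step(
        allocator: std.mem.Allocator,
        snapshot: *std.StringHashMapUnmanaged(void),
        current: *const std.StringHashMapUnmanaged(void),
        created: *std.ArrayListUnmanaged([]const u8),
        deleted: *std.ArrayListUnmanaged([]const u8),
    ) !void {
        // Present now but not before: created.
        var cit = current.iterator();
        while (cit.next()) |e| {
            if (snapshot.contains(e.key_ptr.*)) continue;
            const owned = try allocator.dupe(u8, e.key_ptr.*);
            try snapshot.put(allocator, owned, {});
            try created.append(allocator, owned);
        }
        // Present before but not now: deleted. Collect first, remove after,
        // to avoid mutating the map while iterating it.
        var sit = snapshot.iterator();
        while (sit.next()) |e| {
            if (!current.contains(e.key_ptr.*)) try deleted.append(allocator, e.key_ptr.*);
        }
        for (deleted.items) |name| _ = snapshot.fetchRemove(name);
    }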
@@ -513,6 +594,25 @@ const KQueueBackend = struct {
        const owned_path = try allocator.dupe(u8, path);
        errdefer allocator.free(owned_path);
        try self.watches.put(allocator, owned_path, path_fd);
+       // Take initial snapshot so first NOTE_WRITE has a baseline to diff against.
+       try self.take_snapshot(allocator, owned_path);
+   }
+
+   fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void {
+       var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
+       defer dir.close();
+       self.snapshots_mutex.lock();
+       defer self.snapshots_mutex.unlock();
+       const gop = try self.snapshots.getOrPut(allocator, dir_path);
+       if (!gop.found_existing) gop.value_ptr.* = .empty;
+       const snapshot = gop.value_ptr;
+       var iter = dir.iterate();
+       while (try iter.next()) |entry| {
+           if (entry.kind != .file) continue;
+           if (snapshot.contains(entry.name)) continue;
+           const owned = try allocator.dupe(u8, entry.name);
+           try snapshot.put(allocator, owned, {});
+       }
    }

    fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
@@ -520,6 +620,12 @@ const KQueueBackend = struct {
            std.posix.close(entry.value);
            allocator.free(entry.key);
        }
+       if (self.snapshots.fetchRemove(path)) |entry| {
+           var names = entry.value;
+           var it = names.iterator();
+           while (it.next()) |ne| allocator.free(ne.key_ptr.*);
+           names.deinit(allocator);
+       }
    }
};
@@ -618,7 +724,7 @@ const WindowsBackend = struct {
        _ = win32.CloseHandle(self.iocp);
    }

-   fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
+   fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
        errdefer parent.deinit();
        if (self.thread != null) return error.AlreadyArmed;
        self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent });
@@ -763,7 +869,7 @@ const Process = struct {
        errdefer self.deinit();
        _ = tp.set_trap(true);
        self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace());
-       self.backend.arm(self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace());
+       self.backend.arm(self.allocator, self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace());
        tp.receive(&self.receiver);
    }