refactor: add directory snapshotting and diffing to kqueue watcher backend
This commit is contained in:
parent
6a6d9b07d2
commit
a0eece9f49
1 changed files with 125 additions and 19 deletions
|
|
@ -101,7 +101,7 @@ const INotifyBackend = struct {
|
||||||
std.posix.close(self.inotify_fd);
|
std.posix.close(self.inotify_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn arm(self: *@This(), parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void {
|
fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void {
|
||||||
parent.deinit();
|
parent.deinit();
|
||||||
try self.fd_watcher.wait_read();
|
try self.fd_watcher.wait_read();
|
||||||
}
|
}
|
||||||
|
|
@ -414,6 +414,11 @@ const KQueueBackend = struct {
|
||||||
shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
|
shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
|
||||||
thread: ?std.Thread,
|
thread: ?std.Thread,
|
||||||
watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd
|
watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd
|
||||||
|
// Per-directory snapshots of filenames, used to diff on NOTE_WRITE.
|
||||||
|
// Key: owned dir path (same as watches key), value: set of owned filenames.
|
||||||
|
// Accessed from both the main thread (add_watch) and the background thread (scan_dir).
|
||||||
|
snapshots: std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
|
||||||
|
snapshots_mutex: std.Thread.Mutex,
|
||||||
|
|
||||||
const EVFILT_VNODE: i16 = -4;
|
const EVFILT_VNODE: i16 = -4;
|
||||||
const EVFILT_READ: i16 = -1;
|
const EVFILT_READ: i16 = -1;
|
||||||
|
|
@ -446,7 +451,7 @@ const KQueueBackend = struct {
|
||||||
.udata = 0,
|
.udata = 0,
|
||||||
};
|
};
|
||||||
_ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null);
|
_ = try std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null);
|
||||||
return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty };
|
return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty, .snapshots = .empty, .snapshots_mutex = .{} };
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
|
fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
|
||||||
|
|
@ -461,16 +466,31 @@ const KQueueBackend = struct {
|
||||||
allocator.free(entry.key_ptr.*);
|
allocator.free(entry.key_ptr.*);
|
||||||
}
|
}
|
||||||
self.watches.deinit(allocator);
|
self.watches.deinit(allocator);
|
||||||
|
var sit = self.snapshots.iterator();
|
||||||
|
while (sit.next()) |entry| {
|
||||||
|
var names = entry.value_ptr.*;
|
||||||
|
var nit = names.iterator();
|
||||||
|
while (nit.next()) |ne| allocator.free(ne.key_ptr.*);
|
||||||
|
names.deinit(allocator);
|
||||||
|
}
|
||||||
|
self.snapshots.deinit(allocator);
|
||||||
std.posix.close(self.kq);
|
std.posix.close(self.kq);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
|
fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
|
||||||
errdefer parent.deinit();
|
errdefer parent.deinit();
|
||||||
if (self.thread != null) return error.AlreadyArmed;
|
if (self.thread != null) return error.AlreadyArmed;
|
||||||
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, parent });
|
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, parent });
|
||||||
}
|
}
|
||||||
|
|
||||||
fn thread_fn(kq: std.posix.fd_t, watches: *const std.StringHashMapUnmanaged(std.posix.fd_t), parent: tp.pid) void {
|
fn thread_fn(
|
||||||
|
kq: std.posix.fd_t,
|
||||||
|
watches: *const std.StringHashMapUnmanaged(std.posix.fd_t),
|
||||||
|
snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
|
||||||
|
snapshots_mutex: *std.Thread.Mutex,
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
parent: tp.pid,
|
||||||
|
) void {
|
||||||
defer parent.deinit();
|
defer parent.deinit();
|
||||||
var events: [64]std.posix.Kevent = undefined;
|
var events: [64]std.posix.Kevent = undefined;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
@ -479,25 +499,86 @@ const KQueueBackend = struct {
|
||||||
for (events[0..n]) |ev| {
|
for (events[0..n]) |ev| {
|
||||||
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
|
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
|
||||||
if (ev.filter != EVFILT_VNODE) continue;
|
if (ev.filter != EVFILT_VNODE) continue;
|
||||||
var it = watches.iterator();
|
// Find the directory path for this fd.
|
||||||
while (it.next()) |entry| {
|
var wit = watches.iterator();
|
||||||
|
while (wit.next()) |entry| {
|
||||||
if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue;
|
if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue;
|
||||||
const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0)
|
const dir_path = entry.key_ptr.*;
|
||||||
.deleted
|
if (ev.fflags & NOTE_DELETE != 0) {
|
||||||
else if (ev.fflags & NOTE_RENAME != 0)
|
parent.send(.{ "FW", "change", dir_path, EventType.deleted }) catch return;
|
||||||
.renamed
|
} else if (ev.fflags & NOTE_RENAME != 0) {
|
||||||
else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0)
|
parent.send(.{ "FW", "change", dir_path, EventType.renamed }) catch return;
|
||||||
.modified
|
} else if (ev.fflags & NOTE_WRITE != 0) {
|
||||||
else
|
scan_dir(dir_path, snapshots, snapshots_mutex, allocator, parent) catch {};
|
||||||
break;
|
}
|
||||||
parent.send(.{ "FW", "change", entry.key_ptr.*, event_type }) catch return;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) (std.posix.OpenError || std.posix.KEventError || error{OutOfMemory})!void {
|
// Scan a directory and diff against the snapshot, emitting created/deleted events.
|
||||||
|
fn scan_dir(
|
||||||
|
dir_path: []const u8,
|
||||||
|
snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
|
||||||
|
snapshots_mutex: *std.Thread.Mutex,
|
||||||
|
allocator: std.mem.Allocator,
|
||||||
|
parent: tp.pid,
|
||||||
|
) !void {
|
||||||
|
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
|
||||||
|
defer dir.close();
|
||||||
|
|
||||||
|
// Collect current filenames (no lock needed, reading filesystem only).
|
||||||
|
var current: std.StringHashMapUnmanaged(void) = .empty;
|
||||||
|
defer {
|
||||||
|
var it = current.iterator();
|
||||||
|
while (it.next()) |e| allocator.free(e.key_ptr.*);
|
||||||
|
current.deinit(allocator);
|
||||||
|
}
|
||||||
|
var iter = dir.iterate();
|
||||||
|
while (try iter.next()) |entry| {
|
||||||
|
if (entry.kind != .file) continue;
|
||||||
|
const name = try allocator.dupe(u8, entry.name);
|
||||||
|
try current.put(allocator, name, {});
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshots_mutex.lock();
|
||||||
|
defer snapshots_mutex.unlock();
|
||||||
|
|
||||||
|
// Get or create the snapshot for this directory.
|
||||||
|
const gop = try snapshots.getOrPut(allocator, dir_path);
|
||||||
|
if (!gop.found_existing) gop.value_ptr.* = .empty;
|
||||||
|
const snapshot = gop.value_ptr;
|
||||||
|
|
||||||
|
// Emit created events for files in current but not in snapshot.
|
||||||
|
var cit = current.iterator();
|
||||||
|
while (cit.next()) |entry| {
|
||||||
|
if (snapshot.contains(entry.key_ptr.*)) continue;
|
||||||
|
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
|
||||||
|
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.key_ptr.* }) catch continue;
|
||||||
|
try parent.send(.{ "FW", "change", full_path, EventType.created });
|
||||||
|
const owned = try allocator.dupe(u8, entry.key_ptr.*);
|
||||||
|
try snapshot.put(allocator, owned, {});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit deleted events for files in snapshot but not in current.
|
||||||
|
var to_delete: std.ArrayListUnmanaged([]const u8) = .empty;
|
||||||
|
defer to_delete.deinit(allocator);
|
||||||
|
var sit = snapshot.iterator();
|
||||||
|
while (sit.next()) |entry| {
|
||||||
|
if (current.contains(entry.key_ptr.*)) continue;
|
||||||
|
try to_delete.append(allocator, entry.key_ptr.*);
|
||||||
|
}
|
||||||
|
for (to_delete.items) |name| {
|
||||||
|
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
|
||||||
|
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
|
||||||
|
try parent.send(.{ "FW", "change", full_path, EventType.deleted });
|
||||||
|
_ = snapshot.fetchRemove(name);
|
||||||
|
allocator.free(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void {
|
||||||
if (self.watches.contains(path)) return;
|
if (self.watches.contains(path)) return;
|
||||||
const path_fd = try std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0);
|
const path_fd = try std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0);
|
||||||
errdefer std.posix.close(path_fd);
|
errdefer std.posix.close(path_fd);
|
||||||
|
|
@ -513,6 +594,25 @@ const KQueueBackend = struct {
|
||||||
const owned_path = try allocator.dupe(u8, path);
|
const owned_path = try allocator.dupe(u8, path);
|
||||||
errdefer allocator.free(owned_path);
|
errdefer allocator.free(owned_path);
|
||||||
try self.watches.put(allocator, owned_path, path_fd);
|
try self.watches.put(allocator, owned_path, path_fd);
|
||||||
|
// Take initial snapshot so first NOTE_WRITE has a baseline to diff against.
|
||||||
|
try self.take_snapshot(allocator, owned_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn take_snapshot(self: *@This(), allocator: std.mem.Allocator, dir_path: []const u8) !void {
|
||||||
|
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
|
||||||
|
defer dir.close();
|
||||||
|
self.snapshots_mutex.lock();
|
||||||
|
defer self.snapshots_mutex.unlock();
|
||||||
|
const gop = try self.snapshots.getOrPut(allocator, dir_path);
|
||||||
|
if (!gop.found_existing) gop.value_ptr.* = .empty;
|
||||||
|
var snapshot = gop.value_ptr;
|
||||||
|
var iter = dir.iterate();
|
||||||
|
while (try iter.next()) |entry| {
|
||||||
|
if (entry.kind != .file) continue;
|
||||||
|
if (snapshot.contains(entry.name)) continue;
|
||||||
|
const owned = try allocator.dupe(u8, entry.name);
|
||||||
|
try snapshot.put(allocator, owned, {});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
|
fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
|
||||||
|
|
@ -520,6 +620,12 @@ const KQueueBackend = struct {
|
||||||
std.posix.close(entry.value);
|
std.posix.close(entry.value);
|
||||||
allocator.free(entry.key);
|
allocator.free(entry.key);
|
||||||
}
|
}
|
||||||
|
if (self.snapshots.fetchRemove(path)) |entry| {
|
||||||
|
var names = entry.value;
|
||||||
|
var it = names.iterator();
|
||||||
|
while (it.next()) |ne| allocator.free(ne.key_ptr.*);
|
||||||
|
names.deinit(allocator);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -618,7 +724,7 @@ const WindowsBackend = struct {
|
||||||
_ = win32.CloseHandle(self.iocp);
|
_ = win32.CloseHandle(self.iocp);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
|
fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
|
||||||
errdefer parent.deinit();
|
errdefer parent.deinit();
|
||||||
if (self.thread != null) return error.AlreadyArmed;
|
if (self.thread != null) return error.AlreadyArmed;
|
||||||
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent });
|
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent });
|
||||||
|
|
@ -763,7 +869,7 @@ const Process = struct {
|
||||||
errdefer self.deinit();
|
errdefer self.deinit();
|
||||||
_ = tp.set_trap(true);
|
_ = tp.set_trap(true);
|
||||||
self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace());
|
self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace());
|
||||||
self.backend.arm(self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace());
|
self.backend.arm(self.allocator, self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace());
|
||||||
tp.receive(&self.receiver);
|
tp.receive(&self.receiver);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue