fix(kqueue): emit scan_dir events outside the snapshot lock

Handler callbacks invoked while holding snapshots_mutex could deadlock if
the handler called watch() or unwatch(), which also acquires that lock.
Refactor scan_dir to collect all pending events (dir_created, created,
deleted) into temporary lists under the lock, then emit them after
releasing it. Also consolidate the two directory iteration passes into one.
This commit is contained in:
CJ van den Berg 2026-03-07 18:19:00 +01:00
parent 8dc759db61
commit f3463dd0dc
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9
2 changed files with 74 additions and 46 deletions

View file

@ -574,65 +574,93 @@ const KQueueBackend = struct {
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
defer dir.close();
// Collect current filenames (no lock needed, reading filesystem only).
var current: std.StringHashMapUnmanaged(void) = .empty;
// Collect current files and subdirectories (no lock, reading filesystem only).
var current_files: std.StringHashMapUnmanaged(void) = .empty;
defer {
var it = current.iterator();
var it = current_files.iterator();
while (it.next()) |e| allocator.free(e.key_ptr.*);
current.deinit(allocator);
current_files.deinit(allocator);
}
var current_dirs: std.ArrayListUnmanaged([]u8) = .empty;
defer {
for (current_dirs.items) |d| allocator.free(d);
current_dirs.deinit(allocator);
}
var iter = dir.iterate();
while (try iter.next()) |entry| {
if (entry.kind != .file) continue;
const name = try allocator.dupe(u8, entry.name);
try current.put(allocator, name, {});
switch (entry.kind) {
.file => {
const name = try allocator.dupe(u8, entry.name);
try current_files.put(allocator, name, {});
},
.directory => {
const name = try allocator.dupe(u8, entry.name);
try current_dirs.append(allocator, name);
},
else => {},
}
}
// Emit dir_created for new subdirectories outside the lock (no snapshot involvement).
var dir2 = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
defer dir2.close();
var dir_iter = dir2.iterate();
while (try dir_iter.next()) |entry| {
if (entry.kind != .directory) continue;
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.name }) catch continue;
// Only emit if not already watched.
if (!snapshots.contains(full_path))
try handler.change(full_path, EventType.dir_created);
}
snapshots_mutex.lock();
defer snapshots_mutex.unlock();
// Get or create the snapshot for this directory.
const gop = try snapshots.getOrPut(allocator, dir_path);
if (!gop.found_existing) gop.value_ptr.* = .empty;
const snapshot = gop.value_ptr;
// Emit created events for files in current but not in snapshot.
var cit = current.iterator();
while (cit.next()) |entry| {
if (snapshot.contains(entry.key_ptr.*)) continue;
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.key_ptr.* }) catch continue;
try handler.change(full_path, EventType.created);
const owned = try allocator.dupe(u8, entry.key_ptr.*);
try snapshot.put(allocator, owned, {});
}
// Emit deleted events for files in snapshot but not in current.
// Diff against snapshot under the lock; collect events to emit after releasing it.
var to_create: std.ArrayListUnmanaged([]const u8) = .empty;
defer to_create.deinit(allocator);
var to_delete: std.ArrayListUnmanaged([]const u8) = .empty;
defer to_delete.deinit(allocator);
var sit = snapshot.iterator();
while (sit.next()) |entry| {
if (current.contains(entry.key_ptr.*)) continue;
try to_delete.append(allocator, entry.key_ptr.*);
var new_dirs: std.ArrayListUnmanaged([]const u8) = .empty;
defer new_dirs.deinit(allocator);
snapshots_mutex.lock();
{
for (current_dirs.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
if (!snapshots.contains(full_path))
try new_dirs.append(allocator, full_path);
}
const gop = snapshots.getOrPut(allocator, dir_path) catch |e| {
snapshots_mutex.unlock();
return e;
};
if (!gop.found_existing) gop.value_ptr.* = .empty;
const snapshot = gop.value_ptr;
var cit = current_files.iterator();
while (cit.next()) |entry| {
if (snapshot.contains(entry.key_ptr.*)) continue;
const owned = allocator.dupe(u8, entry.key_ptr.*) catch |e| {
snapshots_mutex.unlock();
return e;
};
snapshot.put(allocator, owned, {}) catch |e| {
allocator.free(owned);
snapshots_mutex.unlock();
return e;
};
try to_create.append(allocator, owned);
}
var sit = snapshot.iterator();
while (sit.next()) |entry| {
if (current_files.contains(entry.key_ptr.*)) continue;
try to_delete.append(allocator, entry.key_ptr.*);
}
for (to_delete.items) |name| _ = snapshot.fetchRemove(name);
}
snapshots_mutex.unlock();
// Emit all events outside the lock so handlers may safely call watch()/unwatch().
for (new_dirs.items) |full_path|
try handler.change(full_path, EventType.dir_created);
for (to_create.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
try handler.change(full_path, EventType.created);
}
for (to_delete.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
try handler.change(full_path, EventType.deleted);
_ = snapshot.fetchRemove(name);
allocator.free(name);
}
}

View file

@ -148,7 +148,7 @@ fn makeTempDir(allocator: std.mem.Allocator) ![]u8 {
const name = try std.fmt.allocPrint(
allocator,
"/tmp/nightwatch_test_{d}_{d}",
.{ std.c.getpid(), n },
.{ if (builtin.os.tag == .linux) std.os.linux.getpid() else std.c.getpid(), n },
);
errdefer allocator.free(name);
try std.fs.makeDirAbsolute(name);