refactor: re-write watcher FSEvents backend to use FSEventStreamSetDispatchQueue

This commit is contained in:
CJ van den Berg 2026-02-20 20:07:31 +01:00
parent a0eece9f49
commit 6bcbc70e6b
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9

View file

@ -220,11 +220,12 @@ const INotifyBackend = struct {
};
const FSEventsBackend = struct {
thread: ?std.Thread,
run_loop: ?*anyopaque, // CFRunLoopRef, set by the background thread before blocking
stream: ?*anyopaque, // FSEventStreamRef, created on the background thread
stream: ?*anyopaque, // FSEventStreamRef
queue: ?*anyopaque, // dispatch_queue_t
ctx: ?*CallbackContext, // heap-allocated, freed after stream is stopped
watches: std.StringArrayHashMapUnmanaged(void), // owned paths
mutex: std.Thread.Mutex, // protects run_loop
const threaded = false; // callback fires on GCD thread; no FW_event needed
const kFSEventStreamCreateFlagNoDefer: u32 = 0x00000002;
const kFSEventStreamCreateFlagFileEvents: u32 = 0x00000010;
@ -233,14 +234,14 @@ const FSEventsBackend = struct {
const kFSEventStreamEventFlagItemRenamed: u32 = 0x00000800;
const kFSEventStreamEventFlagItemModified: u32 = 0x00001000;
const kFSEventStreamEventIdSinceNow: u64 = 0xFFFFFFFFFFFFFFFF;
const kCFStringEncodingUTF8: u32 = 0x08000100;
// CoreFoundation / CoreServices extern declarations
const cf = struct {
pub extern "c" fn CFStringCreateWithBytesNoCopy(
alloc: ?*anyopaque,
bytes: [*]const u8,
numBytes: isize,
encoding: u32, // kCFStringEncodingUTF8 = 0x08000100
encoding: u32,
isExternalRepresentation: u8,
contentsDeallocator: ?*anyopaque,
) ?*anyopaque;
@ -251,10 +252,6 @@ const FSEventsBackend = struct {
callBacks: ?*anyopaque,
) ?*anyopaque;
pub extern "c" fn CFRelease(cf: *anyopaque) void;
pub extern "c" fn CFRunLoopGetCurrent() *anyopaque;
pub extern "c" fn CFRunLoopRun() void;
pub extern "c" fn CFRunLoopStop(rl: *anyopaque) void;
pub extern "c" fn CFRunLoopAddSource(rl: *anyopaque, source: *anyopaque, mode: *anyopaque) void;
pub extern "c" fn FSEventStreamCreate(
allocator: ?*anyopaque,
callback: *const anyopaque,
@ -264,61 +261,100 @@ const FSEventsBackend = struct {
latency: f64,
flags: u32,
) ?*anyopaque;
pub extern "c" fn FSEventStreamSchedule(stream: *anyopaque, runLoop: *anyopaque, runLoopMode: *anyopaque) void;
pub extern "c" fn FSEventStreamSetDispatchQueue(stream: *anyopaque, queue: *anyopaque) void;
pub extern "c" fn FSEventStreamStart(stream: *anyopaque) u8;
pub extern "c" fn FSEventStreamStop(stream: *anyopaque) void;
pub extern "c" fn FSEventStreamInvalidate(stream: *anyopaque) void;
pub extern "c" fn FSEventStreamRelease(stream: *anyopaque) void;
// kCFRunLoopDefaultMode, a well-known constant pointer exported by CoreFoundation
pub extern "c" var kCFRunLoopDefaultMode: *anyopaque;
pub extern "c" var kCFAllocatorDefault: *anyopaque;
pub extern "c" fn dispatch_queue_create(label: [*:0]const u8, attr: ?*anyopaque) *anyopaque;
pub extern "c" fn dispatch_release(obj: *anyopaque) void;
pub extern "c" var kCFAllocatorNull: *anyopaque;
};
const kCFStringEncodingUTF8: u32 = 0x08000100;
// Context passed to the FSEvents callback via the thread's stack.
const CallbackContext = struct {
parent: tp.pid,
};
fn init() error{}!@This() {
return .{
.thread = null,
.run_loop = null,
.stream = null,
.watches = .empty,
.mutex = .{},
};
return .{ .stream = null, .queue = null, .ctx = null, .watches = .empty };
}
fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
// Stop the run loop, which causes the thread to exit.
self.mutex.lock();
const rl = self.run_loop;
self.mutex.unlock();
if (rl) |r| cf.CFRunLoopStop(r);
if (self.thread) |t| t.join();
// Stream is cleaned up by the thread before it exits.
if (self.stream) |s| {
cf.FSEventStreamStop(s);
cf.FSEventStreamInvalidate(s);
cf.FSEventStreamRelease(s);
self.stream = null;
}
if (self.queue) |q| {
cf.dispatch_release(q);
self.queue = null;
}
if (self.ctx) |c| {
c.parent.deinit();
allocator.destroy(c);
self.ctx = null;
}
var it = self.watches.iterator();
while (it.next()) |entry| allocator.free(entry.key_ptr.*);
self.watches.deinit(allocator);
}
fn arm(self: *@This(), parent: tp.pid) std.Thread.SpawnError!void {
fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) error{OutOfMemory}!void {
errdefer parent.deinit();
if (self.thread != null) return;
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, parent });
}
if (self.stream != null) return;
const FSEventStreamCallback = *const fn (
stream: *anyopaque,
info: ?*anyopaque,
numEvents: usize,
eventPaths: *anyopaque,
eventFlags: [*]const u32,
eventIds: [*]const u64,
) callconv(.c) void;
var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty;
defer cf_strings.deinit(allocator);
var it = self.watches.iterator();
while (it.next()) |entry| {
const path = entry.key_ptr.*;
const s = cf.CFStringCreateWithBytesNoCopy(
null,
path.ptr,
@intCast(path.len),
kCFStringEncodingUTF8,
0,
cf.kCFAllocatorNull,
) orelse continue;
cf_strings.append(allocator, s) catch {
cf.CFRelease(s);
break;
};
}
defer for (cf_strings.items) |s| cf.CFRelease(s.?);
const paths_array = cf.CFArrayCreate(
null,
cf_strings.items.ptr,
@intCast(cf_strings.items.len),
null,
) orelse return;
defer cf.CFRelease(paths_array);
const ctx = try allocator.create(CallbackContext);
errdefer allocator.destroy(ctx);
ctx.* = .{ .parent = parent };
const stream = cf.FSEventStreamCreate(
null,
@ptrCast(&callback),
ctx,
paths_array,
kFSEventStreamEventIdSinceNow,
0.1,
kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents,
) orelse return;
errdefer cf.FSEventStreamRelease(stream);
const queue = cf.dispatch_queue_create("flow.file_watcher", null);
cf.FSEventStreamSetDispatchQueue(stream, queue);
_ = cf.FSEventStreamStart(stream);
self.stream = stream;
self.queue = queue;
self.ctx = ctx;
}
fn callback(
_: *anyopaque,
@ -347,61 +383,13 @@ const FSEventsBackend = struct {
}
}
fn thread_fn(self: *@This(), parent: tp.pid) void {
var ctx = CallbackContext{ .parent = parent };
defer ctx.parent.deinit();
// Build the CFArray of paths to watch.
var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty;
defer cf_strings.deinit(std.heap.c_allocator);
var it = self.watches.iterator();
while (it.next()) |entry| {
const path = entry.key_ptr.*;
const s = cf.CFStringCreateWithBytesNoCopy(null, path.ptr, @intCast(path.len), kCFStringEncodingUTF8, 0, cf.kCFAllocatorNull) orelse continue;
cf_strings.append(std.heap.c_allocator, s) catch {
cf.CFRelease(s);
break;
};
}
defer for (cf_strings.items) |s| cf.CFRelease(s.?);
const paths_array = cf.CFArrayCreate(null, cf_strings.items.ptr, @intCast(cf_strings.items.len), null) orelse return;
defer cf.CFRelease(paths_array);
const stream = cf.FSEventStreamCreate(
null,
@ptrCast(&callback),
&ctx,
paths_array,
kFSEventStreamEventIdSinceNow,
0.1, // 100ms latency
kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents,
) orelse return;
defer {
cf.FSEventStreamStop(stream);
cf.FSEventStreamInvalidate(stream);
cf.FSEventStreamRelease(stream);
}
const rl = cf.CFRunLoopGetCurrent();
cf.FSEventStreamSchedule(stream, rl, cf.kCFRunLoopDefaultMode);
_ = cf.FSEventStreamStart(stream);
// Publish the run loop reference so deinit() can stop it.
self.mutex.lock();
self.run_loop = rl;
self.mutex.unlock();
cf.CFRunLoopRun(); // blocks until CFRunLoopStop is called
}
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{OutOfMemory}!void {
if (self.watches.contains(path)) return;
const owned = try allocator.dupe(u8, path);
errdefer allocator.free(owned);
try self.watches.put(allocator, owned, {});
// Note: watches added after arm() take effect on the next restart.
// In practice, all watches are added before the first open() call.
// Watches added after arm() take effect on the next restart.
// In practice all watches are added before arm() is called.
}
fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {