diff --git a/src/file_watcher.zig b/src/file_watcher.zig index 373d53f..bf287b4 100644 --- a/src/file_watcher.zig +++ b/src/file_watcher.zig @@ -220,11 +220,12 @@ const INotifyBackend = struct { }; const FSEventsBackend = struct { - thread: ?std.Thread, - run_loop: ?*anyopaque, // CFRunLoopRef, set by the background thread before blocking - stream: ?*anyopaque, // FSEventStreamRef, created on the background thread + stream: ?*anyopaque, // FSEventStreamRef + queue: ?*anyopaque, // dispatch_queue_t + ctx: ?*CallbackContext, // heap-allocated, freed after stream is stopped watches: std.StringArrayHashMapUnmanaged(void), // owned paths - mutex: std.Thread.Mutex, // protects run_loop + + const threaded = false; // callback fires on GCD thread; no FW_event needed const kFSEventStreamCreateFlagNoDefer: u32 = 0x00000002; const kFSEventStreamCreateFlagFileEvents: u32 = 0x00000010; @@ -233,14 +234,14 @@ const FSEventsBackend = struct { const kFSEventStreamEventFlagItemRenamed: u32 = 0x00000800; const kFSEventStreamEventFlagItemModified: u32 = 0x00001000; const kFSEventStreamEventIdSinceNow: u64 = 0xFFFFFFFFFFFFFFFF; + const kCFStringEncodingUTF8: u32 = 0x08000100; - // CoreFoundation / CoreServices extern declarations const cf = struct { pub extern "c" fn CFStringCreateWithBytesNoCopy( alloc: ?*anyopaque, bytes: [*]const u8, numBytes: isize, - encoding: u32, // kCFStringEncodingUTF8 = 0x08000100 + encoding: u32, isExternalRepresentation: u8, contentsDeallocator: ?*anyopaque, ) ?*anyopaque; @@ -251,10 +252,6 @@ const FSEventsBackend = struct { callBacks: ?*anyopaque, ) ?*anyopaque; pub extern "c" fn CFRelease(cf: *anyopaque) void; - pub extern "c" fn CFRunLoopGetCurrent() *anyopaque; - pub extern "c" fn CFRunLoopRun() void; - pub extern "c" fn CFRunLoopStop(rl: *anyopaque) void; - pub extern "c" fn CFRunLoopAddSource(rl: *anyopaque, source: *anyopaque, mode: *anyopaque) void; pub extern "c" fn FSEventStreamCreate( allocator: ?*anyopaque, callback: *const anyopaque, @@ -264,61 +261,100 @@ const FSEventsBackend = struct { latency: f64, flags: u32, ) ?*anyopaque; - pub extern "c" fn FSEventStreamSchedule(stream: *anyopaque, runLoop: *anyopaque, runLoopMode: *anyopaque) void; + pub extern "c" fn FSEventStreamSetDispatchQueue(stream: *anyopaque, queue: *anyopaque) void; pub extern "c" fn FSEventStreamStart(stream: *anyopaque) u8; pub extern "c" fn FSEventStreamStop(stream: *anyopaque) void; pub extern "c" fn FSEventStreamInvalidate(stream: *anyopaque) void; pub extern "c" fn FSEventStreamRelease(stream: *anyopaque) void; - // kCFRunLoopDefaultMode, a well-known constant pointer exported by CoreFoundation - pub extern "c" var kCFRunLoopDefaultMode: *anyopaque; - pub extern "c" var kCFAllocatorDefault: *anyopaque; + pub extern "c" fn dispatch_queue_create(label: [*:0]const u8, attr: ?*anyopaque) *anyopaque; + pub extern "c" fn dispatch_release(obj: *anyopaque) void; pub extern "c" var kCFAllocatorNull: *anyopaque; }; - const kCFStringEncodingUTF8: u32 = 0x08000100; - - // Context passed to the FSEvents callback via the thread's stack. const CallbackContext = struct { parent: tp.pid, }; fn init() error{}!@This() { - return .{ - .thread = null, - .run_loop = null, - .stream = null, - .watches = .empty, - .mutex = .{}, - }; + return .{ .stream = null, .queue = null, .ctx = null, .watches = .empty }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - // Stop the run loop, which causes the thread to exit. - self.mutex.lock(); - const rl = self.run_loop; - self.mutex.unlock(); - if (rl) |r| cf.CFRunLoopStop(r); - if (self.thread) |t| t.join(); - // Stream is cleaned up by the thread before it exits. + if (self.stream) |s| { + cf.FSEventStreamStop(s); + cf.FSEventStreamInvalidate(s); + cf.FSEventStreamRelease(s); + self.stream = null; + } + if (self.queue) |q| { + cf.dispatch_release(q); + self.queue = null; + } + if (self.ctx) |c| { + c.parent.deinit(); + allocator.destroy(c); + self.ctx = null; + } var it = self.watches.iterator(); while (it.next()) |entry| allocator.free(entry.key_ptr.*); self.watches.deinit(allocator); } - fn arm(self: *@This(), parent: tp.pid) std.Thread.SpawnError!void { + fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) error{OutOfMemory}!void { errdefer parent.deinit(); - if (self.thread != null) return; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self, parent }); - } + if (self.stream != null) return; - const FSEventStreamCallback = *const fn ( - stream: *anyopaque, - info: ?*anyopaque, - numEvents: usize, - eventPaths: *anyopaque, - eventFlags: [*]const u32, - eventIds: [*]const u64, - ) callconv(.c) void; + var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty; + defer cf_strings.deinit(allocator); + var it = self.watches.iterator(); + while (it.next()) |entry| { + const path = entry.key_ptr.*; + const s = cf.CFStringCreateWithBytesNoCopy( + null, + path.ptr, + @intCast(path.len), + kCFStringEncodingUTF8, + 0, + cf.kCFAllocatorNull, + ) orelse continue; + cf_strings.append(allocator, s) catch { + cf.CFRelease(s); + break; + }; + } + defer for (cf_strings.items) |s| cf.CFRelease(s.?); + + const paths_array = cf.CFArrayCreate( + null, + cf_strings.items.ptr, + @intCast(cf_strings.items.len), + null, + ) orelse return; + defer cf.CFRelease(paths_array); + + const ctx = try allocator.create(CallbackContext); + errdefer allocator.destroy(ctx); + ctx.* = .{ .parent = parent }; + + const stream = cf.FSEventStreamCreate( + null, + @ptrCast(&callback), + ctx, + paths_array, + kFSEventStreamEventIdSinceNow, + 0.1, + kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents, + ) orelse return; + errdefer cf.FSEventStreamRelease(stream); + + const queue = cf.dispatch_queue_create("flow.file_watcher", null); + cf.FSEventStreamSetDispatchQueue(stream, queue); + _ = cf.FSEventStreamStart(stream); + + self.stream = stream; + self.queue = queue; + self.ctx = ctx; + } fn callback( _: *anyopaque, @@ -347,61 +383,13 @@ const FSEventsBackend = struct { } } - fn thread_fn(self: *@This(), parent: tp.pid) void { - var ctx = CallbackContext{ .parent = parent }; - defer ctx.parent.deinit(); - - // Build the CFArray of paths to watch. - var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty; - defer cf_strings.deinit(std.heap.c_allocator); - var it = self.watches.iterator(); - while (it.next()) |entry| { - const path = entry.key_ptr.*; - const s = cf.CFStringCreateWithBytesNoCopy(null, path.ptr, @intCast(path.len), kCFStringEncodingUTF8, 0, cf.kCFAllocatorNull) orelse continue; - cf_strings.append(std.heap.c_allocator, s) catch { - cf.CFRelease(s); - break; - }; - } - defer for (cf_strings.items) |s| cf.CFRelease(s.?); - - const paths_array = cf.CFArrayCreate(null, cf_strings.items.ptr, @intCast(cf_strings.items.len), null) orelse return; - defer cf.CFRelease(paths_array); - - const stream = cf.FSEventStreamCreate( - null, - @ptrCast(&callback), - &ctx, - paths_array, - kFSEventStreamEventIdSinceNow, - 0.1, // 100ms latency - kFSEventStreamCreateFlagNoDefer | kFSEventStreamCreateFlagFileEvents, - ) orelse return; - defer { - cf.FSEventStreamStop(stream); - cf.FSEventStreamInvalidate(stream); - cf.FSEventStreamRelease(stream); - } - - const rl = cf.CFRunLoopGetCurrent(); - cf.FSEventStreamSchedule(stream, rl, cf.kCFRunLoopDefaultMode); - _ = cf.FSEventStreamStart(stream); - - // Publish the run loop reference so deinit() can stop it. - self.mutex.lock(); - self.run_loop = rl; - self.mutex.unlock(); - - cf.CFRunLoopRun(); // blocks until CFRunLoopStop is called - } - fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{OutOfMemory}!void { if (self.watches.contains(path)) return; const owned = try allocator.dupe(u8, path); errdefer allocator.free(owned); try self.watches.put(allocator, owned, {}); - // Note: watches added after arm() take effect on the next restart. - // In practice, all watches are added before the first open() call. + // Watches added after arm() take effect on the next restart. + // In practice all watches are added before arm() is called. } fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {