diff --git a/src/file_watcher.zig b/src/file_watcher.zig index bf287b4..8681754 100644 --- a/src/file_watcher.zig +++ b/src/file_watcher.zig @@ -659,6 +659,7 @@ const WindowsBackend = struct { iocp: windows.HANDLE, thread: ?std.Thread, watches: std.StringHashMapUnmanaged(Watch), + watches_mutex: std.Thread.Mutex, // A completion key of zero is used to signal the background thread to exit. const SHUTDOWN_KEY: windows.ULONG_PTR = 0; @@ -695,7 +696,7 @@ const WindowsBackend = struct { fn init() windows.CreateIoCompletionPortError!@This() { const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1); - return .{ .iocp = iocp, .thread = null, .watches = .empty }; + return .{ .iocp = iocp, .thread = null, .watches = .empty, .watches_mutex = .{} }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { @@ -712,13 +713,19 @@ const WindowsBackend = struct { _ = win32.CloseHandle(self.iocp); } - fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + _ = allocator; errdefer parent.deinit(); if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, &self.watches, &self.watches_mutex, parent }); } - fn thread_fn(iocp: windows.HANDLE, parent: tp.pid) void { + fn thread_fn( + iocp: windows.HANDLE, + watches: *std.StringHashMapUnmanaged(Watch), + watches_mutex: *std.Thread.Mutex, + parent: tp.pid, + ) void { defer parent.deinit(); var bytes: windows.DWORD = 0; var key: windows.ULONG_PTR = 0; @@ -727,7 +734,52 @@ const WindowsBackend = struct { // Block indefinitely until IOCP has a completion or shutdown signal. const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE); if (ok == 0 or key == SHUTDOWN_KEY) return; - parent.send(.{"FW_event"}) catch return; + const triggered_handle: windows.HANDLE = @ptrFromInt(key); + watches_mutex.lock(); + var it = watches.iterator(); + while (it.next()) |entry| { + const w = entry.value_ptr; + if (w.handle != triggered_handle) continue; + if (bytes > 0) { + var offset: usize = 0; + while (offset < bytes) { + const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr)); + const name_wchars = (&info.FileName).ptr[0 .. info.FileNameLength / 2]; + var name_buf: [std.fs.max_path_bytes]u8 = undefined; + const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0; + const event_type: EventType = switch (info.Action) { + FILE_ACTION_ADDED => .created, + FILE_ACTION_REMOVED => .deleted, + FILE_ACTION_MODIFIED => .modified, + FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed, + else => { + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + continue; + }, + }; + var full_buf: [std.fs.max_path_bytes]u8 = undefined; + const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch { + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + continue; + }; + watches_mutex.unlock(); + parent.send(.{ "FW", "change", full_path, event_type }) catch { + watches_mutex.lock(); + break; + }; + watches_mutex.lock(); + if (info.NextEntryOffset == 0) break; + offset += info.NextEntryOffset; + } + } + // Re-arm ReadDirectoryChangesW for the next batch. + w.overlapped = std.mem.zeroes(windows.OVERLAPPED); + _ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null); + break; + } + watches_mutex.unlock(); } } @@ -737,6 +789,8 @@ const WindowsBackend = struct { FileWatcherInvalidHandle, FileWatcherReadDirectoryChangesFailed, })!void { + self.watches_mutex.lock(); + defer self.watches_mutex.unlock(); if (self.watches.contains(path)) return; const path_w = try std.unicode.utf8ToUtf16LeAllocZ(allocator, path); defer allocator.free(path_w); @@ -768,59 +822,14 @@ const WindowsBackend = struct { } fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void { + self.watches_mutex.lock(); + defer self.watches_mutex.unlock(); if (self.watches.fetchRemove(path)) |entry| { _ = win32.CloseHandle(entry.value.handle); allocator.free(entry.value.path); allocator.free(entry.value.buf); } } - - fn handle_fw_event(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) !void { - _ = allocator; - var bytes: windows.DWORD = 0; - var key: windows.ULONG_PTR = 0; - var overlapped_ptr: ?*windows.OVERLAPPED = null; - while (true) { - // Non-blocking drain, the blocking wait is done in the background thread. - const ok = win32.GetQueuedCompletionStatus(self.iocp, &bytes, &key, &overlapped_ptr, 0); - if (ok == 0 or overlapped_ptr == null or key == SHUTDOWN_KEY) break; - const triggered_handle: windows.HANDLE = @ptrFromInt(key); - var it = self.watches.iterator(); - while (it.next()) |entry| { - const w = entry.value_ptr; - if (w.handle != triggered_handle) continue; - if (bytes > 0) { - var offset: usize = 0; - while (offset < bytes) { - const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr)); - const name_wchars = (&info.FileName).ptr[0 .. info.FileNameLength / 2]; - var name_buf: [std.fs.max_path_bytes]u8 = undefined; - const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0; - const event_type: EventType = switch (info.Action) { - FILE_ACTION_ADDED => .created, - FILE_ACTION_REMOVED => .deleted, - FILE_ACTION_MODIFIED => .modified, - FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed, - else => { - if (info.NextEntryOffset == 0) break; - offset += info.NextEntryOffset; - continue; - }, - }; - var full_buf: [std.fs.max_path_bytes]u8 = undefined; - const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch continue; - try parent.send(.{ "FW", "change", full_path, event_type }); - if (info.NextEntryOffset == 0) break; - offset += info.NextEntryOffset; - } - } - // Re-arm ReadDirectoryChangesW for the next batch. - w.overlapped = std.mem.zeroes(windows.OVERLAPPED); - _ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null); - break; - } - } - } }; const Process = struct { @@ -883,8 +892,6 @@ const Process = struct { self.backend.handle_read_ready(self.allocator, self.parent.ref()) catch |e| self.logger.err("handle_read_ready", e); } else if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) { self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg }); - } else if (@hasDecl(Backend, "handle_fw_event") and try cbor.match(m.buf, .{"FW_event"})) { - self.backend.handle_fw_event(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e); } else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) { self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e); } else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) {