refactor: re-work windows watcher backend to direct send notifications
This commit is contained in:
parent
6bcbc70e6b
commit
8526838230
1 changed files with 61 additions and 54 deletions
|
|
@ -659,6 +659,7 @@ const WindowsBackend = struct {
|
||||||
iocp: windows.HANDLE,
|
iocp: windows.HANDLE,
|
||||||
thread: ?std.Thread,
|
thread: ?std.Thread,
|
||||||
watches: std.StringHashMapUnmanaged(Watch),
|
watches: std.StringHashMapUnmanaged(Watch),
|
||||||
|
watches_mutex: std.Thread.Mutex,
|
||||||
|
|
||||||
// A completion key of zero is used to signal the background thread to exit.
|
// A completion key of zero is used to signal the background thread to exit.
|
||||||
const SHUTDOWN_KEY: windows.ULONG_PTR = 0;
|
const SHUTDOWN_KEY: windows.ULONG_PTR = 0;
|
||||||
|
|
@ -695,7 +696,7 @@ const WindowsBackend = struct {
|
||||||
|
|
||||||
fn init() windows.CreateIoCompletionPortError!@This() {
|
fn init() windows.CreateIoCompletionPortError!@This() {
|
||||||
const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1);
|
const iocp = try windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1);
|
||||||
return .{ .iocp = iocp, .thread = null, .watches = .empty };
|
return .{ .iocp = iocp, .thread = null, .watches = .empty, .watches_mutex = .{} };
|
||||||
}
|
}
|
||||||
|
|
||||||
fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
|
fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
|
||||||
|
|
@ -712,13 +713,19 @@ const WindowsBackend = struct {
|
||||||
_ = win32.CloseHandle(self.iocp);
|
_ = win32.CloseHandle(self.iocp);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
|
fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
|
||||||
|
_ = allocator;
|
||||||
errdefer parent.deinit();
|
errdefer parent.deinit();
|
||||||
if (self.thread != null) return error.AlreadyArmed;
|
if (self.thread != null) return error.AlreadyArmed;
|
||||||
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent });
|
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, &self.watches, &self.watches_mutex, parent });
|
||||||
}
|
}
|
||||||
|
|
||||||
fn thread_fn(iocp: windows.HANDLE, parent: tp.pid) void {
|
fn thread_fn(
|
||||||
|
iocp: windows.HANDLE,
|
||||||
|
watches: *std.StringHashMapUnmanaged(Watch),
|
||||||
|
watches_mutex: *std.Thread.Mutex,
|
||||||
|
parent: tp.pid,
|
||||||
|
) void {
|
||||||
defer parent.deinit();
|
defer parent.deinit();
|
||||||
var bytes: windows.DWORD = 0;
|
var bytes: windows.DWORD = 0;
|
||||||
var key: windows.ULONG_PTR = 0;
|
var key: windows.ULONG_PTR = 0;
|
||||||
|
|
@ -727,7 +734,52 @@ const WindowsBackend = struct {
|
||||||
// Block indefinitely until IOCP has a completion or shutdown signal.
|
// Block indefinitely until IOCP has a completion or shutdown signal.
|
||||||
const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE);
|
const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE);
|
||||||
if (ok == 0 or key == SHUTDOWN_KEY) return;
|
if (ok == 0 or key == SHUTDOWN_KEY) return;
|
||||||
parent.send(.{"FW_event"}) catch return;
|
const triggered_handle: windows.HANDLE = @ptrFromInt(key);
|
||||||
|
watches_mutex.lock();
|
||||||
|
var it = watches.iterator();
|
||||||
|
while (it.next()) |entry| {
|
||||||
|
const w = entry.value_ptr;
|
||||||
|
if (w.handle != triggered_handle) continue;
|
||||||
|
if (bytes > 0) {
|
||||||
|
var offset: usize = 0;
|
||||||
|
while (offset < bytes) {
|
||||||
|
const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr));
|
||||||
|
const name_wchars = (&info.FileName).ptr[0 .. info.FileNameLength / 2];
|
||||||
|
var name_buf: [std.fs.max_path_bytes]u8 = undefined;
|
||||||
|
const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0;
|
||||||
|
const event_type: EventType = switch (info.Action) {
|
||||||
|
FILE_ACTION_ADDED => .created,
|
||||||
|
FILE_ACTION_REMOVED => .deleted,
|
||||||
|
FILE_ACTION_MODIFIED => .modified,
|
||||||
|
FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed,
|
||||||
|
else => {
|
||||||
|
if (info.NextEntryOffset == 0) break;
|
||||||
|
offset += info.NextEntryOffset;
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
var full_buf: [std.fs.max_path_bytes]u8 = undefined;
|
||||||
|
const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch {
|
||||||
|
if (info.NextEntryOffset == 0) break;
|
||||||
|
offset += info.NextEntryOffset;
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
watches_mutex.unlock();
|
||||||
|
parent.send(.{ "FW", "change", full_path, event_type }) catch {
|
||||||
|
watches_mutex.lock();
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
watches_mutex.lock();
|
||||||
|
if (info.NextEntryOffset == 0) break;
|
||||||
|
offset += info.NextEntryOffset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Re-arm ReadDirectoryChangesW for the next batch.
|
||||||
|
w.overlapped = std.mem.zeroes(windows.OVERLAPPED);
|
||||||
|
_ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
watches_mutex.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -737,6 +789,8 @@ const WindowsBackend = struct {
|
||||||
FileWatcherInvalidHandle,
|
FileWatcherInvalidHandle,
|
||||||
FileWatcherReadDirectoryChangesFailed,
|
FileWatcherReadDirectoryChangesFailed,
|
||||||
})!void {
|
})!void {
|
||||||
|
self.watches_mutex.lock();
|
||||||
|
defer self.watches_mutex.unlock();
|
||||||
if (self.watches.contains(path)) return;
|
if (self.watches.contains(path)) return;
|
||||||
const path_w = try std.unicode.utf8ToUtf16LeAllocZ(allocator, path);
|
const path_w = try std.unicode.utf8ToUtf16LeAllocZ(allocator, path);
|
||||||
defer allocator.free(path_w);
|
defer allocator.free(path_w);
|
||||||
|
|
@ -768,59 +822,14 @@ const WindowsBackend = struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
|
fn remove_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) void {
|
||||||
|
self.watches_mutex.lock();
|
||||||
|
defer self.watches_mutex.unlock();
|
||||||
if (self.watches.fetchRemove(path)) |entry| {
|
if (self.watches.fetchRemove(path)) |entry| {
|
||||||
_ = win32.CloseHandle(entry.value.handle);
|
_ = win32.CloseHandle(entry.value.handle);
|
||||||
allocator.free(entry.value.path);
|
allocator.free(entry.value.path);
|
||||||
allocator.free(entry.value.buf);
|
allocator.free(entry.value.buf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_fw_event(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) !void {
|
|
||||||
_ = allocator;
|
|
||||||
var bytes: windows.DWORD = 0;
|
|
||||||
var key: windows.ULONG_PTR = 0;
|
|
||||||
var overlapped_ptr: ?*windows.OVERLAPPED = null;
|
|
||||||
while (true) {
|
|
||||||
// Non-blocking drain, the blocking wait is done in the background thread.
|
|
||||||
const ok = win32.GetQueuedCompletionStatus(self.iocp, &bytes, &key, &overlapped_ptr, 0);
|
|
||||||
if (ok == 0 or overlapped_ptr == null or key == SHUTDOWN_KEY) break;
|
|
||||||
const triggered_handle: windows.HANDLE = @ptrFromInt(key);
|
|
||||||
var it = self.watches.iterator();
|
|
||||||
while (it.next()) |entry| {
|
|
||||||
const w = entry.value_ptr;
|
|
||||||
if (w.handle != triggered_handle) continue;
|
|
||||||
if (bytes > 0) {
|
|
||||||
var offset: usize = 0;
|
|
||||||
while (offset < bytes) {
|
|
||||||
const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr));
|
|
||||||
const name_wchars = (&info.FileName).ptr[0 .. info.FileNameLength / 2];
|
|
||||||
var name_buf: [std.fs.max_path_bytes]u8 = undefined;
|
|
||||||
const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0;
|
|
||||||
const event_type: EventType = switch (info.Action) {
|
|
||||||
FILE_ACTION_ADDED => .created,
|
|
||||||
FILE_ACTION_REMOVED => .deleted,
|
|
||||||
FILE_ACTION_MODIFIED => .modified,
|
|
||||||
FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed,
|
|
||||||
else => {
|
|
||||||
if (info.NextEntryOffset == 0) break;
|
|
||||||
offset += info.NextEntryOffset;
|
|
||||||
continue;
|
|
||||||
},
|
|
||||||
};
|
|
||||||
var full_buf: [std.fs.max_path_bytes]u8 = undefined;
|
|
||||||
const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch continue;
|
|
||||||
try parent.send(.{ "FW", "change", full_path, event_type });
|
|
||||||
if (info.NextEntryOffset == 0) break;
|
|
||||||
offset += info.NextEntryOffset;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Re-arm ReadDirectoryChangesW for the next batch.
|
|
||||||
w.overlapped = std.mem.zeroes(windows.OVERLAPPED);
|
|
||||||
_ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const Process = struct {
|
const Process = struct {
|
||||||
|
|
@ -883,8 +892,6 @@ const Process = struct {
|
||||||
self.backend.handle_read_ready(self.allocator, self.parent.ref()) catch |e| self.logger.err("handle_read_ready", e);
|
self.backend.handle_read_ready(self.allocator, self.parent.ref()) catch |e| self.logger.err("handle_read_ready", e);
|
||||||
} else if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) {
|
} else if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) {
|
||||||
self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg });
|
self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg });
|
||||||
} else if (@hasDecl(Backend, "handle_fw_event") and try cbor.match(m.buf, .{"FW_event"})) {
|
|
||||||
self.backend.handle_fw_event(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e);
|
|
||||||
} else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) {
|
} else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) {
|
||||||
self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e);
|
self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e);
|
||||||
} else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) {
|
} else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue