refactor: switch to blocking thread instead of watcher polling
This commit is contained in:
parent
e9e999c536
commit
54e047f63c
1 changed files with 90 additions and 27 deletions
|
|
@ -73,7 +73,7 @@ const INotifyBackend = struct {
|
|||
fd_watcher: tp.file_descriptor,
|
||||
watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path
|
||||
|
||||
const polling = false;
|
||||
const threaded = false;
|
||||
|
||||
const IN = std.os.linux.IN;
|
||||
|
||||
|
|
@ -98,8 +98,9 @@ const INotifyBackend = struct {
|
|||
std.posix.close(self.inotify_fd);
|
||||
}
|
||||
|
||||
fn arm(self: *@This()) void {
|
||||
self.fd_watcher.wait_read() catch {};
|
||||
fn arm(self: *@This(), parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void {
|
||||
parent.deinit();
|
||||
try self.fd_watcher.wait_read();
|
||||
}
|
||||
|
||||
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void {
|
||||
|
|
@ -217,16 +218,18 @@ const INotifyBackend = struct {
|
|||
|
||||
const KQueueBackend = struct {
|
||||
kq: std.posix.fd_t,
|
||||
poll_timer: ?tp.timeout,
|
||||
shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
|
||||
thread: ?std.Thread,
|
||||
watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd
|
||||
|
||||
const polling = true;
|
||||
const poll_interval_ms: u64 = 50;
|
||||
const threaded = true;
|
||||
|
||||
const EVFILT_VNODE: i16 = -4;
|
||||
const EVFILT_READ: i16 = -1;
|
||||
const EV_ADD: u16 = 0x0001;
|
||||
const EV_ENABLE: u16 = 0x0004;
|
||||
const EV_CLEAR: u16 = 0x0020;
|
||||
const EV_DELETE: u16 = 0x0002;
|
||||
const NOTE_WRITE: u32 = 0x00000002;
|
||||
const NOTE_DELETE: u32 = 0x00000004;
|
||||
const NOTE_RENAME: u32 = 0x00000020;
|
||||
|
|
@ -235,11 +238,32 @@ const KQueueBackend = struct {
|
|||
|
||||
fn init() !@This() {
|
||||
const kq = std.posix.kqueue() catch return error.FileWatcherFailed;
|
||||
return .{ .kq = kq, .poll_timer = null, .watches = .empty };
|
||||
errdefer std.posix.close(kq);
|
||||
const pipe = std.posix.pipe() catch return error.FileWatcherFailed;
|
||||
errdefer {
|
||||
std.posix.close(pipe[0]);
|
||||
std.posix.close(pipe[1]);
|
||||
}
|
||||
// Register the read end of the shutdown pipe with kqueue so the thread
|
||||
// wakes up when we want to shut down.
|
||||
const shutdown_kev = std.posix.Kevent{
|
||||
.ident = @intCast(pipe[0]),
|
||||
.filter = EVFILT_READ,
|
||||
.flags = EV_ADD | EV_ENABLE,
|
||||
.fflags = 0,
|
||||
.data = 0,
|
||||
.udata = 0,
|
||||
};
|
||||
_ = std.posix.kevent(kq, &.{shutdown_kev}, &.{}, null) catch return error.FileWatcherFailed;
|
||||
return .{ .kq = kq, .shutdown_pipe = pipe, .thread = null, .watches = .empty };
|
||||
}
|
||||
|
||||
fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
|
||||
if (self.poll_timer) |*t| t.deinit();
|
||||
// Signal the thread to exit by writing to the shutdown pipe.
|
||||
_ = std.posix.write(self.shutdown_pipe[1], &[_]u8{0}) catch {};
|
||||
if (self.thread) |t| t.join();
|
||||
std.posix.close(self.shutdown_pipe[0]);
|
||||
std.posix.close(self.shutdown_pipe[1]);
|
||||
var it = self.watches.iterator();
|
||||
while (it.next()) |entry| {
|
||||
std.posix.close(entry.value_ptr.*);
|
||||
|
|
@ -249,9 +273,26 @@ const KQueueBackend = struct {
|
|||
std.posix.close(self.kq);
|
||||
}
|
||||
|
||||
fn arm(self: *@This()) void {
|
||||
if (self.poll_timer) |*t| t.deinit();
|
||||
self.poll_timer = tp.timeout.init_ms(poll_interval_ms, tp.message.fmt(.{"FW_poll"})) catch null;
|
||||
fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
|
||||
errdefer parent.deinit();
|
||||
if (self.thread != null) return error.AlreadyArmed;
|
||||
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, parent });
|
||||
}
|
||||
|
||||
fn thread_fn(kq: std.posix.fd_t, parent: tp.pid) void {
|
||||
defer parent.deinit();
|
||||
var events: [64]std.posix.Kevent = undefined;
|
||||
while (true) {
|
||||
// Block indefinitely until kqueue has events.
|
||||
const n = std.posix.kevent(kq, &.{}, &events, null) catch break;
|
||||
var has_vnode_events = false;
|
||||
for (events[0..n]) |ev| {
|
||||
if (ev.filter == EVFILT_READ) return; // shutdown pipe readable, exit
|
||||
if (ev.filter == EVFILT_VNODE) has_vnode_events = true;
|
||||
}
|
||||
if (has_vnode_events)
|
||||
parent.send(.{"FW_event"}) catch break;
|
||||
}
|
||||
}
|
||||
|
||||
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void {
|
||||
|
|
@ -285,6 +326,7 @@ const KQueueBackend = struct {
|
|||
const immediate: std.posix.timespec = .{ .sec = 0, .nsec = 0 };
|
||||
const n = std.posix.kevent(self.kq, &.{}, &events, &immediate) catch return;
|
||||
for (events[0..n]) |ev| {
|
||||
if (ev.filter != EVFILT_VNODE) continue;
|
||||
var it = self.watches.iterator();
|
||||
while (it.next()) |entry| {
|
||||
if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue;
|
||||
|
|
@ -304,7 +346,7 @@ const KQueueBackend = struct {
|
|||
};
|
||||
|
||||
const WindowsBackend = struct {
|
||||
const polling = true;
|
||||
const threaded = true;
|
||||
const windows = std.os.windows;
|
||||
|
||||
const win32 = struct {
|
||||
|
|
@ -335,13 +377,20 @@ const WindowsBackend = struct {
|
|||
dwFlagsAndAttributes: windows.DWORD,
|
||||
hTemplateFile: ?windows.HANDLE,
|
||||
) callconv(.winapi) windows.HANDLE;
|
||||
pub extern "kernel32" fn PostQueuedCompletionStatus(
|
||||
CompletionPort: windows.HANDLE,
|
||||
dwNumberOfBytesTransferred: windows.DWORD,
|
||||
dwCompletionKey: windows.ULONG_PTR,
|
||||
lpOverlapped: ?*windows.OVERLAPPED,
|
||||
) callconv(.winapi) windows.BOOL;
|
||||
};
|
||||
|
||||
iocp: windows.HANDLE,
|
||||
poll_timer: ?tp.timeout,
|
||||
thread: ?std.Thread,
|
||||
watches: std.StringHashMapUnmanaged(Watch),
|
||||
|
||||
const poll_interval_ms: u64 = 50;
|
||||
// A completion key of zero is used to signal the background thread to exit.
|
||||
const SHUTDOWN_KEY: windows.ULONG_PTR = 0;
|
||||
|
||||
const Watch = struct {
|
||||
handle: windows.HANDLE,
|
||||
|
|
@ -376,11 +425,13 @@ const WindowsBackend = struct {
|
|||
fn init() !@This() {
|
||||
const iocp = windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1) catch
|
||||
return error.FileWatcherFailed;
|
||||
return .{ .iocp = iocp, .poll_timer = null, .watches = .empty };
|
||||
return .{ .iocp = iocp, .thread = null, .watches = .empty };
|
||||
}
|
||||
|
||||
fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
|
||||
if (self.poll_timer) |*t| t.deinit();
|
||||
// Wake the background thread with a shutdown key, then wait for it.
|
||||
_ = win32.PostQueuedCompletionStatus(self.iocp, 0, SHUTDOWN_KEY, null);
|
||||
if (self.thread) |t| t.join();
|
||||
var it = self.watches.iterator();
|
||||
while (it.next()) |entry| {
|
||||
_ = win32.CloseHandle(entry.value_ptr.*.handle);
|
||||
|
|
@ -391,9 +442,23 @@ const WindowsBackend = struct {
|
|||
_ = win32.CloseHandle(self.iocp);
|
||||
}
|
||||
|
||||
fn arm(self: *@This()) void {
|
||||
if (self.poll_timer) |*t| t.deinit();
|
||||
self.poll_timer = tp.timeout.init_ms(poll_interval_ms, tp.message.fmt(.{"FW_poll"})) catch null;
|
||||
fn arm(self: *@This(), parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
|
||||
errdefer parent.deinit();
|
||||
if (self.thread != null) return error.AlreadyArmed;
|
||||
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, parent });
|
||||
}
|
||||
|
||||
fn thread_fn(iocp: windows.HANDLE, parent: tp.pid) void {
|
||||
defer parent.deinit();
|
||||
var bytes: windows.DWORD = 0;
|
||||
var key: windows.ULONG_PTR = 0;
|
||||
var overlapped_ptr: ?*windows.OVERLAPPED = null;
|
||||
while (true) {
|
||||
// Block indefinitely until IOCP has a completion or shutdown signal.
|
||||
const ok = win32.GetQueuedCompletionStatus(iocp, &bytes, &key, &overlapped_ptr, windows.INFINITE);
|
||||
if (ok == 0 or key == SHUTDOWN_KEY) return;
|
||||
parent.send(.{"FW_event"}) catch return;
|
||||
}
|
||||
}
|
||||
|
||||
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) !void {
|
||||
|
|
@ -441,8 +506,9 @@ const WindowsBackend = struct {
|
|||
var key: windows.ULONG_PTR = 0;
|
||||
var overlapped_ptr: ?*windows.OVERLAPPED = null;
|
||||
while (true) {
|
||||
// Non-blocking drain, the blocking wait is done in the background thread.
|
||||
const ok = win32.GetQueuedCompletionStatus(self.iocp, &bytes, &key, &overlapped_ptr, 0);
|
||||
if (ok == 0 or overlapped_ptr == null) break;
|
||||
if (ok == 0 or overlapped_ptr == null or key == SHUTDOWN_KEY) break;
|
||||
const triggered_handle: windows.HANDLE = @ptrFromInt(key);
|
||||
var it = self.watches.iterator();
|
||||
while (it.next()) |entry| {
|
||||
|
|
@ -516,7 +582,7 @@ const Process = struct {
|
|||
errdefer self.deinit();
|
||||
_ = tp.set_trap(true);
|
||||
self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace());
|
||||
self.backend.arm();
|
||||
self.backend.arm(tp.self_pid().clone()) catch |e| return tp.exit_error(e, @errorReturnTrace());
|
||||
tp.receive(&self.receiver);
|
||||
}
|
||||
|
||||
|
|
@ -538,15 +604,12 @@ const Process = struct {
|
|||
var err_code: i64 = 0;
|
||||
var err_msg: []const u8 = undefined;
|
||||
|
||||
if (!Backend.polling and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) {
|
||||
if (!Backend.threaded and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) {
|
||||
self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e);
|
||||
self.backend.arm();
|
||||
} else if (!Backend.polling and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) {
|
||||
} else if (!Backend.threaded and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) {
|
||||
self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg });
|
||||
self.backend.arm();
|
||||
} else if (Backend.polling and try cbor.match(m.buf, .{"FW_poll"})) {
|
||||
} else if (Backend.threaded and try cbor.match(m.buf, .{"FW_event"})) {
|
||||
self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e);
|
||||
self.backend.arm();
|
||||
} else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) {
|
||||
self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e);
|
||||
} else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue