diff --git a/src/nightwatch.zig b/src/nightwatch.zig index 0ef9e52..748411e 100644 --- a/src/nightwatch.zig +++ b/src/nightwatch.zig @@ -1,71 +1,82 @@ const std = @import("std"); -const tp = @import("thespian"); -const cbor = @import("cbor"); -const log = @import("log"); const builtin = @import("builtin"); -pid: tp.pid_ref, - -const Self = @This(); -const module_name = @typeName(Self); - pub const EventType = enum { created, modified, deleted, - /// A new directory was created inside a watched directory. The receiver - /// should call watch() on the path to get events for files created in it. + /// A new directory was created inside a watched directory. The + /// receiver should call watch() on the path to get events for files + /// created in it. dir_created, /// Only produced on macOS and Windows where the OS gives no pairing info. /// On Linux, paired renames are emitted as a { "FW", "rename", from, to } message instead. renamed, }; -pub const Error = FileWatcherError; -pub const FileWatcherError = error{ - FileWatcherSendFailed, - ThespianSpawnFailed, +pub const Error = error{ + HandlerFailed, + SpawnFailed, OutOfMemory, }; -const SpawnError = error{ OutOfMemory, ThespianSpawnFailed }; +const SpawnError = error{ OutOfMemory, SpawnFailed }; -/// Watch a path (file or directory) for changes. The caller will receive: -/// .{ "FW", "change", path, event_type } -/// where event_type is a file_watcher.EventType tag string: "created", "modified", "deleted", "renamed" -/// On Linux, paired renames produce: .{ "FW", "rename", from_path, to_path } -pub fn watch(path: []const u8) FileWatcherError!void { - return send(.{ "watch", path }); +pub const Handler = struct { + vtable: *const VTable, + + pub const VTable = struct { + change: *const fn (handler: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void, + rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8) error{HandlerFailed}!void, + wait_readable: if (builtin.os.tag == .linux) *const fn (handler: *Handler) error{HandlerFailed}!ReadableStatus else void, + }; + + fn change(handler: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!ReadableStatus { + return handler.vtable.change(handler, path, event_type); + } + + fn rename(handler: *Handler, src_path: []const u8, dst_path: []const u8) error{HandlerFailed}!ReadableStatus { + return handler.vtable.rename(handler, src_path, dst_path); + } + + fn wait_readable(handler: *Handler) error{HandlerFailed}!ReadableStatus { + return handler.vtable.wait_readable(handler); + } +}; + +pub const ReadableStatus = enum { + is_readable, // backend may now read from fd + will_notify, // backend must wait for a handle_read_ready call +}; + +allocator: std.mem.Allocator, +backend: Backend, + +pub fn init(allocator: std.mem.Allocator, handler: *Handler) !@This() { + var self: @This() = .{ + .allocator = allocator, + .backend = try Backend.init(handler), + }; + try self.backend.arm(self.allocator); + return self; } -/// Stop watching a previously watched path. -pub fn unwatch(path: []const u8) FileWatcherError!void { - return send(.{ "unwatch", path }); +pub fn deinit(self: *@This()) void { + self.backend.deinit(self.allocator); } -pub fn start() SpawnError!void { - _ = try get(); +/// Watch a path (file or directory) for changes. The handler will receive +/// `change` and (linux only) `rename` calls +pub fn watch(self: *@This(), path: []const u8) Error!void { + self.backend.add_watch(self.allocator, path) catch |e| std.log.err("nightwatch.watch: {}", .{e}); } -pub fn shutdown() void { - const pid = tp.env.get().proc(module_name); - if (pid.expired()) return; - pid.send(.{"shutdown"}) catch {}; +/// Stop watching a previously watched path +pub fn unwatch(self: *@This(), path: []const u8) Error!void { + self.backend.remove_watch(self.allocator, path); } -fn get() SpawnError!Self { - const pid = tp.env.get().proc(module_name); - return if (pid.expired()) create() else .{ .pid = pid }; -} - -fn send(message: anytype) FileWatcherError!void { - return (try get()).pid.send(message) catch error.FileWatcherSendFailed; -} - -fn create() SpawnError!Self { - const pid = try Process.create(); - defer pid.deinit(); - tp.env.get().proc_set(module_name, pid.ref()); - return .{ .pid = tp.env.get().proc(module_name) }; +pub fn handle_read_ready(self: *@This()) !void { + try self.backend.handle_read_ready(self.allocator); } const Backend = switch (builtin.os.tag) { @@ -77,8 +88,8 @@ const Backend = switch (builtin.os.tag) { }; const INotifyBackend = struct { + handler: *Handler, inotify_fd: std.posix.fd_t, - fd_watcher: tp.file_descriptor, watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path const IN = std.os.linux.IN; @@ -89,24 +100,23 @@ const INotifyBackend = struct { const in_flags: std.os.linux.O = .{ .NONBLOCK = true }; - fn init() error{ ProcessFdQuotaExceeded, SystemFdQuotaExceeded, SystemResources, Unexpected, ThespianFileDescriptorInitFailed }!@This() { - const ifd = try std.posix.inotify_init1(@bitCast(in_flags)); - errdefer std.posix.close(ifd); - const fwd = try tp.file_descriptor.init(module_name, ifd); - return .{ .inotify_fd = ifd, .fd_watcher = fwd, .watches = .empty }; + fn init(handler: *Handler) error{ ProcessFdQuotaExceeded, SystemFdQuotaExceeded, SystemResources, Unexpected }!@This() { + return .{ + .handler = handler, + .inotify_fd = try std.posix.inotify_init1(@bitCast(in_flags)), + .watches = .empty, + }; } fn deinit(self: *@This(), allocator: std.mem.Allocator) void { - self.fd_watcher.deinit(); var it = self.watches.iterator(); while (it.next()) |entry| allocator.free(entry.value_ptr.*); self.watches.deinit(allocator); std.posix.close(self.inotify_fd); } - fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void { - parent.deinit(); - try self.fd_watcher.wait_read(); + fn arm(self: *@This(), _: std.mem.Allocator) error{HandlerFailed}!void { + return self.handler.wait_readable(); } fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{OutOfMemory}!void { @@ -132,7 +142,7 @@ const INotifyBackend = struct { } } - fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) (std.posix.ReadError || error{ ThespianFileDescriptorWaitReadFailed, NoSpaceLeft, OutOfMemory, Exit })!void { + fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator) (std.posix.ReadError || error{ ThespianFileDescriptorWaitReadFailed, NoSpaceLeft, OutOfMemory, Exit })!void { // re-arm the file_discriptor try self.fd_watcher.wait_read(); @@ -154,7 +164,7 @@ const INotifyBackend = struct { defer { // Any unpaired MOVED_FROM means the file was moved out of the watched tree. for (pending_renames.items) |r| { - parent.send(.{ "FW", "change", r.path, EventType.deleted }) catch {}; // moved outside watched tree + self.handler.vtable.change(r.path, EventType.deleted) catch {}; allocator.free(r.path); } pending_renames.deinit(allocator); @@ -201,14 +211,14 @@ const INotifyBackend = struct { // Complete rename pair: emit a single atomic rename message. const r = pending_renames.swapRemove(i); defer allocator.free(r.path); - try parent.send(.{ "FW", "rename", r.path, full_path }); + try self.handler.vtable.rename(r.path, full_path); } else { // No paired MOVED_FROM, file was moved in from outside the watched tree. - try parent.send(.{ "FW", "change", full_path, EventType.created }); + try self.handler.vtable.change(full_path, EventType.created); } } else if (ev.mask & IN.MOVE_SELF != 0) { // The watched directory itself was renamed/moved away. - try parent.send(.{ "FW", "change", full_path, EventType.deleted }); + try self.handler.vtable.change(full_path, EventType.deleted); } else { const event_type: EventType = if (ev.mask & IN.CREATE != 0) if (ev.mask & IN.ISDIR != 0) .dir_created else .created @@ -218,7 +228,7 @@ const INotifyBackend = struct { .modified else continue; - try parent.send(.{ "FW", "change", full_path, event_type }); + try self.handler.vtable.change(full_path, event_type); } } } @@ -226,6 +236,7 @@ const INotifyBackend = struct { }; const FSEventsBackend = struct { + handler: *Handler, stream: ?*anyopaque, // FSEventStreamRef queue: ?*anyopaque, // dispatch_queue_t ctx: ?*CallbackContext, // heap-allocated, freed after stream is stopped @@ -279,7 +290,7 @@ const FSEventsBackend = struct { }; const CallbackContext = struct { - parent: tp.pid, + handler: *Handler, }; fn init() error{}!@This() { @@ -307,8 +318,7 @@ const FSEventsBackend = struct { self.watches.deinit(allocator); } - fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) error{OutOfMemory}!void { - errdefer parent.deinit(); + fn arm(self: *@This(), allocator: std.mem.Allocator) error{OutOfMemory}!void { if (self.stream != null) return; var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty; @@ -341,7 +351,7 @@ const FSEventsBackend = struct { const ctx = try allocator.create(CallbackContext); errdefer allocator.destroy(ctx); - ctx.* = .{ .parent = parent }; + ctx.* = .{ .handler = self.handler }; const stream = cf.FSEventStreamCreate( null, @@ -405,6 +415,7 @@ const FSEventsBackend = struct { }; const KQueueBackend = struct { + handler: *Handler, kq: std.posix.fd_t, shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread thread: ?std.Thread, @@ -472,10 +483,9 @@ const KQueueBackend = struct { std.posix.close(self.kq); } - fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { - errdefer parent.deinit(); + fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, parent }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, self.handler }); } fn thread_fn( @@ -484,9 +494,8 @@ const KQueueBackend = struct { snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), snapshots_mutex: *std.Thread.Mutex, allocator: std.mem.Allocator, - parent: tp.pid, + handler: *Handler, ) void { - defer parent.deinit(); var events: [64]std.posix.Kevent = undefined; while (true) { // Block indefinitely until kqueue has events. @@ -500,11 +509,11 @@ const KQueueBackend = struct { if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; const dir_path = entry.key_ptr.*; if (ev.fflags & NOTE_DELETE != 0) { - parent.send(.{ "FW", "change", dir_path, EventType.deleted }) catch return; + handler.change(dir_path, EventType.deleted) catch return; } else if (ev.fflags & NOTE_RENAME != 0) { - parent.send(.{ "FW", "change", dir_path, EventType.renamed }) catch return; + handler.change(dir_path, EventType.renamed) catch return; } else if (ev.fflags & NOTE_WRITE != 0) { - scan_dir(dir_path, snapshots, snapshots_mutex, allocator, parent) catch {}; + scan_dir(dir_path, snapshots, snapshots_mutex, allocator, handler) catch {}; } break; } @@ -518,7 +527,7 @@ const KQueueBackend = struct { snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)), snapshots_mutex: *std.Thread.Mutex, allocator: std.mem.Allocator, - parent: tp.pid, + handler: *Handler, ) !void { var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return; defer dir.close(); @@ -547,7 +556,7 @@ const KQueueBackend = struct { const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.name }) catch continue; // Only emit if not already watched. if (!snapshots.contains(full_path)) - try parent.send(.{ "FW", "change", full_path, EventType.dir_created }); + try handler.change(full_path, EventType.dir_created); } snapshots_mutex.lock(); @@ -564,7 +573,7 @@ const KQueueBackend = struct { if (snapshot.contains(entry.key_ptr.*)) continue; var path_buf: [std.fs.max_path_bytes]u8 = undefined; const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.key_ptr.* }) catch continue; - try parent.send(.{ "FW", "change", full_path, EventType.created }); + try handler.change(full_path, EventType.created); const owned = try allocator.dupe(u8, entry.key_ptr.*); try snapshot.put(allocator, owned, {}); } @@ -580,7 +589,7 @@ const KQueueBackend = struct { for (to_delete.items) |name| { var path_buf: [std.fs.max_path_bytes]u8 = undefined; const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue; - try parent.send(.{ "FW", "change", full_path, EventType.deleted }); + try handler.change(full_path, EventType.deleted); _ = snapshot.fetchRemove(name); allocator.free(name); } @@ -734,20 +743,18 @@ const WindowsBackend = struct { _ = win32.CloseHandle(self.iocp); } - fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void { + fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void { _ = allocator; - errdefer parent.deinit(); if (self.thread != null) return error.AlreadyArmed; - self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, &self.watches, &self.watches_mutex, parent }); + self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, &self.watches, &self.watches_mutex, self.handler }); } fn thread_fn( iocp: windows.HANDLE, watches: *std.StringHashMapUnmanaged(Watch), watches_mutex: *std.Thread.Mutex, - parent: tp.pid, + handler: *Handler, ) void { - defer parent.deinit(); var bytes: windows.DWORD = 0; var key: windows.ULONG_PTR = 0; var overlapped_ptr: ?*windows.OVERLAPPED = null; @@ -803,7 +810,7 @@ const WindowsBackend = struct { continue; } else event_type; watches_mutex.unlock(); - parent.send(.{ "FW", "change", full_path, adjusted_event_type }) catch { + handler.change(full_path, adjusted_event_type) catch { watches_mutex.lock(); break; }; @@ -869,77 +876,3 @@ const WindowsBackend = struct { } } }; - -const Process = struct { - allocator: std.mem.Allocator, - parent: tp.pid, - logger: log.Logger, - receiver: Receiver, - backend: Backend, - - const Receiver = tp.Receiver(*@This()); - - fn create() SpawnError!tp.pid { - const allocator = std.heap.c_allocator; - const self = try allocator.create(@This()); - errdefer allocator.destroy(self); - self.* = .{ - .allocator = allocator, - .parent = tp.self_pid().clone(), - .logger = log.logger(module_name), - .receiver = Receiver.init(@This().receive, self), - .backend = undefined, - }; - return tp.spawn_link(self.allocator, self, @This().start, module_name); - } - - fn deinit(self: *@This()) void { - self.backend.deinit(self.allocator); - self.parent.deinit(); - self.logger.deinit(); - self.allocator.destroy(self); - } - - fn start(self: *@This()) tp.result { - errdefer self.deinit(); - _ = tp.set_trap(true); - self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace()); - self.backend.arm(self.allocator, self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace()); - tp.receive(&self.receiver); - } - - fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result { - errdefer self.deinit(); - return self.receive_safe(from, m) catch |e| switch (e) { - error.ExitNormal => tp.exit_normal(), - else => { - const err = tp.exit_error(e, @errorReturnTrace()); - self.logger.err("receive", err); - return err; - }, - }; - } - - fn receive_safe(self: *@This(), _: tp.pid_ref, m: tp.message) (error{ExitNormal} || cbor.Error)!void { - var path: []const u8 = undefined; - var tag: []const u8 = undefined; - var err_code: i64 = 0; - var err_msg: []const u8 = undefined; - - if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) { - self.backend.handle_read_ready(self.allocator, self.parent.ref()) catch |e| self.logger.err("handle_read_ready", e); - } else if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) { - self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg }); - } else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) { - self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e); - } else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) { - self.backend.remove_watch(self.allocator, path); - } else if (try cbor.match(m.buf, .{"shutdown"})) { - return error.ExitNormal; - } else if (try cbor.match(m.buf, .{ "exit", tp.more })) { - return error.ExitNormal; - } else { - self.logger.err("receive", tp.unexpected(m)); - } - } -};