refactor: move to an independent handler model

This commit is contained in:
CJ van den Berg 2026-02-26 10:25:00 +01:00
parent a1e5e3e9a5
commit 03e431c850
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9

View file

@ -1,71 +1,82 @@
const std = @import("std");
const tp = @import("thespian");
const cbor = @import("cbor");
const log = @import("log");
const builtin = @import("builtin");
pid: tp.pid_ref,
const Self = @This();
const module_name = @typeName(Self);
/// Kinds of file system change events reported to the handler.
pub const EventType = enum {
created,
modified,
deleted,
/// A new directory was created inside a watched directory. The
/// receiver should call watch() on the path to get events for files
/// created in it.
dir_created,
/// Only produced on macOS and Windows where the OS gives no pairing info.
/// On Linux, paired renames are reported via the handler's rename callback instead.
renamed,
};
pub const Error = FileWatcherError;
pub const FileWatcherError = error{
FileWatcherSendFailed,
ThespianSpawnFailed,
pub const Error = error{
HandlerFailed,
SpawnFailed,
OutOfMemory,
};
const SpawnError = error{ OutOfMemory, ThespianSpawnFailed };
const SpawnError = error{ OutOfMemory, SpawnFailed };
/// Watch a path (file or directory) for changes. The caller will receive:
/// .{ "FW", "change", path, event_type }
/// where event_type is a file_watcher.EventType tag string: "created", "modified", "deleted", "renamed"
/// On Linux, paired renames produce: .{ "FW", "rename", from_path, to_path }
pub fn watch(path: []const u8) FileWatcherError!void {
return send(.{ "watch", path });
/// Interface implemented by receivers of file system events. Backends invoke
/// `change`/`rename` as events arrive; on Linux the backend additionally calls
/// `wait_readable` to (re-)arm readiness notification for its inotify fd.
pub const Handler = struct {
    vtable: *const VTable,
    pub const VTable = struct {
        change: *const fn (handler: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void,
        rename: *const fn (handler: *Handler, src_path: []const u8, dst_path: []const u8) error{HandlerFailed}!void,
        /// Linux only: arrange for the backend fd to be waited on. Returns
        /// whether the fd is readable now or the handler will notify later.
        wait_readable: if (builtin.os.tag == .linux) *const fn (handler: *Handler) error{HandlerFailed}!ReadableStatus else void,
    };
    // NOTE(review): `change` and `rename` previously declared
    // `!ReadableStatus` while forwarding vtable entries that return `!void`;
    // the return types below are corrected to match the vtable.
    fn change(handler: *Handler, path: []const u8, event_type: EventType) error{HandlerFailed}!void {
        return handler.vtable.change(handler, path, event_type);
    }
    fn rename(handler: *Handler, src_path: []const u8, dst_path: []const u8) error{HandlerFailed}!void {
        return handler.vtable.rename(handler, src_path, dst_path);
    }
    fn wait_readable(handler: *Handler) error{HandlerFailed}!ReadableStatus {
        return handler.vtable.wait_readable(handler);
    }
};
/// Result of Handler.wait_readable: tells the Linux backend whether it can
/// read from its fd immediately or must wait for a handle_read_ready call.
pub const ReadableStatus = enum {
is_readable, // backend may now read from fd
will_notify, // backend must wait for a handle_read_ready call
};
allocator: std.mem.Allocator,
backend: Backend,
/// Create a watcher that reports events to `handler`. The backend is armed
/// immediately. On failure, all backend resources acquired by Backend.init
/// are released before the error is returned.
pub fn init(allocator: std.mem.Allocator, handler: *Handler) !@This() {
    var self: @This() = .{
        .allocator = allocator,
        .backend = try Backend.init(handler),
    };
    // Without this, a failing arm() leaked the backend's fds and watch
    // tables, since the caller never receives a value to deinit.
    errdefer self.backend.deinit(self.allocator);
    try self.backend.arm(self.allocator);
    return self;
}
/// Stop watching a previously watched path.
pub fn unwatch(path: []const u8) FileWatcherError!void {
return send(.{ "unwatch", path });
/// Release all backend resources (fds, threads, watch tables).
pub fn deinit(self: *@This()) void {
self.backend.deinit(self.allocator);
}
pub fn start() SpawnError!void {
_ = try get();
/// Watch a path (file or directory) for changes. The handler will receive
/// `change` and (linux only) `rename` calls
pub fn watch(self: *@This(), path: []const u8) Error!void {
    // Log and propagate. Previously the error was swallowed here, so the
    // declared `Error!void` signature could never actually deliver one.
    // NOTE(review): assumes every backend's add_watch error set is a subset
    // of Error — confirmed for the inotify backend (error{OutOfMemory});
    // confirm for the non-Linux backends.
    self.backend.add_watch(self.allocator, path) catch |e| {
        std.log.err("nightwatch.watch: {}", .{e});
        return e;
    };
}
pub fn shutdown() void {
const pid = tp.env.get().proc(module_name);
if (pid.expired()) return;
pid.send(.{"shutdown"}) catch {};
/// Stop watching a previously watched path
pub fn unwatch(self: *@This(), path: []const u8) Error!void {
// NOTE(review): remove_watch is called without `try`, so despite the
// `Error!void` signature this never returns an error — confirm intent.
self.backend.remove_watch(self.allocator, path);
}
fn get() SpawnError!Self {
const pid = tp.env.get().proc(module_name);
return if (pid.expired()) create() else .{ .pid = pid };
}
fn send(message: anytype) FileWatcherError!void {
return (try get()).pid.send(message) catch error.FileWatcherSendFailed;
}
fn create() SpawnError!Self {
const pid = try Process.create();
defer pid.deinit();
tp.env.get().proc_set(module_name, pid.ref());
return .{ .pid = tp.env.get().proc(module_name) };
/// Drain and dispatch pending backend events. The handler should call this
/// after reporting .will_notify from wait_readable and observing the backend
/// fd become readable.
/// NOTE(review): presumably only meaningful for backends that declare
/// handle_read_ready (the inotify backend) — confirm for other platforms.
pub fn handle_read_ready(self: *@This()) !void {
try self.backend.handle_read_ready(self.allocator);
}
const Backend = switch (builtin.os.tag) {
@ -77,8 +88,8 @@ const Backend = switch (builtin.os.tag) {
};
const INotifyBackend = struct {
handler: *Handler,
inotify_fd: std.posix.fd_t,
fd_watcher: tp.file_descriptor,
watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path
const IN = std.os.linux.IN;
@ -89,24 +100,23 @@ const INotifyBackend = struct {
const in_flags: std.os.linux.O = .{ .NONBLOCK = true };
fn init() error{ ProcessFdQuotaExceeded, SystemFdQuotaExceeded, SystemResources, Unexpected, ThespianFileDescriptorInitFailed }!@This() {
const ifd = try std.posix.inotify_init1(@bitCast(in_flags));
errdefer std.posix.close(ifd);
const fwd = try tp.file_descriptor.init(module_name, ifd);
return .{ .inotify_fd = ifd, .fd_watcher = fwd, .watches = .empty };
/// Create a non-blocking inotify instance for this backend. No watches are
/// registered yet; see add_watch.
fn init(handler: *Handler) error{ ProcessFdQuotaExceeded, SystemFdQuotaExceeded, SystemResources, Unexpected }!@This() {
return .{
.handler = handler,
// in_flags sets O_NONBLOCK so reads during event draining never block.
.inotify_fd = try std.posix.inotify_init1(@bitCast(in_flags)),
.watches = .empty,
};
}
/// Free all owned watch paths and close the inotify fd.
fn deinit(self: *@This(), allocator: std.mem.Allocator) void {
// NOTE(review): the `fd_watcher` field appears to have been removed from
// this struct in the refactor to the handler model — this call looks stale;
// confirm it still compiles or should be deleted.
self.fd_watcher.deinit();
var it = self.watches.iterator();
while (it.next()) |entry| allocator.free(entry.value_ptr.*);
self.watches.deinit(allocator);
std.posix.close(self.inotify_fd);
}
fn arm(self: *@This(), _: std.mem.Allocator, parent: tp.pid) error{ThespianFileDescriptorWaitReadFailed}!void {
parent.deinit();
try self.fd_watcher.wait_read();
/// (Re-)arm readiness notification for the inotify fd via the handler.
fn arm(self: *@This(), _: std.mem.Allocator) error{HandlerFailed}!void {
    // wait_readable returns a ReadableStatus, but arm's return type is
    // `!void`; the previous `return self.handler.wait_readable();` could not
    // compile. Discard the status explicitly.
    // TODO(review): if the status is .is_readable, events may already be
    // pending — confirm whether handle_read_ready should run immediately.
    _ = try self.handler.wait_readable();
}
fn add_watch(self: *@This(), allocator: std.mem.Allocator, path: []const u8) error{OutOfMemory}!void {
@ -132,7 +142,7 @@ const INotifyBackend = struct {
}
}
fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid_ref) (std.posix.ReadError || error{ ThespianFileDescriptorWaitReadFailed, NoSpaceLeft, OutOfMemory, Exit })!void {
fn handle_read_ready(self: *@This(), allocator: std.mem.Allocator) (std.posix.ReadError || error{ ThespianFileDescriptorWaitReadFailed, NoSpaceLeft, OutOfMemory, Exit })!void {
// re-arm the file descriptor
try self.fd_watcher.wait_read();
@ -154,7 +164,7 @@ const INotifyBackend = struct {
defer {
// Any unpaired MOVED_FROM means the file was moved out of the watched tree.
for (pending_renames.items) |r| {
parent.send(.{ "FW", "change", r.path, EventType.deleted }) catch {}; // moved outside watched tree
self.handler.vtable.change(r.path, EventType.deleted) catch {};
allocator.free(r.path);
}
pending_renames.deinit(allocator);
@ -201,14 +211,14 @@ const INotifyBackend = struct {
// Complete rename pair: emit a single atomic rename message.
const r = pending_renames.swapRemove(i);
defer allocator.free(r.path);
try parent.send(.{ "FW", "rename", r.path, full_path });
try self.handler.vtable.rename(r.path, full_path);
} else {
// No paired MOVED_FROM, file was moved in from outside the watched tree.
try parent.send(.{ "FW", "change", full_path, EventType.created });
try self.handler.vtable.change(full_path, EventType.created);
}
} else if (ev.mask & IN.MOVE_SELF != 0) {
// The watched directory itself was renamed/moved away.
try parent.send(.{ "FW", "change", full_path, EventType.deleted });
try self.handler.vtable.change(full_path, EventType.deleted);
} else {
const event_type: EventType = if (ev.mask & IN.CREATE != 0)
if (ev.mask & IN.ISDIR != 0) .dir_created else .created
@ -218,7 +228,7 @@ const INotifyBackend = struct {
.modified
else
continue;
try parent.send(.{ "FW", "change", full_path, event_type });
try self.handler.vtable.change(full_path, event_type);
}
}
}
@ -226,6 +236,7 @@ const INotifyBackend = struct {
};
const FSEventsBackend = struct {
handler: *Handler,
stream: ?*anyopaque, // FSEventStreamRef
queue: ?*anyopaque, // dispatch_queue_t
ctx: ?*CallbackContext, // heap-allocated, freed after stream is stopped
@ -279,7 +290,7 @@ const FSEventsBackend = struct {
};
const CallbackContext = struct {
parent: tp.pid,
handler: *Handler,
};
fn init() error{}!@This() {
@ -307,8 +318,7 @@ const FSEventsBackend = struct {
self.watches.deinit(allocator);
}
fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) error{OutOfMemory}!void {
errdefer parent.deinit();
fn arm(self: *@This(), allocator: std.mem.Allocator) error{OutOfMemory}!void {
if (self.stream != null) return;
var cf_strings: std.ArrayListUnmanaged(?*anyopaque) = .empty;
@ -341,7 +351,7 @@ const FSEventsBackend = struct {
const ctx = try allocator.create(CallbackContext);
errdefer allocator.destroy(ctx);
ctx.* = .{ .parent = parent };
ctx.* = .{ .handler = self.handler };
const stream = cf.FSEventStreamCreate(
null,
@ -405,6 +415,7 @@ const FSEventsBackend = struct {
};
const KQueueBackend = struct {
handler: *Handler,
kq: std.posix.fd_t,
shutdown_pipe: [2]std.posix.fd_t, // [0]=read [1]=write; write a byte to wake the thread
thread: ?std.Thread,
@ -472,10 +483,9 @@ const KQueueBackend = struct {
std.posix.close(self.kq);
}
fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
errdefer parent.deinit();
fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
if (self.thread != null) return error.AlreadyArmed;
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, parent });
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.kq, &self.watches, &self.snapshots, &self.snapshots_mutex, allocator, self.handler });
}
fn thread_fn(
@ -484,9 +494,8 @@ const KQueueBackend = struct {
snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
snapshots_mutex: *std.Thread.Mutex,
allocator: std.mem.Allocator,
parent: tp.pid,
handler: *Handler,
) void {
defer parent.deinit();
var events: [64]std.posix.Kevent = undefined;
while (true) {
// Block indefinitely until kqueue has events.
@ -500,11 +509,11 @@ const KQueueBackend = struct {
if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue;
const dir_path = entry.key_ptr.*;
if (ev.fflags & NOTE_DELETE != 0) {
parent.send(.{ "FW", "change", dir_path, EventType.deleted }) catch return;
handler.change(dir_path, EventType.deleted) catch return;
} else if (ev.fflags & NOTE_RENAME != 0) {
parent.send(.{ "FW", "change", dir_path, EventType.renamed }) catch return;
handler.change(dir_path, EventType.renamed) catch return;
} else if (ev.fflags & NOTE_WRITE != 0) {
scan_dir(dir_path, snapshots, snapshots_mutex, allocator, parent) catch {};
scan_dir(dir_path, snapshots, snapshots_mutex, allocator, handler) catch {};
}
break;
}
@ -518,7 +527,7 @@ const KQueueBackend = struct {
snapshots: *std.StringHashMapUnmanaged(std.StringHashMapUnmanaged(void)),
snapshots_mutex: *std.Thread.Mutex,
allocator: std.mem.Allocator,
parent: tp.pid,
handler: *Handler,
) !void {
var dir = std.fs.openDirAbsolute(dir_path, .{ .iterate = true }) catch return;
defer dir.close();
@ -547,7 +556,7 @@ const KQueueBackend = struct {
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.name }) catch continue;
// Only emit if not already watched.
if (!snapshots.contains(full_path))
try parent.send(.{ "FW", "change", full_path, EventType.dir_created });
try handler.change(full_path, EventType.dir_created);
}
snapshots_mutex.lock();
@ -564,7 +573,7 @@ const KQueueBackend = struct {
if (snapshot.contains(entry.key_ptr.*)) continue;
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, entry.key_ptr.* }) catch continue;
try parent.send(.{ "FW", "change", full_path, EventType.created });
try handler.change(full_path, EventType.created);
const owned = try allocator.dupe(u8, entry.key_ptr.*);
try snapshot.put(allocator, owned, {});
}
@ -580,7 +589,7 @@ const KQueueBackend = struct {
for (to_delete.items) |name| {
var path_buf: [std.fs.max_path_bytes]u8 = undefined;
const full_path = std.fmt.bufPrint(&path_buf, "{s}/{s}", .{ dir_path, name }) catch continue;
try parent.send(.{ "FW", "change", full_path, EventType.deleted });
try handler.change(full_path, EventType.deleted);
_ = snapshot.fetchRemove(name);
allocator.free(name);
}
@ -734,20 +743,18 @@ const WindowsBackend = struct {
_ = win32.CloseHandle(self.iocp);
}
fn arm(self: *@This(), allocator: std.mem.Allocator, parent: tp.pid) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
fn arm(self: *@This(), allocator: std.mem.Allocator) (error{AlreadyArmed} || std.Thread.SpawnError)!void {
_ = allocator;
errdefer parent.deinit();
if (self.thread != null) return error.AlreadyArmed;
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, &self.watches, &self.watches_mutex, parent });
self.thread = try std.Thread.spawn(.{}, thread_fn, .{ self.iocp, &self.watches, &self.watches_mutex, self.handler });
}
fn thread_fn(
iocp: windows.HANDLE,
watches: *std.StringHashMapUnmanaged(Watch),
watches_mutex: *std.Thread.Mutex,
parent: tp.pid,
handler: *Handler,
) void {
defer parent.deinit();
var bytes: windows.DWORD = 0;
var key: windows.ULONG_PTR = 0;
var overlapped_ptr: ?*windows.OVERLAPPED = null;
@ -803,7 +810,7 @@ const WindowsBackend = struct {
continue;
} else event_type;
watches_mutex.unlock();
parent.send(.{ "FW", "change", full_path, adjusted_event_type }) catch {
handler.change(full_path, adjusted_event_type) catch {
watches_mutex.lock();
break;
};
@ -869,77 +876,3 @@ const WindowsBackend = struct {
}
}
};
// Legacy actor-model implementation: a singleton thespian process that owned
// the Backend and forwarded events as "FW" messages to its parent.
// NOTE(review): per the commit message ("move to an independent handler
// model") this struct appears to be deleted by this change; the comments
// below document the removed behavior for reviewers.
const Process = struct {
allocator: std.mem.Allocator,
// Caller that spawned the watcher; receives the "FW" event messages.
parent: tp.pid,
logger: log.Logger,
receiver: Receiver,
backend: Backend,
const Receiver = tp.Receiver(*@This());
/// Heap-allocate the process state and spawn it, linked, under module_name.
/// Caller owns the returned pid and must deinit it.
fn create() SpawnError!tp.pid {
const allocator = std.heap.c_allocator;
const self = try allocator.create(@This());
errdefer allocator.destroy(self);
self.* = .{
.allocator = allocator,
.parent = tp.self_pid().clone(),
.logger = log.logger(module_name),
.receiver = Receiver.init(@This().receive, self),
// Initialized in start(); undefined until then.
.backend = undefined,
};
return tp.spawn_link(self.allocator, self, @This().start, module_name);
}
/// Release the backend, parent pid ref, logger, and the heap allocation.
fn deinit(self: *@This()) void {
self.backend.deinit(self.allocator);
self.parent.deinit();
self.logger.deinit();
self.allocator.destroy(self);
}
/// Actor entry point: trap exits, bring up and arm the backend, then enter
/// the receive loop.
fn start(self: *@This()) tp.result {
errdefer self.deinit();
_ = tp.set_trap(true);
self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace());
self.backend.arm(self.allocator, self.parent.clone()) catch |e| return tp.exit_error(e, @errorReturnTrace());
tp.receive(&self.receiver);
}
/// Dispatch wrapper: translates ExitNormal into a clean actor exit and
/// logs/propagates anything else as an error exit.
fn receive(self: *@This(), from: tp.pid_ref, m: tp.message) tp.result {
errdefer self.deinit();
return self.receive_safe(from, m) catch |e| switch (e) {
error.ExitNormal => tp.exit_normal(),
else => {
const err = tp.exit_error(e, @errorReturnTrace());
self.logger.err("receive", err);
return err;
},
};
}
/// Decode and handle one cbor message: fd readiness notifications (only for
/// backends that declare handle_read_ready), watch/unwatch requests, and
/// shutdown/exit control messages. Unknown messages are logged, not fatal.
fn receive_safe(self: *@This(), _: tp.pid_ref, m: tp.message) (error{ExitNormal} || cbor.Error)!void {
var path: []const u8 = undefined;
var tag: []const u8 = undefined;
var err_code: i64 = 0;
var err_msg: []const u8 = undefined;
if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) {
self.backend.handle_read_ready(self.allocator, self.parent.ref()) catch |e| self.logger.err("handle_read_ready", e);
} else if (@hasDecl(Backend, "handle_read_ready") and try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) {
self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg });
} else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) {
self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e);
} else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) {
self.backend.remove_watch(self.allocator, path);
} else if (try cbor.match(m.buf, .{"shutdown"})) {
return error.ExitNormal;
} else if (try cbor.match(m.buf, .{ "exit", tp.more })) {
return error.ExitNormal;
} else {
self.logger.err("receive", tp.unexpected(m));
}
}
};