const std = @import("std"); const tp = @import("thespian"); const cbor = @import("cbor"); const log = @import("log"); const builtin = @import("builtin"); pid: tp.pid_ref, const Self = @This(); const module_name = @typeName(Self); pub const EventType = enum { created, modified, deleted, /// Only produced on macOS and Windows where the OS gives no pairing info. /// On Linux, paired renames are emitted as a { "FW", "rename", from, to } message instead. renamed, }; pub const Error = FileWatcherError; pub const FileWatcherError = error{ FileWatcherFailed, ThespianSpawnFailed, OutOfMemory }; const SpawnError = error{ OutOfMemory, ThespianSpawnFailed }; /// Watch a path (file or directory) for changes. The caller will receive: /// .{ "FW", "change", path, event_type } /// where event_type is a file_watcher.EventType tag string: "created", "modified", "deleted", "renamed" /// On Linux, paired renames produce: .{ "FW", "rename", from_path, to_path } pub fn watch(path: []const u8) FileWatcherError!void { return send(.{ "watch", path }); } /// Stop watching a previously watched path. pub fn unwatch(path: []const u8) FileWatcherError!void { return send(.{ "unwatch", path }); } pub fn start() SpawnError!void { _ = try get(); } pub fn shutdown() void { const pid = tp.env.get().proc(module_name); if (pid.expired()) return; pid.send(.{"shutdown"}) catch {}; } fn get() SpawnError!Self { const pid = tp.env.get().proc(module_name); return if (pid.expired()) create() else .{ .pid = pid }; } fn send(message: anytype) FileWatcherError!void { return (try get()).pid.send(message) catch error.FileWatcherFailed; } fn create() SpawnError!Self { const pid = try Process.create(); defer pid.deinit(); tp.env.get().proc_set(module_name, pid.ref()); return .{ .pid = tp.env.get().proc(module_name) }; } const Backend = switch (builtin.os.tag) { .linux => LinuxBackend, .macos => MacosBackend, .windows => WindowsBackend, else => @compileError("file_watcher: unsupported OS"), }; const LinuxBackend = struct { inotify_fd: std.posix.fd_t, fd_watcher: tp.file_descriptor, watches: std.AutoHashMapUnmanaged(i32, []u8), // wd -> owned path const IN = std.os.linux.IN; const watch_mask: u32 = IN.CREATE | IN.DELETE | IN.MODIFY | IN.MOVED_FROM | IN.MOVED_TO | IN.DELETE_SELF | IN.MOVE_SELF | IN.CLOSE_WRITE; const in_flags: std.os.linux.O = .{ .NONBLOCK = true }; fn init() !LinuxBackend { const ifd = std.posix.inotify_init1(@bitCast(in_flags)) catch return error.FileWatcherFailed; errdefer std.posix.close(ifd); const fwd = tp.file_descriptor.init(module_name, ifd) catch return error.FileWatcherFailed; return .{ .inotify_fd = ifd, .fd_watcher = fwd, .watches = .empty }; } fn deinit(self: *LinuxBackend, allocator: std.mem.Allocator) void { self.fd_watcher.deinit(); var it = self.watches.iterator(); while (it.next()) |entry| allocator.free(entry.value_ptr.*); self.watches.deinit(allocator); std.posix.close(self.inotify_fd); } fn arm(self: *LinuxBackend) void { self.fd_watcher.wait_read() catch {}; } fn add_watch(self: *LinuxBackend, allocator: std.mem.Allocator, path: []const u8) !void { const path_z = try allocator.dupeZ(u8, path); defer allocator.free(path_z); const wd = std.os.linux.inotify_add_watch(self.inotify_fd, path_z, watch_mask); if (wd < 0) return error.FileWatcherFailed; const owned_path = try allocator.dupe(u8, path); errdefer allocator.free(owned_path); const result = try self.watches.getOrPut(allocator, @intCast(wd)); if (result.found_existing) allocator.free(result.value_ptr.*); result.value_ptr.* = owned_path; } fn remove_watch(self: *LinuxBackend, allocator: std.mem.Allocator, path: []const u8) void { var it = self.watches.iterator(); while (it.next()) |entry| { if (!std.mem.eql(u8, entry.value_ptr.*, path)) continue; _ = std.os.linux.inotify_rm_watch(self.inotify_fd, entry.key_ptr.*); allocator.free(entry.value_ptr.*); self.watches.removeByPtr(entry.key_ptr); return; } } fn drain(self: *LinuxBackend, allocator: std.mem.Allocator, parent: tp.pid_ref) !void { const InotifyEvent = extern struct { wd: i32, mask: u32, cookie: u32, len: u32, }; // A pending MOVED_FROM waiting to be paired with a MOVED_TO by cookie. const PendingRename = struct { cookie: u32, path: []u8, // owned by drain's allocator }; var buf: [4096]u8 align(@alignOf(InotifyEvent)) = undefined; var pending_renames: std.ArrayListUnmanaged(PendingRename) = .empty; defer { // Any unpaired MOVED_FROM means the file was moved out of the watched tree. for (pending_renames.items) |r| { parent.send(.{ "FW", "change", r.path, EventType.deleted }) catch {}; // moved outside watched tree allocator.free(r.path); } pending_renames.deinit(allocator); } while (true) { const n = std.posix.read(self.inotify_fd, &buf) catch |e| switch (e) { error.WouldBlock => break, else => return e, }; var offset: usize = 0; while (offset + @sizeOf(InotifyEvent) <= n) { const ev: *const InotifyEvent = @ptrCast(@alignCast(buf[offset..].ptr)); const name_offset = offset + @sizeOf(InotifyEvent); offset = name_offset + ev.len; const watched_path = self.watches.get(ev.wd) orelse continue; const name: []const u8 = if (ev.len > 0) std.mem.sliceTo(buf[name_offset..][0..ev.len], 0) else ""; var full_buf: [std.fs.max_path_bytes]u8 = undefined; const full_path: []const u8 = if (name.len > 0) try std.fmt.bufPrint(&full_buf, "{s}/{s}", .{ watched_path, name }) else watched_path; if (ev.mask & IN.MOVED_FROM != 0) { // Park it, we may receive a paired MOVED_TO with the same cookie. try pending_renames.append(allocator, .{ .cookie = ev.cookie, .path = try allocator.dupe(u8, full_path), }); } else if (ev.mask & IN.MOVED_TO != 0) { // Look for a paired MOVED_FROM. var found: ?usize = null; for (pending_renames.items, 0..) |r, i| { if (r.cookie == ev.cookie) { found = i; break; } } if (found) |i| { // Complete rename pair: emit a single atomic rename message. const r = pending_renames.swapRemove(i); defer allocator.free(r.path); try parent.send(.{ "FW", "rename", r.path, full_path }); } else { // No paired MOVED_FROM, file was moved in from outside the watched tree. try parent.send(.{ "FW", "change", full_path, EventType.created }); } } else if (ev.mask & IN.MOVE_SELF != 0) { // The watched directory itself was renamed/moved away. try parent.send(.{ "FW", "change", full_path, EventType.deleted }); } else { const event_type: EventType = if (ev.mask & IN.CREATE != 0) .created else if (ev.mask & (IN.DELETE | IN.DELETE_SELF) != 0) .deleted else if (ev.mask & (IN.MODIFY | IN.CLOSE_WRITE) != 0) .modified else continue; try parent.send(.{ "FW", "change", full_path, event_type }); } } } } }; const MacosBackend = struct { kq: std.posix.fd_t, fd_watcher: tp.file_descriptor, watches: std.StringHashMapUnmanaged(std.posix.fd_t), // owned path -> fd const EVFILT_VNODE: i16 = -4; const EV_ADD: u16 = 0x0001; const EV_ENABLE: u16 = 0x0004; const EV_CLEAR: u16 = 0x0020; const NOTE_WRITE: u32 = 0x00000002; const NOTE_DELETE: u32 = 0x00000004; const NOTE_RENAME: u32 = 0x00000020; const NOTE_ATTRIB: u32 = 0x00000008; const NOTE_EXTEND: u32 = 0x00000004; fn init() !MacosBackend { const kq = std.posix.kqueue() catch return error.FileWatcherFailed; errdefer std.posix.close(kq); const fwd = tp.file_descriptor.init(module_name, kq) catch return error.FileWatcherFailed; return .{ .kq = kq, .fd_watcher = fwd, .watches = .empty }; } fn deinit(self: *MacosBackend, allocator: std.mem.Allocator) void { self.fd_watcher.deinit(); var it = self.watches.iterator(); while (it.next()) |entry| { std.posix.close(entry.value_ptr.*); allocator.free(entry.key_ptr.*); } self.watches.deinit(allocator); std.posix.close(self.kq); } fn arm(self: *MacosBackend) void { self.fd_watcher.wait_read() catch {}; } fn add_watch(self: *MacosBackend, allocator: std.mem.Allocator, path: []const u8) !void { if (self.watches.contains(path)) return; const path_fd = std.posix.open(path, .{ .ACCMODE = .RDONLY }, 0) catch return error.FileWatcherFailed; errdefer std.posix.close(path_fd); const kev = std.posix.Kevent{ .ident = @intCast(path_fd), .filter = EVFILT_VNODE, .flags = EV_ADD | EV_ENABLE | EV_CLEAR, .fflags = NOTE_WRITE | NOTE_DELETE | NOTE_RENAME | NOTE_ATTRIB | NOTE_EXTEND, .data = 0, .udata = 0, }; _ = std.posix.kevent(self.kq, &.{kev}, &.{}, null) catch return error.FileWatcherFailed; const owned_path = try allocator.dupe(u8, path); errdefer allocator.free(owned_path); try self.watches.put(allocator, owned_path, path_fd); } fn remove_watch(self: *MacosBackend, allocator: std.mem.Allocator, path: []const u8) void { if (self.watches.fetchRemove(path)) |entry| { std.posix.close(entry.value); allocator.free(entry.key); } } fn drain(self: *MacosBackend, allocator: std.mem.Allocator, parent: tp.pid_ref) !void { _ = allocator; var events: [64]std.posix.Kevent = undefined; const immediate: std.posix.timespec = .{ .sec = 0, .nsec = 0 }; const n = std.posix.kevent(self.kq, &.{}, &events, &immediate) catch return; for (events[0..n]) |ev| { var it = self.watches.iterator(); while (it.next()) |entry| { if (entry.value_ptr.* != @as(std.posix.fd_t, @intCast(ev.ident))) continue; const event_type: EventType = if (ev.fflags & NOTE_DELETE != 0) .deleted else if (ev.fflags & NOTE_RENAME != 0) .renamed else if (ev.fflags & (NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB) != 0) .modified else continue; try parent.send(.{ "FW", "change", entry.key_ptr.*, event_type }); break; } } } }; const WindowsBackend = struct { const windows = std.os.windows; const win32 = struct { pub extern "kernel32" fn CloseHandle(hObject: windows.HANDLE) callconv(.winapi) windows.BOOL; pub extern "kernel32" fn ReadDirectoryChangesW( hDirectory: windows.HANDLE, lpBuffer: *anyopaque, nBufferLength: windows.DWORD, bWatchSubtree: windows.BOOL, dwNotifyFilter: windows.DWORD, lpBytesReturned: ?*windows.DWORD, lpOverlapped: ?*windows.OVERLAPPED, lpCompletionRoutine: ?*anyopaque, ) callconv(.winapi) windows.BOOL; pub extern "kernel32" fn GetQueuedCompletionStatus( CompletionPort: windows.HANDLE, lpNumberOfBytesTransferred: *windows.DWORD, lpCompletionKey: *windows.ULONG_PTR, lpOverlapped: *?*windows.OVERLAPPED, dwMilliseconds: windows.DWORD, ) callconv(.winapi) windows.BOOL; pub extern "kernel32" fn CreateFileW( lpFileName: [*:0]const windows.WCHAR, dwDesiredAccess: windows.DWORD, dwShareMode: windows.DWORD, lpSecurityAttributes: ?*anyopaque, dwCreationDisposition: windows.DWORD, dwFlagsAndAttributes: windows.DWORD, hTemplateFile: ?windows.HANDLE, ) callconv(.winapi) windows.HANDLE; }; iocp: windows.HANDLE, poll_timer: ?tp.timeout, watches: std.StringHashMapUnmanaged(Watch), const poll_interval_ms: u64 = 50; const Watch = struct { handle: windows.HANDLE, buf: Buf, overlapped: windows.OVERLAPPED, path: []u8, // owned }; const buf_size = 65536; const Buf = []align(4) u8; const FILE_NOTIFY_INFORMATION = extern struct { NextEntryOffset: windows.DWORD, Action: windows.DWORD, FileNameLength: windows.DWORD, FileName: [1]windows.WCHAR, }; const FILE_ACTION_ADDED: windows.DWORD = 1; const FILE_ACTION_REMOVED: windows.DWORD = 2; const FILE_ACTION_MODIFIED: windows.DWORD = 3; const FILE_ACTION_RENAMED_OLD_NAME: windows.DWORD = 4; const FILE_ACTION_RENAMED_NEW_NAME: windows.DWORD = 5; const notify_filter: windows.DWORD = 0x00000001 | // FILE_NOTIFY_CHANGE_FILE_NAME 0x00000002 | // FILE_NOTIFY_CHANGE_DIR_NAME 0x00000008 | // FILE_NOTIFY_CHANGE_SIZE 0x00000010 | // FILE_NOTIFY_CHANGE_LAST_WRITE 0x00000040; // FILE_NOTIFY_CHANGE_CREATION fn init() !WindowsBackend { const iocp = windows.CreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, 0, 1) catch return error.FileWatcherFailed; return .{ .iocp = iocp, .poll_timer = null, .watches = .empty }; } fn deinit(self: *WindowsBackend, allocator: std.mem.Allocator) void { if (self.poll_timer) |*t| t.deinit(); var it = self.watches.iterator(); while (it.next()) |entry| { _ = win32.CloseHandle(entry.value_ptr.*.handle); allocator.free(entry.value_ptr.*.path); allocator.free(entry.value_ptr.*.buf); } self.watches.deinit(allocator); _ = win32.CloseHandle(self.iocp); } fn arm(self: *WindowsBackend) void { if (self.poll_timer) |*t| t.deinit(); self.poll_timer = tp.timeout.init_ms(poll_interval_ms, tp.message.fmt(.{"FW_poll"})) catch null; } fn add_watch(self: *WindowsBackend, allocator: std.mem.Allocator, path: []const u8) !void { if (self.watches.contains(path)) return; const path_w = try std.unicode.utf8ToUtf16LeAllocZ(allocator, path); defer allocator.free(path_w); const handle = win32.CreateFileW( path_w, windows.GENERIC_READ, windows.FILE_SHARE_READ | windows.FILE_SHARE_WRITE | windows.FILE_SHARE_DELETE, null, windows.OPEN_EXISTING, 0x02000000 | 0x40000000, // FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED null, ); if (handle == windows.INVALID_HANDLE_VALUE) return error.FileWatcherFailed; errdefer _ = win32.CloseHandle(handle); _ = windows.CreateIoCompletionPort(handle, self.iocp, @intFromPtr(handle), 0) catch return error.FileWatcherFailed; const buf = try allocator.alignedAlloc(u8, .fromByteUnits(4), buf_size); errdefer allocator.free(buf); const owned_path = try allocator.dupe(u8, path); errdefer allocator.free(owned_path); var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED); if (win32.ReadDirectoryChangesW(handle, buf.ptr, buf_size, 1, notify_filter, null, &overlapped, null) == 0) return error.FileWatcherFailed; try self.watches.put(allocator, owned_path, .{ .handle = handle, .buf = buf, .overlapped = overlapped, .path = owned_path, }); } fn remove_watch(self: *WindowsBackend, allocator: std.mem.Allocator, path: []const u8) void { if (self.watches.fetchRemove(path)) |entry| { _ = win32.CloseHandle(entry.value.handle); allocator.free(entry.value.path); allocator.free(entry.value.buf); } } fn drain(self: *WindowsBackend, allocator: std.mem.Allocator, parent: tp.pid_ref) !void { _ = allocator; var bytes: windows.DWORD = 0; var key: windows.ULONG_PTR = 0; var overlapped_ptr: ?*windows.OVERLAPPED = null; while (true) { const ok = win32.GetQueuedCompletionStatus(self.iocp, &bytes, &key, &overlapped_ptr, 0); if (ok == 0 or overlapped_ptr == null) break; const triggered_handle: windows.HANDLE = @ptrFromInt(key); var it = self.watches.iterator(); while (it.next()) |entry| { const w = entry.value_ptr; if (w.handle != triggered_handle) continue; if (bytes > 0) { var offset: usize = 0; while (offset < bytes) { const info: *FILE_NOTIFY_INFORMATION = @ptrCast(@alignCast(w.buf[offset..].ptr)); const name_wchars = (&info.FileName).ptr[0 .. info.FileNameLength / 2]; var name_buf: [std.fs.max_path_bytes]u8 = undefined; const name_len = std.unicode.utf16LeToUtf8(&name_buf, name_wchars) catch 0; const event_type: EventType = switch (info.Action) { FILE_ACTION_ADDED => .created, FILE_ACTION_REMOVED => .deleted, FILE_ACTION_MODIFIED => .modified, FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME => .renamed, else => { if (info.NextEntryOffset == 0) break; offset += info.NextEntryOffset; continue; }, }; var full_buf: [std.fs.max_path_bytes]u8 = undefined; const full_path = std.fmt.bufPrint(&full_buf, "{s}\\{s}", .{ w.path, name_buf[0..name_len] }) catch continue; try parent.send(.{ "FW", "change", full_path, event_type }); if (info.NextEntryOffset == 0) break; offset += info.NextEntryOffset; } } // Re-arm ReadDirectoryChangesW for the next batch. w.overlapped = std.mem.zeroes(windows.OVERLAPPED); _ = win32.ReadDirectoryChangesW(w.handle, w.buf.ptr, buf_size, 1, notify_filter, null, &w.overlapped, null); break; } } } }; const Process = struct { allocator: std.mem.Allocator, parent: tp.pid, logger: log.Logger, receiver: Receiver, backend: Backend, const Receiver = tp.Receiver(*Process); fn create() SpawnError!tp.pid { const allocator = std.heap.c_allocator; const self = try allocator.create(Process); errdefer allocator.destroy(self); self.* = .{ .allocator = allocator, .parent = tp.self_pid().clone(), .logger = log.logger(module_name), .receiver = Receiver.init(Process.receive, self), .backend = undefined, }; return tp.spawn_link(self.allocator, self, Process.start, module_name); } fn deinit(self: *Process) void { self.backend.deinit(self.allocator); self.parent.deinit(); self.logger.deinit(); self.allocator.destroy(self); } fn start(self: *Process) tp.result { errdefer self.deinit(); _ = tp.set_trap(true); self.backend = Backend.init() catch |e| return tp.exit_error(e, @errorReturnTrace()); self.backend.arm(); tp.receive(&self.receiver); } fn receive(self: *Process, from: tp.pid_ref, m: tp.message) tp.result { errdefer self.deinit(); return self.receive_safe(from, m) catch |e| switch (e) { error.ExitNormal => tp.exit_normal(), else => { const err = tp.exit_error(e, @errorReturnTrace()); self.logger.err("receive", err); return err; }, }; } fn receive_safe(self: *Process, _: tp.pid_ref, m: tp.message) (error{ExitNormal} || cbor.Error)!void { var path: []const u8 = undefined; var tag: []const u8 = undefined; var err_code: i64 = 0; var err_msg: []const u8 = undefined; if (try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_ready" })) { self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e); self.backend.arm(); } else if (try cbor.match(m.buf, .{ "fd", tp.extract(&tag), "read_error", tp.extract(&err_code), tp.extract(&err_msg) })) { self.logger.print("fd read error on {s}: ({d}) {s}", .{ tag, err_code, err_msg }); self.backend.arm(); } else if (builtin.os.tag == .windows and try cbor.match(m.buf, .{"FW_poll"})) { self.backend.drain(self.allocator, self.parent.ref()) catch |e| self.logger.err("drain", e); self.backend.arm(); } else if (try cbor.match(m.buf, .{ "watch", tp.extract(&path) })) { self.backend.add_watch(self.allocator, path) catch |e| self.logger.err("watch", e); } else if (try cbor.match(m.buf, .{ "unwatch", tp.extract(&path) })) { self.backend.remove_watch(self.allocator, path); } else if (try cbor.match(m.buf, .{"shutdown"})) { return error.ExitNormal; } else if (try cbor.match(m.buf, .{ "exit", tp.more })) { return error.ExitNormal; } else { self.logger.err("receive", tp.unexpected(m)); } } };