refactor(terminal): move pty input processing to an actor

This commit is contained in:
CJ van den Berg 2026-02-24 21:14:08 +01:00
parent d423696e7e
commit f8dd9f85b6
Signed by: neurocyte
GPG key ID: 8EB1E1BB660E3FB9

View file

@ -21,16 +21,12 @@ const widget_type: Widget.Type = .panel;
const Terminal = vaxis.widgets.Terminal;
/// Poll interval in microseconds how often we check the pty for new output.
/// 16 ms 60 Hz; Flow's render loop will coalesce multiple need_render calls.
const poll_interval_us: u64 = 16 * std.time.us_per_ms;
allocator: Allocator,
plane: Plane,
vt: Terminal,
env: std.process.EnvMap,
write_buf: [4096]u8,
poll_timer: ?tp.Cancellable = null,
pty_pid: ?tp.pid = null,
focused: bool = false,
cwd: std.ArrayListUnmanaged(u8) = .empty,
title: std.ArrayListUnmanaged(u8) = .empty,
@ -110,7 +106,7 @@ pub fn create_with_args(allocator: Allocator, parent: Plane, ctx: command.Contex
.vt = vt,
.env = env,
.write_buf = undefined, // managed via self.vt's pty_writer pointer
.poll_timer = null,
.pty_pid = null,
};
try self.vt.spawn();
@ -120,12 +116,16 @@ pub fn create_with_args(allocator: Allocator, parent: Plane, ctx: command.Contex
container.ctx = self;
try container.add(Widget.to(self));
self.schedule_poll();
self.pty_pid = try pty.spawn(allocator, &self.vt);
return container.widget();
}
pub fn receive(self: *Self, _: tp.pid_ref, m: tp.message) error{Exit}!bool {
if (try m.match(.{ "terminal_view", "output" })) {
tui.need_render(@src());
return true;
}
if (!self.focused) return false;
var evtype: u8 = 0;
var keycode: u21 = 0;
@ -162,10 +162,10 @@ pub fn deinit(self: *Self, allocator: Allocator) void {
if (self.focused) tui.release_keyboard_focus(Widget.to(self));
self.cwd.deinit(self.allocator);
self.title.deinit(self.allocator);
tui.message_filters().remove_ptr(self);
if (self.poll_timer) |*t| {
t.cancel() catch {};
t.deinit();
if (self.pty_pid) |pid| {
pid.send(.{ "pty_actor", "quit" }) catch {};
pid.deinit();
self.pty_pid = null;
}
self.vt.deinit();
self.env.deinit();
@ -217,30 +217,104 @@ pub fn handle_resize(self: *Self, pos: Widget.Box) void {
};
}
// The pty read thread pushes output into vt asynchronously. We use a
// recurring thespian delay_send to wake up every ~16 ms and check whether
// new output has arrived, requesting a render frame when it has.
fn schedule_poll(self: *Self) void {
self.poll_timer = tp.self_pid().delay_send_cancellable(
self.allocator,
"terminal_view.poll",
poll_interval_us,
.{"TERMINAL_VIEW_POLL"},
) catch null;
}
fn receive_filter(self: *Self, _: tp.pid_ref, m: tp.message) MessageFilter.Error!bool {
if (try cbor.match(m.buf, .{"TERMINAL_VIEW_POLL"})) {
if (self.poll_timer) |*t| {
t.deinit();
self.poll_timer = null;
}
if (self.vt.dirty)
tui.need_render(@src());
self.schedule_poll();
fn receive_filter(_: *Self, _: tp.pid_ref, m: tp.message) MessageFilter.Error!bool {
if (m.match(.{ "terminal_view", "output" }) catch false) {
tui.need_render(@src());
return true;
}
return false;
}
const pty = struct {
const Parser = Terminal.Parser;
const Receiver = tp.Receiver(*@This());
allocator: std.mem.Allocator,
vt: *Terminal,
fd: tp.file_descriptor,
pty_fd: std.posix.fd_t,
parser: Parser,
receiver: Receiver,
parent: tp.pid,
pub fn spawn(allocator: std.mem.Allocator, vt: *Terminal) !tp.pid {
const self = try allocator.create(@This());
errdefer allocator.destroy(self);
self.* = .{
.allocator = allocator,
.vt = vt,
.fd = undefined,
.pty_fd = vt.ptyFd(),
.parser = .{ .buf = try .initCapacity(allocator, 128) },
.receiver = Receiver.init(pty_receive, self),
.parent = tp.self_pid().clone(),
};
return tp.spawn_link(allocator, self, start, "pty_actor");
}
fn deinit(self: *@This()) void {
self.fd.deinit();
self.parser.buf.deinit();
self.parent.deinit();
self.allocator.destroy(self);
}
fn start(self: *@This()) tp.result {
errdefer self.deinit();
self.fd = tp.file_descriptor.init("pty", self.pty_fd) catch |e| return tp.exit_error(e, @errorReturnTrace());
self.fd.wait_read() catch |e| return tp.exit_error(e, @errorReturnTrace());
tp.receive(&self.receiver);
}
fn pty_receive(self: *@This(), _: tp.pid_ref, m: tp.message) tp.result {
errdefer self.deinit();
if (try m.match(.{ "fd", "pty", "read_ready" })) {
try self.read_and_process();
return;
}
if (try m.match(.{ "pty_actor", "quit" })) {
self.deinit();
return;
}
}
fn read_and_process(self: *@This()) tp.result {
var buf: [4096]u8 = undefined;
while (true) {
const n = std.posix.read(self.vt.ptyFd(), &buf) catch |e| switch (e) {
error.WouldBlock => break,
error.InputOutput => {
self.vt.event_queue.push(.exited);
self.vt.cmd.wait();
self.deinit();
return;
},
else => return tp.exit_error(e, @errorReturnTrace()),
};
if (n == 0) {
self.vt.event_queue.push(.exited);
self.vt.cmd.wait();
self.deinit();
return;
}
const exited = self.vt.processOutput(&self.parser, buf[0..n]) catch |e|
return tp.exit_error(e, @errorReturnTrace());
if (exited) {
self.vt.cmd.wait();
// Notify parent so it drains the .exited event on its next render.
self.parent.send(.{ "terminal_view", "output" }) catch {};
self.deinit();
return;
}
// Notify parent that new output is available.
self.parent.send(.{ "terminal_view", "output" }) catch {};
}
self.fd.wait_read() catch |e| return tp.exit_error(e, @errorReturnTrace());
}
};