fix(subprocess): use direct writes for stdin on windows

file_stream is not supported for stdin on windows apparently
This commit is contained in:
CJ van den Berg 2024-08-08 19:48:15 +02:00
parent b10131ee52
commit 5eb4b9b916

View file

@ -72,11 +72,8 @@ const Proc = struct {
child: std.process.Child, child: std.process.Child,
tag: [:0]const u8, tag: [:0]const u8,
stdin_buffer: std.ArrayList(u8), stdin_buffer: std.ArrayList(u8),
stream_stdin: ?tp.file_stream = null,
stream_stdout: ?tp.file_stream = null, stream_stdout: ?tp.file_stream = null,
stream_stderr: ?tp.file_stream = null, stream_stderr: ?tp.file_stream = null,
write_pending: bool = false,
stdin_close_pending: bool = false,
const Receiver = tp.Receiver(*Proc); const Receiver = tp.Receiver(*Proc);
@ -116,7 +113,6 @@ const Proc = struct {
fn deinit(self: *Proc) void { fn deinit(self: *Proc) void {
self.args.deinit(); self.args.deinit();
if (self.stream_stdin) |stream| stream.deinit();
if (self.stream_stdout) |stream| stream.deinit(); if (self.stream_stdout) |stream| stream.deinit();
if (self.stream_stderr) |stream| stream.deinit(); if (self.stream_stderr) |stream| stream.deinit();
self.stdin_buffer.deinit(); self.stdin_buffer.deinit();
@ -133,8 +129,6 @@ const Proc = struct {
}; };
_ = self.args.reset(.free_all); _ = self.args.reset(.free_all);
if (self.child.stdin_behavior == .Pipe)
self.stream_stdin = tp.file_stream.init("stdin", self.child.stdin.?.handle) catch |e| return tp.exit_error(e, @errorReturnTrace());
self.stream_stdout = tp.file_stream.init("stdout", self.child.stdout.?.handle) catch |e| return tp.exit_error(e, @errorReturnTrace()); self.stream_stdout = tp.file_stream.init("stdout", self.child.stdout.?.handle) catch |e| return tp.exit_error(e, @errorReturnTrace());
self.stream_stderr = tp.file_stream.init("stderr", self.child.stderr.?.handle) catch |e| return tp.exit_error(e, @errorReturnTrace()); self.stream_stderr = tp.file_stream.init("stderr", self.child.stderr.?.handle) catch |e| return tp.exit_error(e, @errorReturnTrace());
if (self.stream_stdout) |stream| stream.start_read() catch |e| return tp.exit_error(e, @errorReturnTrace()); if (self.stream_stdout) |stream| stream.start_read() catch |e| return tp.exit_error(e, @errorReturnTrace());
@ -146,7 +140,6 @@ const Proc = struct {
fn receive(self: *Proc, _: tp.pid_ref, m: tp.message) tp.result { fn receive(self: *Proc, _: tp.pid_ref, m: tp.message) tp.result {
errdefer self.deinit(); errdefer self.deinit();
var bytes: []u8 = ""; var bytes: []u8 = "";
var bytes_written: usize = 0;
var stream_name: []u8 = ""; var stream_name: []u8 = "";
var err: i64 = 0; var err: i64 = 0;
var err_msg: []u8 = ""; var err_msg: []u8 = "";
@ -156,25 +149,10 @@ const Proc = struct {
} else if (try m.match(.{ "stream", "stderr", "read_complete", tp.extract(&bytes) })) { } else if (try m.match(.{ "stream", "stderr", "read_complete", tp.extract(&bytes) })) {
try self.dispatch_stderr(bytes); try self.dispatch_stderr(bytes);
if (self.stream_stderr) |stream| stream.start_read() catch |e| return tp.exit_error(e, @errorReturnTrace()); if (self.stream_stderr) |stream| stream.start_read() catch |e| return tp.exit_error(e, @errorReturnTrace());
} else if (try m.match(.{ "stream", "stdin", "write_complete", tp.extract(&bytes_written) })) {
const old_buf = self.stdin_buffer.toOwnedSlice() catch |e| return tp.exit_error(e, @errorReturnTrace());
defer self.stdin_buffer.allocator.free(old_buf);
const bytes_left = old_buf[bytes_written..];
if (bytes_left.len > 0) {
try self.start_write(bytes_left);
} else {
self.write_pending = false;
if (self.stdin_close_pending)
self.stdin_close();
}
} else if (try m.match(.{ "stdin", tp.extract(&bytes) })) { } else if (try m.match(.{ "stdin", tp.extract(&bytes) })) {
try self.start_write(bytes); try self.start_write(bytes);
} else if (try m.match(.{"stdin_close"})) { } else if (try m.match(.{"stdin_close"})) {
if (self.write_pending) {
self.stdin_close_pending = true;
} else {
self.stdin_close(); self.stdin_close();
}
} else if (try m.match(.{"stdout_close"})) { } else if (try m.match(.{"stdout_close"})) {
if (self.child.stdout) |*fd| { if (self.child.stdout) |*fd| {
fd.close(); fd.close();
@ -188,9 +166,6 @@ const Proc = struct {
} else if (try m.match(.{"term"})) { } else if (try m.match(.{"term"})) {
const term_ = self.child.kill() catch |e| return tp.exit_error(e, @errorReturnTrace()); const term_ = self.child.kill() catch |e| return tp.exit_error(e, @errorReturnTrace());
return self.handle_term(term_); return self.handle_term(term_);
} else if (try m.match(.{ "stream", "stdin", "read_error", 109, tp.extract(&err_msg) })) {
// stdin closed
self.child.stdin = null;
} else if (try m.match(.{ "stream", "stdout", "read_error", 109, tp.extract(&err_msg) })) { } else if (try m.match(.{ "stream", "stdout", "read_error", 109, tp.extract(&err_msg) })) {
// stdout closed // stdout closed
self.child.stdout = null; self.child.stdout = null;
@ -204,11 +179,8 @@ const Proc = struct {
} }
fn start_write(self: *Proc, bytes: []const u8) tp.result { fn start_write(self: *Proc, bytes: []const u8) tp.result {
if (self.stream_stdin) |stream_stdin| { if (self.child.stdin) |stdin|
self.stdin_buffer.appendSlice(bytes) catch |e| return tp.exit_error(e, @errorReturnTrace()); stdin.writeAll(bytes) catch |e| return tp.exit_error(e, @errorReturnTrace());
stream_stdin.start_write(self.stdin_buffer.items) catch |e| return tp.exit_error(e, @errorReturnTrace());
self.write_pending = true;
}
} }
fn stdin_close(self: *Proc) void { fn stdin_close(self: *Proc) void {