From 895d3dfb9ece213d614af97c9b636a0844e4e04b Mon Sep 17 00:00:00 2001 From: CJ van den Berg Date: Fri, 7 Jun 2024 22:09:46 +0200 Subject: [PATCH] feat: add file_stream and subprocess_windows --- build.zig | 1 + include/thespian/c/file_stream.h | 26 +++ include/thespian/file_descriptor.hpp | 5 +- include/thespian/file_stream.hpp | 34 ++++ src/c/file_descriptor.cpp | 4 + src/c/file_stream.cpp | 81 +++++++++ src/executor.hpp | 24 +++ src/executor_asio.cpp | 91 ++++++++++- src/instance.cpp | 75 ++++++++- src/subprocess_windows.zig | 236 +++++++++++++++++++++++++++ src/thespian.zig | 37 ++++- 11 files changed, 600 insertions(+), 14 deletions(-) create mode 100644 include/thespian/c/file_stream.h create mode 100644 include/thespian/file_stream.hpp create mode 100644 src/c/file_stream.cpp create mode 100644 src/subprocess_windows.zig diff --git a/build.zig b/build.zig index 94553ca..4c1fb66 100644 --- a/build.zig +++ b/build.zig @@ -50,6 +50,7 @@ pub fn build(b: *std.Build) void { "src/c/context.cpp", "src/c/env.cpp", "src/c/file_descriptor.cpp", + "src/c/file_stream.cpp", "src/c/handle.cpp", "src/c/instance.cpp", "src/c/metronome.cpp", diff --git a/include/thespian/c/file_stream.h b/include/thespian/c/file_stream.h new file mode 100644 index 0000000..b75ce3c --- /dev/null +++ b/include/thespian/c/file_stream.h @@ -0,0 +1,26 @@ +#pragma once + +#include // NOLINT + +#if defined(_WIN32) + +// NOLINTBEGIN(modernize-use-trailing-return-type) +#ifdef __cplusplus +extern "C" { +#endif + +struct thespian_file_stream_handle; +struct thespian_file_stream_handle *thespian_file_stream_create(const char *tag, + void *handle); +int thespian_file_stream_start_read(struct thespian_file_stream_handle *); +int thespian_file_stream_start_write(struct thespian_file_stream_handle *, + const char *, size_t); +int thespian_file_stream_cancel(struct thespian_file_stream_handle *); +void thespian_file_stream_destroy(struct thespian_file_stream_handle *); + +#ifdef __cplusplus +} +#endif +// NOLINTEND(modernize-use-trailing-return-type) + +#endif diff --git a/include/thespian/file_descriptor.hpp b/include/thespian/file_descriptor.hpp index c94c42e..9cf2028 100644 --- a/include/thespian/file_descriptor.hpp +++ b/include/thespian/file_descriptor.hpp @@ -1,8 +1,9 @@ #pragma once +#if !defined(_WIN32) + #include #include -#include namespace thespian { @@ -30,3 +31,5 @@ struct file_descriptor { }; } // namespace thespian + +#endif diff --git a/include/thespian/file_stream.hpp b/include/thespian/file_stream.hpp new file mode 100644 index 0000000..1101b15 --- /dev/null +++ b/include/thespian/file_stream.hpp @@ -0,0 +1,34 @@ +#pragma once + +#if defined(_WIN32) + +#include +#include + +namespace thespian { + +struct file_stream_impl; +using file_stream_dtor = void (*)(file_stream_impl *); +using file_stream_ref = std::unique_ptr; + +struct file_stream { + static auto create(std::string_view tag, void *handle) -> file_stream; + auto start_read() -> void; + auto start_write(std::string_view data) -> void; + auto cancel() -> void; + static void start_read(file_stream_impl *); + static void start_write(file_stream_impl *, std::string_view data); + static void cancel(file_stream_impl *); + static void destroy(file_stream_impl *); + + //->("stream", tag, "read_complete") + //->("stream", tag, "read_error", int err, string message) + //->("stream", tag, "write_complete", int bytes_written) + //->("stream", tag, "write_error", int err, string message) + + file_stream_ref ref; +}; + +} // namespace thespian + +#endif diff --git a/src/c/file_descriptor.cpp b/src/c/file_descriptor.cpp index bc3716a..7fbe4e1 100644 --- a/src/c/file_descriptor.cpp +++ b/src/c/file_descriptor.cpp @@ -1,3 +1,5 @@ +#if !defined(_WIN32) + #include #include #include @@ -74,3 +76,5 @@ void thespian_file_descriptor_destroy(thespian_file_descriptor_handle *p) { } } // NOLINTEND(*-reinterpret-cast, *-use-trailing-*) + +#endif diff --git a/src/c/file_stream.cpp b/src/c/file_stream.cpp new file mode 100644 index 0000000..6ef4d31 --- /dev/null +++ b/src/c/file_stream.cpp @@ -0,0 +1,81 @@ +#if defined(_WIN32) + +#include +#include +#include + +using thespian::file_stream; +using thespian::file_stream_impl; +using thespian::file_stream_ref; + +// NOLINTBEGIN(*-reinterpret-cast, *-use-trailing-*) +extern "C" { + +auto thespian_file_stream_create(const char *tag, void *handle) + -> thespian_file_stream_handle * { + try { + auto *p = file_stream::create(tag, handle).ref.release(); + return reinterpret_cast(p); + } catch (const std::exception &e) { + thespian_set_last_error(e.what()); + return nullptr; + } catch (...) { + thespian_set_last_error("unknown thespian_file_stream_create error"); + return nullptr; + } +} + +int thespian_file_stream_start_read(thespian_file_stream_handle *p) { + try { + file_stream::start_read(reinterpret_cast(p)); + return 0; + } catch (const std::exception &e) { + thespian_set_last_error(e.what()); + return -1; + } catch (...) { + thespian_set_last_error("unknown thespian_file_stream_start_read error"); + return -1; + } +} + +int thespian_file_stream_start_write(thespian_file_stream_handle *p, + const char *data, size_t size) { + try { + file_stream::start_write(reinterpret_cast(p), + std::string_view(data, size)); + return 0; + } catch (const std::exception &e) { + thespian_set_last_error(e.what()); + return -1; + } catch (...) { + thespian_set_last_error("unknown thespian_file_stream_start_write error"); + return -1; + } +} + +int thespian_file_stream_cancel(thespian_file_stream_handle *p) { + try { + file_stream::cancel(reinterpret_cast(p)); + return 0; + } catch (const std::exception &e) { + thespian_set_last_error(e.what()); + return -1; + } catch (...) { + thespian_set_last_error("unknown thespian_file_stream_start_read error"); + return -1; + } +} + +void thespian_file_stream_destroy(thespian_file_stream_handle *p) { + try { + file_stream::destroy(reinterpret_cast(p)); + } catch (const std::exception &e) { + thespian_set_last_error(e.what()); + } catch (...) { + thespian_set_last_error("unknown thespian_file_stream_destroy error"); + } +} +} +// NOLINTEND(*-reinterpret-cast, *-use-trailing-*) + +#endif diff --git a/src/executor.hpp b/src/executor.hpp index 29feb82..e2e043f 100644 --- a/src/executor.hpp +++ b/src/executor.hpp @@ -191,6 +191,8 @@ struct acceptor { } // namespace unx +#if !defined(_WIN32) + namespace file_descriptor { struct watcher_impl; @@ -209,4 +211,26 @@ struct watcher { } // namespace file_descriptor +#else + +struct file_stream_impl; +using file_stream_dtor = void (*)(file_stream_impl *); +using file_stream_ref = std::unique_ptr; + +struct file_stream { + using handle_type = void *; + using read_handler = + std::function; + using write_handler = + std::function; + + explicit file_stream(strand &, handle_type); + void start_read(read_handler); + void start_write(std::string_view data, write_handler); + void cancel(); + file_stream_ref ref; +}; + +#endif + } // namespace thespian::executor diff --git a/src/executor_asio.cpp b/src/executor_asio.cpp index 7577762..0dbb27c 100644 --- a/src/executor_asio.cpp +++ b/src/executor_asio.cpp @@ -66,7 +66,7 @@ const auto MAX_THREAD = const auto threads = min(sysconf(_SC_NPROCESSORS_ONLN), MAX_THREAD); #else namespace { -static long get_num_processors() { +static auto get_num_processors() -> long { SYSTEM_INFO si; GetSystemInfo(&si); return si.dwNumberOfProcessors; @@ -481,11 +481,11 @@ namespace unx { struct socket_impl { explicit socket_impl(strand &strand) - : ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, - socket_{*ctx->asio} {} + : ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, socket_{ + *ctx->asio} {} explicit socket_impl(strand &strand, int fd) - : ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, - socket_{*ctx->asio, fd} {} + : ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, socket_{*ctx->asio, + fd} {} auto bind(string_view path) -> error_code { error_code ec; ec = socket_.bind(path, ec); @@ -610,12 +610,13 @@ void acceptor::close() { ref->close(); } } // namespace unx #if !defined(_WIN32) + namespace file_descriptor { struct watcher_impl { explicit watcher_impl(strand &strand, int fd) - : ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, - fd_{*ctx->asio, fd} {} + : ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, fd_{*ctx->asio, + fd} {} void wait_read(watcher::handler h) { if (!read_in_progress_) { @@ -672,6 +673,82 @@ void watcher::wait_write(watcher::handler h) { ref->wait_write(move(h)); } void watcher::cancel() { ref->cancel(); } } // namespace file_descriptor + +#else + +struct file_stream_impl { + explicit file_stream_impl( + strand &strand, + const asio::windows::stream_handle::native_handle_type &handle) + : ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, handle_{*ctx->asio, + handle} {} + + void start_read(file_stream::read_handler h) { + if (!read_in_progress_) { + read_in_progress_ = true; + ctx->pending.fetch_add(1, memory_order_relaxed); + weak_ptr weak_token{token_}; + handle_.async_read_some( + buffer(read_buffer_.data(), read_buffer_.size()), + bind_executor(strand_, + [c = ctx, b = &read_in_progress_, h = move(h), + t = move(weak_token), buf = read_buffer_.data()]( + const error_code &ec, std::size_t bytes) { + c->pending.fetch_sub(1, memory_order_relaxed); + if (auto p = t.lock()) { + *b = false; + h(ec, string_view(buf, bytes)); + } + })); + } + } + + void start_write(std::string_view data, file_stream::write_handler h) { + if (!write_in_progress_) { + write_in_progress_ = true; + write_buffer_.clear(); + write_buffer_.assign(data.begin(), data.end()); + ctx->pending.fetch_add(1, memory_order_relaxed); + weak_ptr weak_token{token_}; + handle_.async_write_some( + buffer(write_buffer_.data(), write_buffer_.size()), + bind_executor(strand_, [c = ctx, b = &write_in_progress_, h = move(h), + t = move(weak_token)](error_code ec, + size_t bytes_written) { + c->pending.fetch_sub(1, memory_order_relaxed); + if (auto p = t.lock()) { + *b = false; + h(ec, bytes_written); + } + })); + } + } + + void cancel() { handle_.cancel(); } + + context_ref ctx; + io_context::strand strand_; + asio::windows::stream_handle handle_; + shared_ptr token_{make_shared(true)}; + std::array read_buffer_{}; + std::vector write_buffer_{}; + bool read_in_progress_{false}; + bool write_in_progress_{false}; +}; + +file_stream::file_stream(strand &strand, file_stream::handle_type handle) + : ref{file_stream_ref(new file_stream_impl(strand, handle), + [](file_stream_impl *p) { delete p; })} {} + +void file_stream::start_read(file_stream::read_handler h) { + ref->start_read(move(h)); +} +void file_stream::start_write(std::string_view data, + file_stream::write_handler h) { + ref->start_write(data, move(h)); +} +void file_stream::cancel() { ref->cancel(); } + #endif } // namespace thespian::executor diff --git a/src/instance.cpp b/src/instance.cpp index d235008..cc03122 100644 --- a/src/instance.cpp +++ b/src/instance.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -862,6 +863,7 @@ auto udp::create(string tag) -> udp { } #if !defined(_WIN32) + struct file_descriptor_impl { file_descriptor_impl(const file_descriptor_impl &) = delete; file_descriptor_impl(file_descriptor_impl &&) = delete; @@ -925,6 +927,75 @@ void file_descriptor::wait_write(file_descriptor_impl *p) { p->wait_write(); } void file_descriptor::wait_read(file_descriptor_impl *p) { p->wait_read(); } void file_descriptor::cancel(file_descriptor_impl *p) { p->cancel(); } void file_descriptor::destroy(file_descriptor_impl *p) { delete p; } + +#else + +struct file_stream_impl { + file_stream_impl(const file_stream_impl &) = delete; + file_stream_impl(file_stream_impl &&) = delete; + auto operator=(const file_stream_impl &) -> file_stream_impl & = delete; + auto operator=(file_stream_impl &&) -> file_stream_impl & = delete; + + file_stream_impl(string_view tag, void *handle) + : owner_(private_call()), strand_(owner_.get_strand()), + file_stream_(strand_, handle), tag_(tag) {} + + ~file_stream_impl() = default; + + void start_read() { + private_call(); + file_stream_.start_read([this, liffile_stream_elock{owner_.lifetime_}]( + error_code ec, string_view data) { + if (ec) { + auto _ = + owner_.send("stream", tag_, "read_error", ec.value(), ec.message()); + } else { + auto _ = owner_.send("stream", tag_, "read_complete", data); + } + }); + } + + void start_write(string_view data) { + private_call(); + file_stream_.start_write(data, [this, + liffile_stream_elock{owner_.lifetime_}]( + error_code ec, size_t bytes_written) { + if (ec) { + auto _ = + owner_.send("stream", tag_, "write_error", ec.value(), ec.message()); + } else { + auto _ = owner_.send("stream", tag_, "write_complete", bytes_written); + } + }); + } + + void cancel() { + private_call(); + file_stream_.cancel(); + } + + instance &owner_; + executor::strand strand_; + executor::file_stream file_stream_; + string tag_; +}; + +auto file_stream::create(string_view tag, void *handle) -> file_stream { + return {file_stream_ref(new file_stream_impl(tag, handle), + [](file_stream_impl *p) { delete p; })}; +} + +void file_stream::start_read() { ref->start_read(); } +void file_stream::start_write(string_view data) { ref->start_write(data); } +void file_stream::cancel() { ref->cancel(); } + +void file_stream::start_read(file_stream_impl *p) { p->start_read(); } +void file_stream::start_write(file_stream_impl *p, string_view data) { + p->start_write(data); +} +void file_stream::cancel(file_stream_impl *p) { p->cancel(); } +void file_stream::destroy(file_stream_impl *p) { delete p; } + #endif struct socket_impl { @@ -1665,8 +1736,8 @@ struct acceptor { string listen_path; acceptor(string_view path, mode m, handle o) - : a{::thespian::unx::acceptor::create(tag)}, owner{move(o)}, - listen_path{path} { + : a{::thespian::unx::acceptor::create(tag)}, owner{move(o)}, listen_path{ + path} { a.listen(path, m); } ~acceptor() = default; diff --git a/src/subprocess_windows.zig b/src/subprocess_windows.zig new file mode 100644 index 0000000..a32c006 --- /dev/null +++ b/src/subprocess_windows.zig @@ -0,0 +1,236 @@ +const std = @import("std"); +const cbor = @import("cbor"); +const tp = @import("thespian.zig"); + +pid: ?tp.pid, +stdin_behavior: std.process.Child.StdIo, + +const Self = @This(); +pub const max_chunk_size = 4096 - 32; +pub const Writer = std.io.Writer(*Self, error{Exit}, write); +pub const BufferedWriter = std.io.BufferedWriter(max_chunk_size, Writer); + +pub fn init(a: std.mem.Allocator, argv: tp.message, tag: [:0]const u8, stdin_behavior: std.process.Child.StdIo) !Self { + return .{ + .pid = try Proc.create(a, argv, tag, stdin_behavior), + .stdin_behavior = stdin_behavior, + }; +} + +pub fn deinit(self: *Self) void { + if (self.pid) |pid| { + pid.deinit(); + self.pid = null; + } +} + +pub fn write(self: *Self, bytes: []const u8) error{Exit}!usize { + try self.send(bytes); + return bytes.len; +} + +pub fn send(self: *const Self, bytes_: []const u8) tp.result { + if (self.stdin_behavior != .Pipe) return tp.exit("cannot send to closed stdin"); + const pid = if (self.pid) |pid| pid else return tp.exit_error(error.Closed); + var bytes = bytes_; + while (bytes.len > 0) + bytes = loop: { + if (bytes.len > max_chunk_size) { + try pid.send(.{ "stdin", bytes[0..max_chunk_size] }); + break :loop bytes[max_chunk_size..]; + } else { + try pid.send(.{ "stdin", bytes }); + break :loop &[_]u8{}; + } + }; +} + +pub fn close(self: *Self) tp.result { + defer self.deinit(); + if (self.stdin_behavior == .Pipe) + if (self.pid) |pid| if (!pid.expired()) try pid.send(.{"stdin_close"}); +} + +pub fn term(self: *Self) tp.result { + defer self.deinit(); + if (self.pid) |pid| if (!pid.expired()) try pid.send(.{"term"}); +} + +pub fn writer(self: *Self) Writer { + return .{ .context = self }; +} + +pub fn bufferedWriter(self: *Self) BufferedWriter { + return .{ .unbuffered_writer = self.writer() }; +} + +const Proc = struct { + a: std.mem.Allocator, + receiver: Receiver, + args: std.heap.ArenaAllocator, + parent: tp.pid, + child: std.process.Child, + tag: [:0]const u8, + stdin_buffer: std.ArrayList(u8), + stream_stdin: ?tp.file_stream = null, + stream_stdout: ?tp.file_stream = null, + stream_stderr: ?tp.file_stream = null, + write_pending: bool = false, + stdin_close_pending: bool = false, + + const Receiver = tp.Receiver(*Proc); + + fn create(a: std.mem.Allocator, argv: tp.message, tag: [:0]const u8, stdin_behavior: std.process.Child.StdIo) !tp.pid { + const self: *Proc = try a.create(Proc); + + var args = std.heap.ArenaAllocator.init(a); + const args_a = args.allocator(); + var iter = argv.buf; + var len = cbor.decodeArrayHeader(&iter) catch return error.InvalidArgument; + var argv_ = try args_a.alloc([]const u8, len); + var arg: []const u8 = undefined; + var i: usize = 0; + while (len > 0) : (len -= 1) { + if (!(cbor.matchString(&iter, &arg) catch return error.InvalidArgument)) + return error.InvalidArgument; + argv_[i] = try args_a.dupe(u8, arg); + i += 1; + } + + var child = std.process.Child.init(argv_, a); + child.stdin_behavior = stdin_behavior; + child.stdout_behavior = .Pipe; + child.stderr_behavior = .Pipe; + + self.* = .{ + .a = a, + .receiver = Receiver.init(receive, self), + .args = args, + .parent = tp.self_pid().clone(), + .child = child, + .tag = try a.dupeZ(u8, tag), + .stdin_buffer = std.ArrayList(u8).init(a), + }; + return tp.spawn_link(a, self, Proc.start, tag); + } + + fn deinit(self: *Proc) void { + self.args.deinit(); + if (self.stream_stdin) |stream| stream.deinit(); + if (self.stream_stdout) |stream| stream.deinit(); + if (self.stream_stderr) |stream| stream.deinit(); + self.stdin_buffer.deinit(); + self.parent.deinit(); + self.a.free(self.tag); + } + + fn start(self: *Proc) tp.result { + errdefer self.deinit(); + + self.child.spawn() catch |e| { + try self.parent.send(.{ self.tag, "term", e, 1 }); + return tp.exit_normal(); + }; + _ = self.args.reset(.free_all); + + if (self.child.stdin_behavior == .Pipe) + self.stream_stdin = tp.file_stream.init("stdin", self.child.stdin.?.handle) catch |e| return tp.exit_error(e); + self.stream_stdout = tp.file_stream.init("stdout", self.child.stdout.?.handle) catch |e| return tp.exit_error(e); + self.stream_stderr = tp.file_stream.init("stderr", self.child.stderr.?.handle) catch |e| return tp.exit_error(e); + if (self.stream_stdout) |stream| stream.start_read() catch |e| return tp.exit_error(e); + if (self.stream_stderr) |stream| stream.start_read() catch |e| return tp.exit_error(e); + + tp.receive(&self.receiver); + } + + fn receive(self: *Proc, _: tp.pid_ref, m: tp.message) tp.result { + errdefer self.deinit(); + var bytes: []u8 = ""; + var bytes_written: usize = 0; + var err: i64 = 0; + var err_msg: []u8 = ""; + if (try m.match(.{ "stream", "stdout", "read_complete", tp.extract(&bytes) })) { + try self.dispatch_stdout(bytes); + if (self.stream_stdout) |stream| stream.start_read() catch |e| return tp.exit_error(e); + } else if (try m.match(.{ "stream", "stderr", "read_complete", tp.extract(&bytes) })) { + try self.dispatch_stderr(bytes); + if (self.stream_stderr) |stream| stream.start_read() catch |e| return tp.exit_error(e); + } else if (try m.match(.{ "stream", "stdin", "write_complete", tp.extract(&bytes_written) })) { + const old_buf = self.stdin_buffer.toOwnedSlice() catch |e| return tp.exit_error(e); + defer self.stdin_buffer.allocator.free(old_buf); + const bytes_left = old_buf[bytes_written..]; + if (bytes_left.len > 0) { + try self.start_write(bytes_left); + } else { + self.write_pending = false; + if (self.stdin_close_pending) + self.stdin_close(); + } + } else if (try m.match(.{ "stdin", tp.extract(&bytes) })) { + try self.start_write(bytes); + } else if (try m.match(.{"stdin_close"})) { + if (self.write_pending) { + self.stdin_close_pending = true; + } else { + self.stdin_close(); + } + } else if (try m.match(.{"stdout_close"})) { + if (self.child.stdout) |*fd| { + fd.close(); + self.child.stdout = null; + } + } else if (try m.match(.{"stderr_close"})) { + if (self.child.stderr) |*fd| { + fd.close(); + self.child.stderr = null; + } + } else if (try m.match(.{"term"})) { + const term_ = self.child.kill() catch |e| return tp.exit_error(e); + return self.handle_term(term_); + } else if (try m.match(.{ "stream", tp.any, "read_error", tp.extract(&err), tp.extract(&err_msg) })) { + return tp.exit(err_msg); + } + } + + fn start_write(self: *Proc, bytes: []const u8) tp.result { + if (self.stream_stdin) |stream_stdin| { + self.stdin_buffer.appendSlice(bytes) catch |e| return tp.exit_error(e); + stream_stdin.start_write(self.stdin_buffer.items) catch |e| return tp.exit_error(e); + self.write_pending = true; + } + } + + fn stdin_close(self: *Proc) void { + if (self.child.stdin) |*fd| { + fd.close(); + self.child.stdin = null; + tp.env.get().trace(tp.message.fmt(.{ self.tag, "stdin", "closed" }).to(tp.message.c_buffer_type)); + } + } + + fn dispatch_stdout(self: *Proc, bytes: []const u8) tp.result { + if (bytes.len == 0) + return self.handle_terminate(); + try self.parent.send(.{ self.tag, "stdout", bytes }); + } + + fn dispatch_stderr(self: *Proc, bytes: []const u8) tp.result { + if (bytes.len == 0) + return; + try self.parent.send(.{ self.tag, "stderr", bytes }); + } + + fn handle_terminate(self: *Proc) tp.result { + return self.handle_term(self.child.wait() catch |e| return tp.exit_error(e)); + } + + fn handle_term(self: *Proc, term_: std.process.Child.Term) tp.result { + (switch (term_) { + .Exited => |val| self.parent.send(.{ self.tag, "term", "exited", val }), + .Signal => |val| self.parent.send(.{ self.tag, "term", "signal", val }), + .Stopped => |val| self.parent.send(.{ self.tag, "term", "stop", val }), + .Unknown => |val| self.parent.send(.{ self.tag, "term", "unknown", val }), + }) catch {}; + return tp.exit_normal(); + } +}; diff --git a/src/thespian.zig b/src/thespian.zig index 0c401da..b3441b2 100644 --- a/src/thespian.zig +++ b/src/thespian.zig @@ -1,6 +1,7 @@ const std = @import("std"); const c = @cImport({ @cInclude("thespian/c/file_descriptor.h"); + @cInclude("thespian/c/file_stream.h"); @cInclude("thespian/c/instance.h"); @cInclude("thespian/c/metronome.h"); @cInclude("thespian/c/timeout.h"); @@ -8,7 +9,9 @@ const c = @cImport({ @cInclude("thespian/backtrace.h"); }); const cbor = @import("cbor"); -pub const subprocess = @import("subprocess.zig"); +const builtin = @import("builtin"); + +pub const subprocess = if (builtin.os.tag == .windows) @import("subprocess_windows.zig") else @import("subprocess.zig"); pub const install_debugger = c.install_debugger; pub const max_message_size = 8 * 4096; @@ -616,7 +619,7 @@ pub const metronome = struct { const Self = @This(); pub fn init(tick_time_us: u64) !Self { - return .{ .handle = c.thespian_metronome_create_us(tick_time_us) orelse return error.ThespianMetronomeInitFailed }; + return .{ .handle = c.thespian_metronome_create_us(@intCast(tick_time_us)) orelse return error.ThespianMetronomeInitFailed }; } pub fn init_ms(tick_time_us: u64) !Self { @@ -642,11 +645,11 @@ pub const timeout = struct { const Self = @This(); pub fn init(tick_time_us: u64, m: message) !Self { - return .{ .handle = c.thespian_timeout_create_us(tick_time_us, m.to(c.cbor_buffer)) orelse return error.ThespianTimeoutInitFailed }; + return .{ .handle = c.thespian_timeout_create_us(@intCast(tick_time_us), m.to(c.cbor_buffer)) orelse return error.ThespianTimeoutInitFailed }; } pub fn init_ms(tick_time_us: u64, m: message) !Self { - return .{ .handle = c.thespian_timeout_create_ms(tick_time_us, m.to(c.cbor_buffer)) orelse return error.ThespianTimeoutInitFailed }; + return .{ .handle = c.thespian_timeout_create_ms(@intCast(tick_time_us), m.to(c.cbor_buffer)) orelse return error.ThespianTimeoutInitFailed }; } pub fn cancel(self: *const Self) !void { @@ -685,6 +688,32 @@ pub const file_descriptor = struct { } }; +pub const file_stream = struct { + handle: *c.struct_thespian_file_stream_handle, + + const Self = @This(); + + pub fn init(tag_: []const u8, handle: *anyopaque) !Self { + return .{ .handle = c.thespian_file_stream_create(tag_.ptr, handle) orelse return error.ThespianFileStreamInitFailed }; + } + + pub fn start_read(self: *const Self) !void { + return neg_to_error(c.thespian_file_stream_start_read(self.handle), error.ThespianFileStreamWaitReadFailed); + } + + pub fn start_write(self: *const Self, data: []const u8) !void { + return neg_to_error(c.thespian_file_stream_start_write(self.handle, data.ptr, data.len), error.ThespianFileStreamWaitWriteFailed); + } + + pub fn cancel(self: *const Self) !void { + return neg_to_error(c.thespian_file_stream_cancel(self.handle), error.ThespianFileStreamCancelFailed); + } + + pub fn deinit(self: *const Self) void { + c.thespian_file_stream_destroy(self.handle); + } +}; + const CallContext = struct { receiver: ReceiverT, to: pid_ref,