feat: add file_stream and subprocess_windows

This commit is contained in:
CJ van den Berg 2024-06-07 22:09:46 +02:00
parent 0baeda5d16
commit 895d3dfb9e
11 changed files with 600 additions and 14 deletions

View file

@ -66,7 +66,7 @@ const auto MAX_THREAD =
const auto threads = min(sysconf(_SC_NPROCESSORS_ONLN), MAX_THREAD);
#else
namespace {
static long get_num_processors() {
static auto get_num_processors() -> long {
SYSTEM_INFO si;
GetSystemInfo(&si);
return si.dwNumberOfProcessors;
@ -481,11 +481,11 @@ namespace unx {
struct socket_impl {
explicit socket_impl(strand &strand)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_},
socket_{*ctx->asio} {}
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, socket_{
*ctx->asio} {}
explicit socket_impl(strand &strand, int fd)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_},
socket_{*ctx->asio, fd} {}
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, socket_{*ctx->asio,
fd} {}
auto bind(string_view path) -> error_code {
error_code ec;
ec = socket_.bind(path, ec);
@ -610,12 +610,13 @@ void acceptor::close() { ref->close(); }
} // namespace unx
#if !defined(_WIN32)
namespace file_descriptor {
struct watcher_impl {
explicit watcher_impl(strand &strand, int fd)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_},
fd_{*ctx->asio, fd} {}
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, fd_{*ctx->asio,
fd} {}
void wait_read(watcher::handler h) {
if (!read_in_progress_) {
@ -672,6 +673,82 @@ void watcher::wait_write(watcher::handler h) { ref->wait_write(move(h)); }
void watcher::cancel() { ref->cancel(); }
} // namespace file_descriptor
#else
struct file_stream_impl {
explicit file_stream_impl(
strand &strand,
const asio::windows::stream_handle::native_handle_type &handle)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, handle_{*ctx->asio,
handle} {}
void start_read(file_stream::read_handler h) {
if (!read_in_progress_) {
read_in_progress_ = true;
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
handle_.async_read_some(
buffer(read_buffer_.data(), read_buffer_.size()),
bind_executor(strand_,
[c = ctx, b = &read_in_progress_, h = move(h),
t = move(weak_token), buf = read_buffer_.data()](
const error_code &ec, std::size_t bytes) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock()) {
*b = false;
h(ec, string_view(buf, bytes));
}
}));
}
}
void start_write(std::string_view data, file_stream::write_handler h) {
if (!write_in_progress_) {
write_in_progress_ = true;
write_buffer_.clear();
write_buffer_.assign(data.begin(), data.end());
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
handle_.async_write_some(
buffer(write_buffer_.data(), write_buffer_.size()),
bind_executor(strand_, [c = ctx, b = &write_in_progress_, h = move(h),
t = move(weak_token)](error_code ec,
size_t bytes_written) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock()) {
*b = false;
h(ec, bytes_written);
}
}));
}
}
void cancel() { handle_.cancel(); }
context_ref ctx;
io_context::strand strand_;
asio::windows::stream_handle handle_;
shared_ptr<bool> token_{make_shared<bool>(true)};
std::array<char, receive_buffer_size> read_buffer_{};
std::vector<char> write_buffer_{};
bool read_in_progress_{false};
bool write_in_progress_{false};
};
file_stream::file_stream(strand &strand, file_stream::handle_type handle)
: ref{file_stream_ref(new file_stream_impl(strand, handle),
[](file_stream_impl *p) { delete p; })} {}
void file_stream::start_read(file_stream::read_handler h) {
ref->start_read(move(h));
}
void file_stream::start_write(std::string_view data,
file_stream::write_handler h) {
ref->start_write(data, move(h));
}
void file_stream::cancel() { ref->cancel(); }
#endif
} // namespace thespian::executor