Initial Release

This commit is contained in:
CJ van den Berg 2024-02-08 22:23:40 +01:00
commit 5a00e06cb9
81 changed files with 12670 additions and 0 deletions

102
src/backtrace.cpp Normal file
View file

@ -0,0 +1,102 @@
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <sys/prctl.h>
#include <sys/wait.h>
#include <unistd.h>
static void msg(const char *msg, const char *arg) {
if (write(STDERR_FILENO, msg, strlen(msg)) != 0) {
}
if (write(STDERR_FILENO, arg, strlen(arg)) != 0) {
}
if (write(STDERR_FILENO, "\n", 1) != 0) {
}
}
static char binpath[512]; // NOLINT
static char pid_s[11]; // NOLINT
static void get_pid_binpath() {
pid_t pid = getpid();
sprintf(pid_s, "%d", pid); // NOLINT
size_t ret = readlink("/proc/self/exe", binpath, 511);
if (ret < 0)
return;
binpath[ret] = 0; // NOLINT
}
static const auto lldb{"/usr/bin/lldb"};
static const auto default_debugger{"/usr/bin/gdbserver"};
static auto get_debugger() -> const char * {
const char *debugger = secure_getenv("JITDEBUG");
if (not debugger)
return default_debugger;
if (strcmp(debugger, "on") == 0)
return default_debugger;
if (strcmp(debugger, "1") == 0)
return default_debugger;
if (strcmp(debugger, "true") == 0)
return default_debugger;
return debugger;
}
const char *const debugger = get_debugger();
void start_debugger(const char * dbg, const char **argv) {
#if defined(PR_SET_PTRACER)
prctl(PR_SET_PTRACER, PR_SET_PTRACER_ANY, 0, 0, 0); // NOLINT
#endif
int child_pid = fork();
if (!child_pid) {
dup2(2, 1); // redirect output to stderr
msg("debugging with ", dbg);
execv(dbg, const_cast<char *const *>(argv)); // NOLINT
_exit(1);
} else {
int stat(0);
waitpid(child_pid, &stat, 0);
}
}
extern "C" void sighdl_debugger(int no, siginfo_t * /*sigi*/, void * /*uco*/) {
get_pid_binpath();
const char *argv[] = {// NOLINT
debugger, "--attach", "[::1]:7777", pid_s, nullptr};
start_debugger(debugger, argv);
(void)raise(no);
}
extern "C" void sighdl_backtrace(int no, siginfo_t * /*sigi*/, void * /*uco*/) {
get_pid_binpath();
const char *argv[] = {// NOLINT
lldb, "--batch", "-p", pid_s,
"--one-line", "bt", binpath, nullptr};
start_debugger(lldb, argv);
(void)raise(no);
}
static void install_crash_handler(void (*hdlr)(int, siginfo_t *, void *)) {
struct sigaction action {};
sigemptyset(&action.sa_mask);
action.sa_flags = SA_SIGINFO | SA_RESETHAND;
#ifdef SA_FULLDUMP
action.sa_flags |= SA_FULLDUMP;
#endif
action.sa_sigaction = hdlr;
sigaction(SIGBUS, &action, nullptr);
sigaction(SIGSEGV, &action, nullptr);
sigaction(SIGABRT, &action, nullptr);
sigaction(SIGTRAP, &action, nullptr);
sigaction(SIGFPE, &action, nullptr);
}
extern "C" void install_debugger() { install_crash_handler(sighdl_debugger); }
extern "C" void install_backtrace() { install_crash_handler(sighdl_backtrace); }
extern "C" void install_jitdebugger() {
if (secure_getenv("JITDEBUG"))
install_debugger();
else
install_backtrace();
}

66
src/c/context.cpp Normal file
View file

@ -0,0 +1,66 @@
#include "thespian/env.hpp"
#include <__type_traits/is_swappable.h>
#include <thespian/c/context.h>
#include <thespian/c/handle.h>
#include <thespian/context.hpp>
#include <thespian/handle.hpp>
using std::string_view;
using thespian::context;
thread_local const char *last_error{}; // NOLINT
extern "C" {
auto thespian_get_last_error() -> const char * { return last_error; }
void thespian_set_last_error(const char *msg) { last_error = msg; }
thespian_context
thespian_context_create(thespian_context_destroy *d) { // NOLINT
return reinterpret_cast<thespian_context>( // NOLINT
context::create(reinterpret_cast<context::dtor *>(d))); // NOLINT
}
void thespian_context_run(thespian_context ctx) {
reinterpret_cast<context *>(ctx)->run(); // NOLINT
}
void thespian_context_on_last_exit(thespian_context ctx,
thespian_last_exit_handler h) {
reinterpret_cast<context *>(ctx)->on_last_exit(h); // NOLINT
}
auto thespian_context_spawn_link(thespian_context ctx_, thespian_behaviour b,
thespian_behaviour_state s,
thespian_exit_handler eh,
thespian_exit_handler_state es,
const char *name, thespian_env env,
thespian_handle *handle) -> int {
auto *ctx = reinterpret_cast<context *>(ctx_); // NOLINT
thespian::env_t empty_env_{};
thespian::env_t &env_ =
env ? *reinterpret_cast<thespian::env_t *>(env) : empty_env_; // NOLINT
auto ret = ctx->spawn_link(
[b, s]() -> thespian::result {
auto *ret = b(s);
if (ret) {
auto err = cbor::buffer();
const uint8_t *data = ret->base; // NOLINT
std::copy(data, data + ret->len, back_inserter(err)); // NOLINT
return thespian::to_error(err);
}
return thespian::ok();
},
[eh, es](auto e) {
if (eh)
eh(es, e.data(), e.size());
},
string_view(name), std::move(env_));
if (!ret)
return -1;
*handle = reinterpret_cast<thespian_handle>( // NOLINT
new thespian::handle{ret.value()});
return 0;
}
}

92
src/c/env.cpp Normal file
View file

@ -0,0 +1,92 @@
#include <cbor/cbor.hpp>
#include <thespian/c/env.h>
#include <thespian/c/handle.h>
#include <thespian/env.hpp>
#include <thespian/trace.hpp>
#include <cassert>
namespace {
auto to_env(thespian_env e) -> thespian::env_t & {
assert(e);
return *reinterpret_cast<thespian::env_t *>(e); // NOLINT
}
auto destroy(thespian_env e) -> void {
delete reinterpret_cast<thespian::env_t *>(e); // NOLINT
}
} // namespace
extern "C" {
void thespian_env_enable_all_channels(thespian_env e) {
to_env(e).enable_all_channels();
}
void thespian_env_disable_all_channels(thespian_env e) {
to_env(e).disable_all_channels();
}
void thespian_env_enable(thespian_env e, thespian_trace_channel c) {
to_env(e).enable(static_cast<thespian::channel>(c));
}
void thespian_env_disable(thespian_env e, thespian_trace_channel c) {
to_env(e).disable(static_cast<thespian::channel>(c));
}
auto thespian_env_enabled(thespian_env e, thespian_trace_channel c) -> bool {
return to_env(e).enabled(static_cast<thespian::channel>(c));
}
void thespian_env_on_trace(thespian_env e, thespian_trace_handler h) {
to_env(e).on_trace([h](const cbor::buffer &m) { h({m.data(), m.size()}); });
}
void thespian_env_trace(thespian_env e, cbor_buffer m) {
cbor::buffer buf;
const uint8_t *data = m.base;
std::copy(data, data + m.len, back_inserter(buf)); // NOLINT
to_env(e).trace(buf);
}
auto thespian_env_is(thespian_env e, c_string_view key) -> bool {
return to_env(e).is({key.base, key.len});
}
void thespian_env_set(thespian_env e, c_string_view key, bool value) {
to_env(e).is({key.base, key.len}) = value;
}
auto thespian_env_num(thespian_env e, c_string_view key) -> int64_t {
return to_env(e).num({key.base, key.len});
}
void thespian_env_num_set(thespian_env e, c_string_view key, int64_t value) {
to_env(e).num({key.base, key.len}) = value;
}
auto thespian_env_str(thespian_env e, c_string_view key) -> c_string_view {
auto &ret = to_env(e).str({key.base, key.len});
return {ret.data(), ret.size()};
}
void thespian_env_str_set(thespian_env e, c_string_view key,
c_string_view value) {
to_env(e).str({key.base, key.len}) = std::string_view{value.base, value.len};
}
auto thespian_env_proc(thespian_env e, c_string_view key) -> thespian_handle {
auto &ret = to_env(e).proc({key.base, key.len});
return reinterpret_cast<thespian_handle>(&ret); // NOLINT
}
void thespian_env_proc_set(thespian_env e, c_string_view key,
thespian_handle value) {
thespian::handle *h{reinterpret_cast<thespian::handle *>( // NOLINT
value)};
to_env(e).proc({key.base, key.len}) = *h;
}
auto thespian_env_clone(thespian_env e) -> thespian_env {
return reinterpret_cast<thespian_env>( // NOLINT
new thespian::env_t{to_env(e)});
}
void thespian_env_destroy(thespian_env e) { destroy(e); }
auto thespian_env_get() -> thespian_env {
return reinterpret_cast<thespian_env>(&thespian::env()); // NOLINT
}
auto thespian_env_new() -> thespian_env {
return reinterpret_cast<thespian_env>(new thespian::env_t); // NOLINT
}
}

76
src/c/file_descriptor.cpp Normal file
View file

@ -0,0 +1,76 @@
#include <thespian/c/context.h>
#include <thespian/c/file_descriptor.h>
#include <thespian/file_descriptor.hpp>
using thespian::file_descriptor;
using thespian::file_descriptor_impl;
using thespian::file_descriptor_ref;
// NOLINTBEGIN(*-reinterpret-cast, *-use-trailing-*)
extern "C" {
auto thespian_file_descriptor_create(const char *tag, int fd)
-> thespian_file_descriptor_handle * {
try {
auto *p = file_descriptor::create(tag, fd).ref.release();
return reinterpret_cast<thespian_file_descriptor_handle *>(p);
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return nullptr;
} catch (...) {
thespian_set_last_error("unknown thespian_file_descriptor_create error");
return nullptr;
}
}
int thespian_file_descriptor_wait_write(thespian_file_descriptor_handle *p) {
try {
file_descriptor::wait_write(reinterpret_cast<file_descriptor_impl *>(p));
return 0;
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return -1;
} catch (...) {
thespian_set_last_error(
"unknown thespian_file_descriptor_wait_write error");
return -1;
}
}
int thespian_file_descriptor_wait_read(thespian_file_descriptor_handle *p) {
try {
file_descriptor::wait_read(reinterpret_cast<file_descriptor_impl *>(p));
return 0;
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return -1;
} catch (...) {
thespian_set_last_error("unknown thespian_file_descriptor_wait_read error");
return -1;
}
}
int thespian_file_descriptor_cancel(thespian_file_descriptor_handle *p) {
try {
file_descriptor::cancel(reinterpret_cast<file_descriptor_impl *>(p));
return 0;
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return -1;
} catch (...) {
thespian_set_last_error("unknown thespian_file_descriptor_wait_read error");
return -1;
}
}
void thespian_file_descriptor_destroy(thespian_file_descriptor_handle *p) {
try {
file_descriptor::destroy(reinterpret_cast<file_descriptor_impl *>(p));
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
} catch (...) {
thespian_set_last_error("unknown thespian_file_descriptor_destroy error");
}
}
}
// NOLINTEND(*-reinterpret-cast, *-use-trailing-*)

70
src/c/handle.cpp Normal file
View file

@ -0,0 +1,70 @@
#include <thespian/c/handle.h>
#include <thespian/handle.hpp>
#include <thespian/instance.hpp>
namespace {
static thread_local cbor::buffer
message_buffer; // NOLINT(*-avoid-non-const-global-variables)
static thread_local thespian_error
error_buffer; // NOLINT(*-avoid-non-const-global-variables)
auto to_thespian_result(thespian::result r) -> thespian_result {
if (r) {
return nullptr;
}
message_buffer = r.error();
error_buffer.base = message_buffer.data();
error_buffer.len = message_buffer.size();
return &error_buffer;
}
} // namespace
extern "C" {
auto thespian_handle_clone(thespian_handle h) -> thespian_handle {
thespian::handle *h_{
reinterpret_cast<thespian::handle *>( // NOLINT(*-reinterpret-cast)
h)};
if (!h_)
return nullptr;
return reinterpret_cast<thespian_handle>( // NOLINT(*-reinterpret-cast)
new thespian::handle{*h_});
}
void thespian_handle_destroy(thespian_handle h) {
thespian::handle *h_{
reinterpret_cast<thespian::handle *>( // NOLINT(*-reinterpret-cast)
h)};
delete h_;
}
auto thespian_handle_send_raw(thespian_handle h, cbor_buffer m)
-> thespian_result {
thespian::handle *h_{
reinterpret_cast<thespian::handle *>( // NOLINT(*-reinterpret-cast)
h)};
cbor::buffer buf;
const uint8_t *data = m.base;
std::copy(data, data + m.len, // NOLINT(*-pointer-arithmetic)
back_inserter(buf));
return to_thespian_result(h_->send_raw(move(buf)));
}
auto thespian_handle_send_exit(thespian_handle h, c_string_view err)
-> thespian_result {
thespian::handle *h_{
reinterpret_cast<thespian::handle *>( // NOLINT(*-reinterpret-cast)
h)};
return to_thespian_result(h_->exit(std::string(err.base, err.len)));
}
auto thespian_handle_is_expired(thespian_handle h) -> bool {
thespian::handle *h_{
reinterpret_cast<thespian::handle *>( // NOLINT(*-reinterpret-cast)
h)};
return h_->expired();
}
}

65
src/c/instance.cpp Normal file
View file

@ -0,0 +1,65 @@
#include "cbor/c/cbor.h"
#include "thespian/c/handle.h"
#include <thespian/c/instance.h>
#include <thespian/instance.hpp>
using std::string_view;
extern "C" {
void thespian_receive(thespian_receiver r, thespian_behaviour_state s) {
thespian::receive([r, s](auto from, cbor::buffer msg) -> thespian::result {
thespian_handle from_handle = reinterpret_cast<thespian_handle>( // NOLINT
&from);
auto *ret = r(s, from_handle, {msg.data(), msg.size()});
if (ret) {
auto err = cbor::buffer();
const uint8_t *data = ret->base;
std::copy(data, data + ret->len, back_inserter(err)); // NOLINT
return thespian::to_error(err);
}
return thespian::ok();
});
}
auto thespian_get_trap() -> bool { return thespian::trap(); }
auto thespian_set_trap(bool on) -> bool { return thespian::trap(on); }
void thespian_link(thespian_handle h) {
thespian::handle *h_{
reinterpret_cast<thespian::handle *>( // NOLINT(*-reinterpret-cast)
h)};
thespian::link(*h_);
}
auto thespian_self() -> thespian_handle {
auto &self = thespian::self_ref();
return reinterpret_cast<thespian_handle>(&self); // NOLINT(*-reinterpret-cast)
}
auto thespian_spawn_link(thespian_behaviour b, thespian_behaviour_state s,
const char *name, thespian_env env,
thespian_handle *handle) -> int {
thespian::env_t empty_env_{};
thespian::env_t env_ =
env ? *reinterpret_cast<thespian::env_t *>(env) : empty_env_; // NOLINT
auto ret = spawn_link(
[b, s]() -> thespian::result {
auto *ret = b(s);
if (ret) {
auto err = cbor::buffer();
const uint8_t *data = ret->base; // NOLINT
std::copy(data, data + ret->len, back_inserter(err)); // NOLINT
return thespian::to_error(err);
}
return thespian::ok();
},
string_view(name), std::move(env_));
if (!ret)
return -1;
*handle = reinterpret_cast<thespian_handle>( // NOLINT
new thespian::handle{ret.value()});
return 0;
}
}

76
src/c/metronome.cpp Normal file
View file

@ -0,0 +1,76 @@
#include <thespian/c/context.h>
#include <thespian/c/metronome.h>
#include <thespian/metronome.hpp>
#include <chrono>
using std::chrono::microseconds;
using std::chrono::milliseconds;
using thespian::destroy_metronome;
using thespian::metronome_impl;
using thespian::metronome_ref;
using thespian::start_metronome;
using thespian::stop_metronome;
extern "C" {
auto thespian_metronome_create_ms(unsigned long ms)
-> thespian_metronome_handle * {
try {
auto *handle = thespian::create_metronome(milliseconds(ms)).ref.release();
return reinterpret_cast<thespian_metronome_handle *>(handle); // NOLINT
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return nullptr;
} catch (...) {
thespian_set_last_error("unknown thespian_metronome_create_ms error");
return nullptr;
}
}
auto thespian_metronome_create_us(unsigned long us)
-> thespian_metronome_handle * {
try {
auto *handle = thespian::create_metronome(microseconds(us)).ref.release();
return reinterpret_cast<thespian_metronome_handle *>(handle); // NOLINT
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return nullptr;
} catch (...) {
thespian_set_last_error("unknown thespian_metronome_create_us error");
return nullptr;
}
}
auto thespian_metronome_start(thespian_metronome_handle *handle) -> int {
try {
start_metronome(reinterpret_cast<metronome_impl *>(handle)); // NOLINT
return 0;
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return -1;
} catch (...) {
thespian_set_last_error("unknown thespian_metronome_start error");
return -1;
}
}
auto thespian_metronome_stop(thespian_metronome_handle *handle) -> int {
try {
stop_metronome(reinterpret_cast<metronome_impl *>(handle)); // NOLINT
return 0;
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return -1;
} catch (...) {
thespian_set_last_error("unknown thespian_metronome_stop error");
return -1;
}
}
void thespian_metronome_destroy(thespian_metronome_handle *handle) {
try {
destroy_metronome(reinterpret_cast<metronome_impl *>(handle)); // NOLINT
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
} catch (...) {
thespian_set_last_error("unknown thespian_metronome_destroy error");
}
}
}

50
src/c/signal.cpp Normal file
View file

@ -0,0 +1,50 @@
#include <thespian/c/context.h>
#include <thespian/c/signal.h>
#include <thespian/signal.hpp>
using thespian::cancel_signal;
using thespian::destroy_signal;
using thespian::signal_impl;
using thespian::signal_ref;
extern "C" {
auto thespian_signal_create(int signum, cbor_buffer m)
-> thespian_signal_handle * {
try {
cbor::buffer buf;
const uint8_t *data = m.base;
std::copy(data, data + m.len, // NOLINT(*-pointer-arithmetic)
back_inserter(buf));
auto *handle = thespian::create_signal(signum, move(buf)).ref.release();
return reinterpret_cast<thespian_signal_handle *>(handle); // NOLINT
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return nullptr;
} catch (...) {
thespian_set_last_error("unknown thespian_signal_create error");
return nullptr;
}
}
auto thespian_signal_cancel(thespian_signal_handle *handle) -> int {
try {
cancel_signal(reinterpret_cast<signal_impl *>(handle)); // NOLINT
return 0;
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return -1;
} catch (...) {
thespian_set_last_error("unknown thespian_signal_start error");
return -1;
}
}
void thespian_signal_destroy(thespian_signal_handle *handle) {
try {
destroy_signal(reinterpret_cast<signal_impl *>(handle)); // NOLINT
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
} catch (...) {
thespian_set_last_error("unknown thespian_signal_destroy error");
}
}
}

73
src/c/timeout.cpp Normal file
View file

@ -0,0 +1,73 @@
#include <thespian/c/context.h>
#include <thespian/c/timeout.h>
#include <thespian/timeout.hpp>
#include <chrono>
using std::chrono::microseconds;
using std::chrono::milliseconds;
using thespian::cancel_timeout;
using thespian::destroy_timeout;
using thespian::timeout_impl;
using thespian::timeout_ref;
extern "C" {
auto thespian_timeout_create_ms(unsigned long ms, cbor_buffer m)
-> thespian_timeout_handle * {
try {
cbor::buffer buf;
const uint8_t *data = m.base;
std::copy(data, data + m.len, // NOLINT(*-pointer-arithmetic)
back_inserter(buf));
auto *handle =
thespian::create_timeout(milliseconds(ms), move(buf)).ref.release();
return reinterpret_cast<thespian_timeout_handle *>(handle); // NOLINT
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return nullptr;
} catch (...) {
thespian_set_last_error("unknown thespian_timeout_create_ms error");
return nullptr;
}
}
auto thespian_timeout_create_us(unsigned long us, cbor_buffer m)
-> thespian_timeout_handle * {
try {
cbor::buffer buf;
const uint8_t *data = m.base;
std::copy(data, data + m.len, // NOLINT(*-pointer-arithmetic)
back_inserter(buf));
auto *handle =
thespian::create_timeout(microseconds(us), move(buf)).ref.release();
return reinterpret_cast<thespian_timeout_handle *>(handle); // NOLINT
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return nullptr;
} catch (...) {
thespian_set_last_error("unknown thespian_timeout_create_us error");
return nullptr;
}
}
auto thespian_timeout_cancel(thespian_timeout_handle *handle) -> int {
try {
cancel_timeout(reinterpret_cast<timeout_impl *>(handle)); // NOLINT
return 0;
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
return -1;
} catch (...) {
thespian_set_last_error("unknown thespian_timeout_start error");
return -1;
}
}
void thespian_timeout_destroy(thespian_timeout_handle *handle) {
try {
destroy_timeout(reinterpret_cast<timeout_impl *>(handle)); // NOLINT
} catch (const std::exception &e) {
thespian_set_last_error(e.what());
} catch (...) {
thespian_set_last_error("unknown thespian_timeout_destroy error");
}
}
}

25
src/c/trace.cpp Normal file
View file

@ -0,0 +1,25 @@
#include <thespian/c/trace.h>
#include <thespian/trace.hpp>
extern "C" {
void thespian_on_trace(thespian_trace_handler h) {
thespian::on_trace([h](const cbor::buffer &m) { h({m.data(), m.size()}); });
}
void thespian_trace_to_json_file(const char *file_name) {
thespian::trace_to_json_file(file_name);
}
void thespian_trace_to_cbor_file(const char *file_name) {
thespian::trace_to_cbor_file(file_name);
}
void thespian_trace_to_mermaid_file(const char *file_name) {
thespian::trace_to_mermaid_file(file_name);
}
void thespian_trace_to_trace(thespian_trace_channel default_channel) {
thespian::trace_to_trace(default_channel);
}
}

926
src/cbor.cpp Normal file
View file

@ -0,0 +1,926 @@
#include <cbor/c/cbor.h>
#include <cbor/cbor.hpp>
#include <array>
#include <iomanip>
#include <limits>
#include <sstream>
#include <string>
#include <string_view>
#include <tuple>
using std::back_inserter;
using std::copy;
using std::domain_error;
using std::hex;
using std::make_tuple;
using std::numeric_limits;
using std::ostream;
using std::setfill;
using std::setw;
using std::string;
using std::string_view;
using std::stringstream;
using std::tuple;
namespace cbor {
const buffer buffer::null_value = buffer{0xF6};
auto buffer::push_typed_val(int type, uint64_t value) -> void {
type <<= 5;
if (value < 24ULL) {
push_back(type | value);
} else if (value < 256ULL) {
push_back(type | 24);
push_back(value);
} else if (value < 65536ULL) {
push_back(type | 25);
push_back(value >> 8);
push_back(value);
} else if (value < 4294967296ULL) {
push_back(type | 26);
push_back(value >> 24);
push_back(value >> 16);
push_back(value >> 8);
push_back(value);
} else {
push_back(type | 27);
push_back(value >> 56);
push_back(value >> 48);
push_back(value >> 40);
push_back(value >> 32);
push_back(value >> 24);
push_back(value >> 16);
push_back(value >> 8);
push_back(value);
}
}
auto buffer::push_string(const string &s) -> buffer & {
push_typed_val(3, s.size());
copy(s.begin(), s.end(), back_inserter(*this));
return *this;
}
auto buffer::push_string(const string_view &s) -> buffer & {
push_typed_val(3, s.size());
copy(s.begin(), s.end(), back_inserter(*this));
return *this;
}
using json_iter = string::const_iterator;
static auto match_wsp_char(json_iter &b, const json_iter &e) -> bool {
if (b == e)
return false;
char c = *b;
switch (c) {
case ' ':
case '\t':
case '\r':
case '\n':
++b;
return true;
default:
return false;
}
}
static auto skip_wsp(json_iter &b, const json_iter &e) -> void {
while (match_wsp_char(b, e))
;
}
static auto match_char(int c, json_iter &b, const json_iter &e) -> bool {
if (b == e || *b != c)
return false;
++b;
return true;
}
// NOLINTBEGIN(*-pointer-arithmetic)
static auto match_literal(const char *str, json_iter &b, const json_iter &e)
-> bool {
json_iter i = b;
while (*str && match_char(*str, i, e))
++str;
if (!*str) {
b = i;
return true;
}
return false;
}
constexpr auto int_str_max = 3 * sizeof(uint64_t) + 1;
static auto push_json_number(buffer &buf, json_iter &b, const json_iter &e)
-> bool {
const bool neg = match_char('-', b, e);
std::array<char, int_str_max> s{};
char *p = s.data();
char *se = s.data() + int_str_max;
if (b == e)
return false;
char c = *b;
if (c >= '0' && c <= '9') {
++b;
*p = c;
++p;
if (p == se)
return false;
} else
return false;
while (true) {
if (b == e)
break;
c = *b;
if (c >= '0' && c <= '9') {
++b;
*p = c;
++p;
if (p == se)
return false;
} else {
break;
}
}
*p = 0;
char *ep = s.data();
int64_t i = strtoll(s.data(), &ep, 10);
if (ep != p)
return false;
if (neg)
i = -i;
buf.push_int(i);
return true;
}
// NOLINTEND(*-pointer-arithmetic)
static auto push_json_qstring_slow(buffer &buf, json_iter &b,
const json_iter &e) -> bool {
if (!match_char('"', b, e))
return false;
string s;
while (true) {
if (b == e)
return false;
char c = *b;
++b;
if (c == '\\') {
if (b == e)
return false;
c = *b;
++b;
switch (c) {
case 'b':
c = '\b';
break;
case 'f':
c = '\f';
break;
case 'n':
c = '\n';
break;
case 'r':
c = '\r';
break;
case 't':
c = '\t';
break;
case '\\':
c = '\\';
break;
default:
break;
}
} else if (c == '"') {
buf.push_string(s);
return true;
}
s.push_back(c);
}
}
static auto scan_json_qstring(json_iter &b, json_iter &e, bool &is_escaped)
-> bool {
if (!match_char('"', b, e))
return false;
json_iter i = b;
is_escaped = false;
while (true) {
if (i == e)
return false;
char c = *i;
if (c == '\\') {
is_escaped = true;
return true;
}
if (c == '"') {
e = i;
return true;
}
++i;
}
}
static auto push_json_qstring(buffer &buf, json_iter &b, const json_iter &e)
-> bool {
json_iter sb = b;
json_iter se = e;
bool is_escaped{false};
if (!scan_json_qstring(sb, se, is_escaped))
return false;
if (is_escaped)
return push_json_qstring_slow(buf, b, e);
b = se;
++b;
buf.push_string(string_view(&*sb, (&*se - &*sb)));
return true;
}
static auto push_json_keyword(buffer &buf, json_iter &b, const json_iter &e)
-> bool {
if (match_literal("false", b, e))
buf.push_bool(false);
else if (match_literal("true", b, e))
buf.push_bool(true);
else if (match_literal("null", b, e))
buf.push_null();
else
return false;
return true;
}
static auto push_json_value(buffer &buf, json_iter &b, const json_iter &e)
-> bool;
static auto push_json_array(buffer &buf, json_iter &b, const json_iter &e)
-> bool {
if (!match_char('[', b, e))
return false;
skip_wsp(b, e);
buffer inner;
size_t count{0};
if (!push_json_value(inner, b, e))
return false;
++count;
while (true) {
skip_wsp(b, e);
if (b == e)
return false;
char c = *b;
switch (c) {
case ',':
++b;
skip_wsp(b, e);
continue;
case ']':
++b;
inner.to_cbor(buf.array_header(count));
return true;
default:
if (!push_json_value(inner, b, e))
return false;
++count;
}
}
}
static auto push_json_map(buffer &buf, json_iter &b, const json_iter &e)
-> bool {
if (!match_char('{', b, e))
return false;
skip_wsp(b, e);
buffer inner;
size_t count{0};
if (!push_json_value(inner, b, e))
return false;
++count;
while (true) {
skip_wsp(b, e);
if (b == e)
return false;
char c = *b;
switch (c) {
case ':':
case ',':
++b;
skip_wsp(b, e);
continue;
case '}':
++b;
if (count % 2)
return false;
inner.to_cbor(buf.map_header(count / 2));
return true;
default:
if (!push_json_value(inner, b, e))
return false;
++count;
}
}
}
static auto push_json_value(buffer &buf, json_iter &b, const json_iter &e)
-> bool {
skip_wsp(b, e);
if (b == e)
return false;
char c = *b;
if ((c >= '0' && c <= '9') || c == '-') { // integer
return push_json_number(buf, b, e);
}
if ((c >= 'A' && c <= 'Z') || (c >= 'z' && c <= 'z')) { // keyword
return push_json_keyword(buf, b, e);
}
switch (c) {
case '"': // string
return push_json_qstring(buf, b, e);
case '[': // array
return push_json_array(buf, b, e);
case '{': // map
return push_json_map(buf, b, e);
default:
break;
}
return false;
}
auto buffer::push_json(const string &s) -> void {
json_iter b = s.cbegin();
if (!push_json_value(*this, b, s.cend())) {
stringstream ss;
ss << "invalid json value at pos " << std::distance(s.cbegin(), b)
<< " in: " << s;
throw domain_error{ss.str()};
}
}
using iter = buffer::const_iterator;
static auto decode_int_length_recurse(size_t length, iter &b, const iter &e,
int64_t i) -> int64_t {
if (b == e)
throw domain_error{"cbor message too short"};
i |= *b;
++b;
if (length == 1)
return i;
i <<= 8;
return decode_int_length_recurse(length - 1, b, e, i);
};
static auto decode_int_length(size_t length, iter &b, const iter &e)
-> int64_t {
return decode_int_length_recurse(length, b, e, 0);
}
static auto decode_pint(uint8_t type, iter &b, const iter &e) -> uint64_t {
if (type < 24)
return type;
switch (type) {
case 24: // 1 byte
return decode_int_length(1, b, e);
case 25: // 2 byte
return decode_int_length(2, b, e);
case 26: // 4 byte
return decode_int_length(4, b, e);
case 27: // 8 byte
return decode_int_length(8, b, e);
default:
throw domain_error{"cbor invalid integer type"};
}
}
static auto decode_nint(uint8_t type, iter &b, const iter &e) -> int64_t {
return -(decode_pint(type, b, e) + 1); // NOLINT(*-narrowing-conversions)
}
static auto decode_string(uint8_t type, iter &b, const iter &e) -> string_view {
auto len = decode_pint(type, b, e);
const uint8_t *s = &*b;
const auto sz = len;
while (len) {
if (b == e)
throw domain_error{"cbor message too short"};
++b;
--len;
}
return {
reinterpret_cast< // NOLINT(cppcoreguidelines-pro-type-reinterpret-cast)
const char *>(s),
sz};
}
static auto decode_bytes(uint8_t type, iter &b, const iter &e) -> string_view {
return decode_string(type, b, e);
}
static auto decode_type(iter &b, const iter &e)
-> tuple<uint8_t, uint8_t, uint8_t> {
if (b == e)
throw domain_error{"cbor message too short"};
uint8_t type = *b;
++b;
return make_tuple(uint8_t(type >> 5), uint8_t(type & 31), type);
}
auto buffer::decode_range_header(iter &b, const iter &e) -> size_t {
const auto [major, minor, type] = decode_type(b, e);
if (type == 0xf6)
return 0;
auto sz = decode_pint(minor, b, e);
switch (major) {
case 4: // array
return sz;
case 5: // map
return sz * 2;
default:
throw domain_error{"cbor unexpected type (expected array or map)"};
}
}
auto buffer::decode_array_header(iter &b, const iter &e) -> size_t {
const auto [major, minor, type] = decode_type(b, e);
if (type == 0xf6)
return 0;
if (major != 4)
throw domain_error{"cbor unexpected type (expected array)"};
return decode_pint(minor, b, e);
}
auto buffer::decode_map_header(iter &b, const iter &e) -> size_t {
const auto [major, minor, type] = decode_type(b, e);
if (type == 0xf6)
return 0;
if (major != 5)
throw domain_error{"cbor unexpected type (expected map)"};
return decode_pint(minor, b, e);
}
static auto skip_string(uint8_t type, iter &b, const iter &e) -> void {
auto len = decode_pint(type, b, e);
while (len) {
if (b == e)
throw domain_error{"cbor message too short"};
++b;
--len;
}
}
static auto skip_bytes(uint8_t type, iter &b, const iter &e) -> void {
return skip_string(type, b, e);
}
static auto skip_array(uint8_t type, iter &b, const iter &e) -> void;
static auto skip_map(uint8_t type, iter &b, const iter &e) -> void;
static auto skip_value_type(uint8_t major, uint8_t minor, iter &b,
const iter &e) -> void {
switch (major) {
case 0: // positive integer
decode_pint(minor, b, e);
break;
case 1: // negative integer
decode_nint(minor, b, e);
break;
case 2: // bytes
skip_bytes(minor, b, e);
break;
case 3: // string
skip_string(minor, b, e);
break;
case 4: // array
skip_array(minor, b, e);
break;
case 5: // map
skip_map(minor, b, e);
break;
case 6: // tag
throw domain_error{"cbor unsupported type tag"};
case 7: // special
break;
default:
throw domain_error{"cbor unsupported type unknown"};
}
}
static auto skip_value(iter &b, const iter &e) -> void {
const auto [major, minor, type] = decode_type(b, e);
skip_value_type(major, minor, b, e);
}
static auto skip_array(uint8_t type, iter &b, const iter &e) -> void {
auto len = decode_pint(type, b, e);
while (len) {
skip_value(b, e);
--len;
}
}
static auto skip_map(uint8_t type, iter &b, const iter &e) -> void {
auto len = decode_pint(type, b, e);
len = len * 2;
while (len) {
skip_value(b, e);
--len;
}
}
static auto match_type(iter &b, const iter &e, type &v) -> bool {
const auto [major, minor, type] = decode_type(b, e);
skip_value_type(major, minor, b, e);
switch (major) {
case 0: // positive integer
case 1: // negative integer
v = type::number;
break;
case 2: // bytes
v = type::bytes;
break;
case 3: // string
v = type::string;
break;
case 4: // array
v = type::array;
break;
case 5: // map
v = type::map;
break;
case 7: // special
if (type == 0xf6)
v = type::null;
else if (type == 0xf4 || type == 0xf5)
v = type::boolean;
break;
default:
return false;
}
return true;
}
auto buffer::match_value(iter &b, const iter &e, type t) -> bool {
type v{type::any};
if (match_type(b, e, v)) {
if (t == type::any)
return true;
return t == v;
}
return false;
}
static auto match_uint(iter &b, const iter &e, uint64_t &val) -> bool {
const auto [major, minor, type] = decode_type(b, e);
if (major != 0)
return false;
val = decode_pint(minor, b, e);
return true;
}
static auto match_int(iter &b, const iter &e, int64_t &val) -> bool {
const auto [major, minor, type] = decode_type(b, e);
switch (major) {
case 0: // positive integer
val = decode_pint(minor, b, e); // NOLINT(*-narrowing-conversions)
if (val < 0)
return false;
break;
case 1: // negative integer
val = decode_nint(minor, b, e);
break;
default:
return false;
}
return true;
}
auto buffer::match_value(iter &b, const iter &e, int64_t lit) -> bool {
int64_t val{0};
if (match_int(b, e, val))
return val == lit;
return false;
}
static auto match_bool(iter &b, const iter &e, bool &v) -> bool {
const auto [major, minor, type] = decode_type(b, e);
if (major == 7) { // special
if (type == 0xf4) {
v = false;
return true;
}
if (type == 0xf5) {
v = true;
return true;
}
}
return false;
}
auto buffer::match_value(iter &b, const iter &e, bool lit) -> bool {
bool val{};
if (match_bool(b, e, val))
return val == lit;
return false;
}
static auto match_string(iter &b, const iter &e, string_view &val) -> bool {
const auto [major, minor, type] = decode_type(b, e);
switch (major) {
case 2: // bytes
val = decode_bytes(minor, b, e);
break;
case 3: // string
val = decode_string(minor, b, e);
break;
default:
return false;
}
return true;
}
auto buffer::match_value(iter &b, const iter &e, const string_view lit)
-> bool {
string_view val;
if (match_string(b, e, val))
return val == lit;
return false;
}
auto buffer::match_value(iter &b, const iter &e, const string &lit) -> bool {
string_view val;
if (match_string(b, e, val))
return val == lit;
return false;
}
auto extract(type &t) -> buffer::extractor {
return [&t](iter &b, const iter &e) { return match_type(b, e, t); };
}
template <typename T> static auto extract_int(T &i) -> buffer::extractor {
return [&i](iter &b, const iter &e) {
int64_t i_{};
if (match_int(b, e, i_)) {
if (i_ < int64_t(numeric_limits<T>::min()) or
i_ > int64_t(numeric_limits<T>::max()))
return false;
i = i_;
return true;
}
return false;
};
}
auto extract(int64_t &i) -> buffer::extractor { return extract_int(i); }
auto extract(int32_t &i) -> buffer::extractor { return extract_int(i); }
auto extract(int16_t &i) -> buffer::extractor { return extract_int(i); }
auto extract(int8_t &i) -> buffer::extractor { return extract_int(i); }
auto extract(uint64_t &i) -> buffer::extractor {
return [&i](iter &b, const iter &e) { return match_uint(b, e, i); };
}
template <typename T> static auto extract_uint(T &i) -> buffer::extractor {
return [&i](iter &b, const iter &e) {
uint64_t i_{};
if (match_uint(b, e, i_)) {
if (i_ > uint64_t(numeric_limits<T>::max()))
return false;
i = i_;
return true;
}
return false;
};
}
auto extract(unsigned long long &i) -> buffer::extractor {
return extract_uint(i);
}
auto extract(uint32_t &i) -> buffer::extractor { return extract_uint(i); }
auto extract(uint16_t &i) -> buffer::extractor { return extract_uint(i); }
auto extract(uint8_t &i) -> buffer::extractor { return extract_uint(i); }
auto extract(bool &val) -> buffer::extractor {
return [&val](iter &b, const iter &e) { return match_bool(b, e, val); };
}
auto extract(std::string &s) -> buffer::extractor {
return [&s](iter &b, const iter &e) {
string_view val;
if (match_string(b, e, val)) {
s.assign(val.data(), val.size());
return true;
}
return false;
};
}
auto extract(string_view &s) -> buffer::extractor {
return [&s](iter &b, const iter &e) { return match_string(b, e, s); };
}
auto extract(buffer::range &r) -> buffer::extractor {
return [&r](iter &b, const iter &e) {
auto r_b = b;
type v{type::any};
if (match_type(b, e, v) &&
(v == type::null || v == type::array || v == type::map)) {
r.b_ = r_b;
r.e_ = b;
return true;
}
return false;
};
}
auto buffer::match_value(iter &b, const iter &e, const extractor &ex) -> bool {
return ex(b, e);
}
static auto tohex(ostream &os, uint8_t v) -> ostream & {
return os << hex << setfill('0') << setw(2) << static_cast<unsigned>(v);
}
static auto to_json(ostream &os, string_view s) -> ostream & {
os << '"';
for (auto c : s) {
switch (c) {
case '\b':
os << "\\b";
break;
case '\f':
os << "\\f";
break;
case '\n':
os << "\\n";
break;
case '\r':
os << "\\r";
break;
case '\t':
os << "\\t";
break;
case '\\':
os << "\\\\";
break;
case '"':
os << "\\\"";
break;
default:
if (c >= 0 && c <= 0x1f) {
os << "\\u00";
tohex(os, c);
} else {
os << c;
}
}
}
os << '"';
return os;
}
static auto to_json_stream(ostream &ss, iter &b, const iter &e) -> void {
const auto [major, minor, type] = decode_type(b, e);
switch (major) {
case 0: // positive integer
{
const auto i = decode_pint(minor, b, e);
ss << i;
} break;
case 1: // negative integer
{
const auto i = decode_nint(minor, b, e);
ss << i;
} break;
case 2: // bytes
{
const string_view v = decode_bytes(minor, b, e);
to_json(ss, v);
} break;
case 3: // string
{
const string_view v = decode_string(minor, b, e);
to_json(ss, v);
} break;
case 4: // array
{
auto i = decode_pint(minor, b, e);
ss << '[';
while (i) {
to_json_stream(ss, b, e);
--i;
if (i)
ss << ',';
}
ss << ']';
} break;
case 5: // map
{
auto i = decode_pint(minor, b, e);
ss << '{';
while (i) {
to_json_stream(ss, b, e);
ss << ':';
to_json_stream(ss, b, e);
--i;
if (i)
ss << ',';
}
ss << '}';
} break;
case 6: // tag
throw domain_error{"cbor unsupported type tag"};
case 7: // special
{
if (type == 0xf4)
ss << "false";
else if (type == 0xf5)
ss << "true";
else if (type == 0xf6)
ss << "null";
} break;
default:
throw domain_error{"cbor unsupported type unknown"};
}
}
static auto to_json_(const iter &b_, const iter &e) -> string {
stringstream ss;
iter b = b_;
try {
to_json_stream(ss, b, e);
} catch (const domain_error &e) {
throw domain_error{string("cbor to json failed: ") + e.what() +
"\nafter:\n" + ss.str()};
}
return ss.str();
}
auto buffer::to_json() const -> string {
return to_json_(raw_cbegin(), raw_cend());
}
auto buffer::range::to_json() const -> string {
return to_json_(raw_cbegin(), raw_cend());
}
auto buffer::to_json(ostream &os) const -> void {
auto b = raw_cbegin();
return to_json_stream(os, b, raw_cend());
}
auto buffer::range::to_json(ostream &os) const -> void {
auto b = raw_cbegin();
return to_json_stream(os, b, raw_cend());
}
extern "C" void cbor_to_json(cbor_buffer buf, cbor_to_json_callback cb) {
auto cbor = cbor::buffer();
const uint8_t *data = buf.base;
std::copy(data, data + buf.len, back_inserter(cbor)); // NOLINT(*-pointer-arithmetic)
auto json = cbor.to_json();
cb({json.data(), json.size()});
}
auto buffer::hexdump() const -> string {
stringstream ss;
ss << size() << ':';
for (auto c : static_cast<const buffer_base &>(*this)) {
ss << ' ';
tohex(ss, c);
}
return ss.str();
}
auto buffer::value_accessor::type_() const -> type {
type t{};
iter b_ = b;
if (not match_value(b_, e, extract(t)))
return type::unknown;
return t;
}
template <typename T> static auto get(iter b, const iter &e) -> T {
T val;
extract(val)(b, e);
return val;
}
buffer::value_accessor::operator type() const { return get<type>(b, e); }
buffer::value_accessor::operator int64_t() const { return get<int64_t>(b, e); }
buffer::value_accessor::operator uint64_t() const { return get<int64_t>(b, e); }
buffer::value_accessor::operator int32_t() const {
return get<int64_t>(b, e); // NOLINT(*-narrowing-conversions)
}
buffer::value_accessor::operator uint32_t() const { return get<int64_t>(b, e); }
buffer::value_accessor::operator int16_t() const {
return get<int64_t>(b, e); // NOLINT(*-narrowing-conversions)
}
buffer::value_accessor::operator uint16_t() const { return get<int64_t>(b, e); }
buffer::value_accessor::operator int8_t() const {
return get<int64_t>(b, e); // NOLINT(*-narrowing-conversions)
}
buffer::value_accessor::operator uint8_t() const { return get<int64_t>(b, e); }
buffer::value_accessor::operator bool() const { return get<bool>(b, e); }
buffer::value_accessor::operator string_view() const {
return get<string_view>(b, e);
}
buffer::value_accessor::operator buffer::range() const {
return get<buffer::range>(b, e);
}
} // namespace cbor

914
src/cbor.zig Normal file
View file

@ -0,0 +1,914 @@
const std = @import("std");
const eql = std.mem.eql;
const bufPrint = std.fmt.bufPrint;
const fixedBufferStream = std.io.fixedBufferStream;
const maxInt = std.math.maxInt;
const minInt = std.math.minInt;
const json = std.json;
const fba = std.heap.FixedBufferAllocator;
pub const CborError = error{
CborIntegerTooLarge,
CborIntegerTooSmall,
CborInvalidType,
CborTooShort,
OutOfMemory,
};
pub const CborJsonError = error{
BufferUnderrun,
CborIntegerTooLarge,
CborIntegerTooSmall,
CborInvalidType,
CborTooShort,
CborUnsupportedType,
NoSpaceLeft,
OutOfMemory,
Overflow,
SyntaxError,
UnexpectedEndOfInput,
};
const cbor_magic_null: u8 = 0xf6;
const cbor_magic_true: u8 = 0xf5;
const cbor_magic_false: u8 = 0xf4;
const cbor_magic_type_array: u8 = 4;
const cbor_magic_type_map: u8 = 5;
const value_type = enum(u8) {
number,
bytes,
string,
array,
map,
tag,
boolean,
null,
any,
more,
unknown,
};
pub const number = value_type.number;
pub const bytes = value_type.bytes;
pub const string = value_type.string;
pub const array = value_type.array;
pub const map = value_type.map;
pub const tag = value_type.tag;
pub const boolean = value_type.boolean;
pub const null_ = value_type.null;
pub const any = value_type.any;
pub const more = value_type.more;
const null_value_buf = [_]u8{0xF6};
pub const null_value: []const u8 = &null_value_buf;
pub fn isNull(val: []const u8) bool {
return eql(u8, val, null_value);
}
fn isAny(value: anytype) bool {
return if (comptime @TypeOf(value) == value_type) value == value_type.any else false;
}
fn isMore(value: anytype) bool {
return if (comptime @TypeOf(value) == value_type) value == value_type.more else false;
}
fn write(writer: anytype, value: u8) @TypeOf(writer).Error!void {
_ = try writer.write(&[_]u8{value});
}
fn writeTypedVal(writer: anytype, type_: u8, value: u64) @TypeOf(writer).Error!void {
const t: u8 = type_ << 5;
if (value < 24) {
try write(writer, t | @as(u8, @truncate(value)));
} else if (value < 256) {
try write(writer, t | 24);
try write(writer, @as(u8, @truncate(value)));
} else if (value < 65536) {
try write(writer, t | 25);
try write(writer, @as(u8, @truncate(value >> 8)));
try write(writer, @as(u8, @truncate(value)));
} else if (value < 4294967296) {
try write(writer, t | 26);
try write(writer, @as(u8, @truncate(value >> 24)));
try write(writer, @as(u8, @truncate(value >> 16)));
try write(writer, @as(u8, @truncate(value >> 8)));
try write(writer, @as(u8, @truncate(value)));
} else {
try write(writer, t | 27);
try write(writer, @as(u8, @truncate(value >> 56)));
try write(writer, @as(u8, @truncate(value >> 48)));
try write(writer, @as(u8, @truncate(value >> 40)));
try write(writer, @as(u8, @truncate(value >> 32)));
try write(writer, @as(u8, @truncate(value >> 24)));
try write(writer, @as(u8, @truncate(value >> 16)));
try write(writer, @as(u8, @truncate(value >> 8)));
try write(writer, @as(u8, @truncate(value)));
}
}
pub fn writeArrayHeader(writer: anytype, sz: usize) @TypeOf(writer).Error!void {
return writeTypedVal(writer, cbor_magic_type_array, sz);
}
pub fn writeMapHeader(writer: anytype, sz: usize) @TypeOf(writer).Error!void {
return writeTypedVal(writer, cbor_magic_type_map, sz);
}
pub fn writeArray(writer: anytype, args: anytype) @TypeOf(writer).Error!void {
const args_type_info = @typeInfo(@TypeOf(args));
if (args_type_info != .Struct) @compileError("expected tuple or struct argument");
const fields_info = args_type_info.Struct.fields;
try writeArrayHeader(writer, fields_info.len);
inline for (fields_info) |field_info|
try writeValue(writer, @field(args, field_info.name));
}
fn writeI64(writer: anytype, value: i64) @TypeOf(writer).Error!void {
return if (value < 0)
writeTypedVal(writer, 1, @as(u64, @bitCast(-(value + 1))))
else
writeTypedVal(writer, 0, @as(u64, @bitCast(value)));
}
fn writeU64(writer: anytype, value: u64) @TypeOf(writer).Error!void {
return writeTypedVal(writer, 0, value);
}
fn writeString(writer: anytype, s: []const u8) @TypeOf(writer).Error!void {
try writeTypedVal(writer, 3, s.len);
_ = try writer.write(s);
}
fn writeBool(writer: anytype, value: bool) @TypeOf(writer).Error!void {
return write(writer, if (value) cbor_magic_true else cbor_magic_false);
}
fn writeNull(writer: anytype) @TypeOf(writer).Error!void {
return write(writer, cbor_magic_null);
}
fn writeErrorset(writer: anytype, err: anyerror) @TypeOf(writer).Error!void {
var buf: [256]u8 = undefined;
const errmsg = try bufPrint(&buf, "error.{s}", .{@errorName(err)});
return writeString(writer, errmsg);
}
pub fn writeValue(writer: anytype, value: anytype) @TypeOf(writer).Error!void {
const T = @TypeOf(value);
switch (@typeInfo(T)) {
.Int, .ComptimeInt => return if (T == u64) writeU64(writer, value) else writeI64(writer, @intCast(value)),
.Bool => return writeBool(writer, value),
.Optional => return if (value) |v| writeValue(writer, v) else writeNull(writer),
.ErrorUnion => return if (value) |v| writeValue(writer, v) else |err| writeValue(writer, err),
.ErrorSet => return writeErrorset(writer, value),
.Union => |info| {
if (info.tag_type) |TagType| {
comptime var v = void;
inline for (info.fields) |u_field| {
if (value == @field(TagType, u_field.name))
v = @field(value, u_field.name);
}
try writeArray(writer, .{
@typeName(T),
@tagName(@as(TagType, value)),
v,
});
} else {
try writeArray(writer, .{@typeName(T)});
}
},
.Struct => |info| {
if (info.is_tuple) {
if (info.fields.len == 0) return writeNull(writer);
try writeArrayHeader(writer, info.fields.len);
inline for (info.fields) |f|
try writeValue(writer, @field(value, f.name));
} else {
if (info.fields.len == 0) return writeNull(writer);
try writeMapHeader(writer, info.fields.len);
inline for (info.fields) |f| {
try writeString(writer, f.name);
try writeValue(writer, @field(value, f.name));
}
}
},
.Pointer => |ptr_info| switch (ptr_info.size) {
.One => return writeValue(writer, value.*),
.Many, .C => @compileError("cannot write type '" ++ @typeName(T) ++ "' to cbor stream"),
.Slice => {
if (ptr_info.child == u8) return writeString(writer, value);
if (value.len == 0) return writeNull(writer);
try writeArrayHeader(writer, value.len);
for (value) |elem|
try writeValue(writer, elem);
},
},
.Array => |info| {
if (info.child == u8) return writeString(writer, &value);
if (value.len == 0) return writeNull(writer);
try writeArrayHeader(writer, value.len);
for (value) |elem|
try writeValue(writer, elem);
},
.Vector => |info| {
try writeArrayHeader(writer, info.len);
var i: usize = 0;
while (i < info.len) : (i += 1) {
try writeValue(writer, value[i]);
}
},
.Null => try writeNull(writer),
else => @compileError("cannot write type '" ++ @typeName(T) ++ "' to cbor stream"),
}
}
pub fn fmt(buf: []u8, value: anytype) []const u8 {
var stream = fixedBufferStream(buf);
writeValue(stream.writer(), value) catch unreachable;
return stream.getWritten();
}
const CborType = struct { type: u8, minor: u5, major: u3 };
pub fn decodeType(iter: *[]const u8) error{CborTooShort}!CborType {
if (iter.len < 1)
return error.CborTooShort;
const type_: u8 = iter.*[0];
const bits: packed struct { minor: u5, major: u3 } = @bitCast(type_);
iter.* = iter.*[1..];
return .{ .type = type_, .minor = bits.minor, .major = bits.major };
}
fn decodeUIntLengthRecurse(iter: *[]const u8, length: usize, acc: u64) !u64 {
if (iter.len < 1)
return error.CborTooShort;
const v: u8 = iter.*[0];
iter.* = iter.*[1..];
var i = acc | v;
if (length == 1)
return i;
i <<= 8;
// return @call(.always_tail, decodeUIntLengthRecurse, .{ iter, length - 1, i }); FIXME: @call(.always_tail) seems broken as of 0.11.0-dev.2964+e9cbdb2cf
return decodeUIntLengthRecurse(iter, length - 1, i);
}
fn decodeUIntLength(iter: *[]const u8, length: usize) !u64 {
return decodeUIntLengthRecurse(iter, length, 0);
}
fn decodePInt(iter: *[]const u8, minor: u5) !u64 {
if (minor < 24) return minor;
return switch (minor) {
24 => decodeUIntLength(iter, 1), // 1 byte
25 => decodeUIntLength(iter, 2), // 2 byte
26 => decodeUIntLength(iter, 4), // 4 byte
27 => decodeUIntLength(iter, 8), // 8 byte
else => error.CborInvalidType,
};
}
fn decodeNInt(iter: *[]const u8, minor: u5) CborError!i64 {
return -@as(i64, @intCast(try decodePInt(iter, minor) + 1));
}
pub fn decodeMapHeader(iter: *[]const u8) CborError!usize {
const t = try decodeType(iter);
if (t.type == cbor_magic_null)
return 0;
if (t.major != 5)
return error.CborInvalidType;
return decodePInt(iter, t.minor);
}
pub fn decodeArrayHeader(iter: *[]const u8) CborError!usize {
const t = try decodeType(iter);
if (t.type == cbor_magic_null)
return 0;
if (t.major != 4)
return error.CborInvalidType;
return decodePInt(iter, t.minor);
}
fn decodeString(iter_: *[]const u8, minor: u5) CborError![]const u8 {
var iter = iter_.*;
const len = try decodePInt(&iter, minor);
if (iter.len < len)
return error.CborTooShort;
const s = iter[0..len];
iter = iter[len..];
iter_.* = iter;
return s;
}
fn decodeBytes(iter: *[]const u8, minor: u5) CborError![]const u8 {
return decodeString(iter, minor);
}
fn decodeJsonArray(iter_: *[]const u8, minor: u5, arr: *json.Array) CborError!bool {
var iter = iter_.*;
var n = try decodePInt(&iter, minor);
while (n > 0) {
const value = try arr.addOne();
if (!try matchJsonValue(&iter, value, arr.allocator))
return false;
n -= 1;
}
iter_.* = iter;
return true;
}
fn decodeJsonObject(iter_: *[]const u8, minor: u5, obj: *json.ObjectMap) CborError!bool {
var iter = iter_.*;
var n = try decodePInt(&iter, minor);
while (n > 0) {
var key: []u8 = undefined;
var value: json.Value = .null;
if (!try matchString(&iter, &key))
return false;
if (!try matchJsonValue(&iter, &value, obj.allocator))
return false;
_ = try obj.getOrPutValue(key, value);
n -= 1;
}
iter_.* = iter;
return true;
}
pub fn matchInt(comptime T: type, iter_: *[]const u8, val: *T) CborError!bool {
var iter = iter_.*;
const t = try decodeType(&iter);
val.* = switch (t.major) {
0 => blk: { // positive integer
const v = try decodePInt(&iter, t.minor);
if (v > maxInt(T))
return error.CborIntegerTooLarge;
break :blk @intCast(v);
},
1 => blk: { // negative integer
const v = try decodeNInt(&iter, t.minor);
if (v < minInt(T))
return error.CborIntegerTooSmall;
break :blk @intCast(v);
},
else => return false,
};
iter_.* = iter;
return true;
}
pub fn matchIntValue(comptime T: type, iter: *[]const u8, val: T) CborError!bool {
var v: T = 0;
return if (try matchInt(T, iter, &v)) v == val else false;
}
pub fn matchBool(iter_: *[]const u8, v: *bool) CborError!bool {
var iter = iter_.*;
const t = try decodeType(&iter);
if (t.major == 7) { // special
if (t.type == cbor_magic_false) {
v.* = false;
iter_.* = iter;
return true;
}
if (t.type == cbor_magic_true) {
v.* = true;
iter_.* = iter;
return true;
}
}
return false;
}
fn matchBoolValue(iter: *[]const u8, val: bool) CborError!bool {
var v: bool = false;
return if (try matchBool(iter, &v)) v == val else false;
}
fn skipString(iter: *[]const u8, minor: u5) CborError!void {
const len = try decodePInt(iter, minor);
if (iter.len < len)
return error.CborTooShort;
iter.* = iter.*[len..];
}
fn skipBytes(iter: *[]const u8, minor: u5) CborError!void {
return skipString(iter, minor);
}
fn skipArray(iter: *[]const u8, minor: u5) CborError!void {
var len = try decodePInt(iter, minor);
while (len > 0) {
try skipValue(iter);
len -= 1;
}
}
fn skipMap(iter: *[]const u8, minor: u5) CborError!void {
var len = try decodePInt(iter, minor);
len *= 2;
while (len > 0) {
try skipValue(iter);
len -= 1;
}
}
pub fn skipValue(iter: *[]const u8) CborError!void {
const t = try decodeType(iter);
try skipValueType(iter, t.major, t.minor);
}
fn skipValueType(iter: *[]const u8, major: u3, minor: u5) CborError!void {
switch (major) {
0 => { // positive integer
_ = try decodePInt(iter, minor);
},
1 => { // negative integer
_ = try decodeNInt(iter, minor);
},
2 => { // bytes
try skipBytes(iter, minor);
},
3 => { // string
try skipString(iter, minor);
},
4 => { // array
try skipArray(iter, minor);
},
5 => { // map
try skipMap(iter, minor);
},
6 => { // tag
return error.CborInvalidType;
},
7 => { // special
return;
},
}
}
fn matchType(iter_: *[]const u8, v: *value_type) CborError!bool {
var iter = iter_.*;
const t = try decodeType(&iter);
try skipValueType(&iter, t.major, t.minor);
switch (t.major) {
0, 1 => v.* = value_type.number, // positive integer or negative integer
2 => v.* = value_type.bytes, // bytes
3 => v.* = value_type.string, // string
4 => v.* = value_type.array, // array
5 => v.* = value_type.map, // map
7 => { // special
if (t.type == cbor_magic_null) {
v.* = value_type.null;
} else {
if (t.type == cbor_magic_false or t.type == cbor_magic_true) {
v.* = value_type.boolean;
} else {
return false;
}
}
},
else => return false,
}
iter_.* = iter;
return true;
}
fn matchValueType(iter: *[]const u8, t: value_type) CborError!bool {
var v: value_type = value_type.unknown;
return if (try matchType(iter, &v)) (t == value_type.any or t == v) else false;
}
pub fn matchString(iter_: *[]const u8, val: *[]const u8) CborError!bool {
var iter = iter_.*;
const t = try decodeType(&iter);
val.* = switch (t.major) {
2 => try decodeBytes(&iter, t.minor), // bytes
3 => try decodeString(&iter, t.minor), // string
else => return false,
};
iter_.* = iter;
return true;
}
fn matchStringValue(iter: *[]const u8, lit: []const u8) CborError!bool {
var val: []const u8 = undefined;
return if (try matchString(iter, &val)) eql(u8, val, lit) else false;
}
fn matchError(comptime T: type) noreturn {
@compileError("cannot match type '" ++ @typeName(T) ++ "' to cbor stream");
}
pub fn matchValue(iter: *[]const u8, value: anytype) CborError!bool {
if (@TypeOf(value) == value_type)
return matchValueType(iter, value);
const T = comptime @TypeOf(value);
if (comptime isExtractor(T))
return value.extract(iter);
return switch (comptime @typeInfo(T)) {
.Int => return matchIntValue(T, iter, value),
.ComptimeInt => return matchIntValue(i64, iter, value),
.Bool => matchBoolValue(iter, value),
.Pointer => |info| switch (info.size) {
.One => matchValue(iter, value.*),
.Many, .C => matchError(T),
.Slice => if (info.child == u8) matchStringValue(iter, value) else matchArray(iter, value, info),
},
.Struct => |info| if (info.is_tuple)
matchArray(iter, value, info)
else
matchError(T),
.Array => |info| if (info.child == u8) matchStringValue(iter, &value) else matchArray(iter, value, info),
else => @compileError("cannot match value type '" ++ @typeName(T) ++ "' to cbor stream"),
};
}
fn matchJsonValue(iter_: *[]const u8, v: *json.Value, a: std.mem.Allocator) CborError!bool {
var iter = iter_.*;
const t = try decodeType(&iter);
const ret = switch (t.major) {
0 => ret: { // positive integer
v.* = json.Value{ .integer = @intCast(try decodePInt(&iter, t.minor)) };
break :ret true;
},
1 => ret: { // negative integer
v.* = json.Value{ .integer = try decodeNInt(&iter, t.minor) };
break :ret true;
},
2 => ret: { // bytes
break :ret false;
},
3 => ret: { // string
v.* = json.Value{ .string = try decodeString(&iter, t.minor) };
break :ret true;
},
4 => ret: { // array
v.* = json.Value{ .array = json.Array.init(a) };
break :ret try decodeJsonArray(&iter, t.minor, &v.array);
},
5 => ret: { // map
v.* = json.Value{ .object = json.ObjectMap.init(a) };
break :ret try decodeJsonObject(&iter, t.minor, &v.object);
},
6 => ret: { // tag
break :ret false;
},
7 => ret: { // special
switch (t.type) {
cbor_magic_false => {
v.* = json.Value{ .bool = false };
break :ret true;
},
cbor_magic_true => {
v.* = json.Value{ .bool = true };
break :ret true;
},
cbor_magic_null => {
v.* = json.Value{ .null = {} };
break :ret true;
},
else => break :ret false,
}
},
};
if (ret) iter_.* = iter;
return ret;
}
fn matchArrayMore(iter_: *[]const u8, n_: u64) CborError!bool {
var iter = iter_.*;
var n = n_;
while (n > 0) {
if (!try matchValue(&iter, value_type.any))
return false;
n -= 1;
}
iter_.* = iter;
return true;
}
fn matchArray(iter_: *[]const u8, arr: anytype, info: anytype) CborError!bool {
var iter = iter_.*;
var n = try decodeArrayHeader(&iter);
inline for (info.fields) |f| {
const value = @field(arr, f.name);
if (isMore(value))
break;
} else if (info.fields.len != n)
return false;
inline for (info.fields) |f| {
const value = @field(arr, f.name);
if (isMore(value))
return matchArrayMore(&iter, n);
if (n == 0) return false;
const matched = try matchValue(&iter, @field(arr, f.name));
if (!matched) return false;
n -= 1;
}
if (n == 0) iter_.* = iter;
return n == 0;
}
fn matchJsonObject(iter_: *[]const u8, obj: *json.ObjectMap) !bool {
var iter = iter_.*;
const t = try decodeType(&iter);
if (t.type == cbor_magic_null)
return true;
if (t.major != 5)
return error.CborInvalidType;
const ret = try decodeJsonObject(&iter, t.minor, obj);
if (ret) iter_.* = iter;
return ret;
}
pub fn match(buf: []const u8, pattern: anytype) CborError!bool {
var iter: []const u8 = buf;
return matchValue(&iter, pattern);
}
fn extractError(comptime T: type) noreturn {
@compileError("cannot extract type '" ++ @typeName(T) ++ "' from cbor stream");
}
fn hasExtractorTag(info: anytype) bool {
if (info.is_tuple) return false;
inline for (info.decls) |decl| {
if (comptime eql(u8, decl.name, "EXTRACTOR_TAG"))
return true;
}
return false;
}
fn isExtractor(comptime T: type) bool {
return comptime switch (@typeInfo(T)) {
.Struct => |info| hasExtractorTag(info),
else => false,
};
}
const JsonValueExtractor = struct {
dest: *T,
const Self = @This();
pub const EXTRACTOR_TAG = struct {};
const T = json.Value;
pub fn init(dest: *T) Self {
return .{ .dest = dest };
}
pub fn extract(self: Self, iter: *[]const u8) CborError!bool {
var null_heap_: [0]u8 = undefined;
var heap = fba.init(&null_heap_);
return matchJsonValue(iter, self.dest, heap.allocator());
}
};
const JsonObjectExtractor = struct {
dest: *T,
const Self = @This();
pub const EXTRACTOR_TAG = struct {};
const T = json.ObjectMap;
pub fn init(dest: *T) Self {
return .{ .dest = dest };
}
pub fn extract(self: Self, iter: *[]const u8) CborError!bool {
return matchJsonObject(iter, self.dest);
}
};
fn Extractor(comptime T: type) type {
if (T == json.Value)
return JsonValueExtractor;
if (T == json.ObjectMap)
return JsonObjectExtractor;
return struct {
dest: *T,
const Self = @This();
pub const EXTRACTOR_TAG = struct {};
pub fn init(dest: *T) Self {
return .{ .dest = dest };
}
pub fn extract(self: Self, iter: *[]const u8) CborError!bool {
switch (comptime @typeInfo(T)) {
.Int, .ComptimeInt => return matchInt(T, iter, self.dest),
.Bool => return matchBool(iter, self.dest),
.Pointer => |ptr_info| switch (ptr_info.size) {
.Slice => {
if (ptr_info.child == u8) return matchString(iter, self.dest) else extractError(T);
},
else => extractError(T),
},
else => extractError(T),
}
}
};
}
fn ExtractorType(comptime T: type) type {
const T_type_info = @typeInfo(T);
if (T_type_info != .Pointer) @compileError("extract requires a pointer argument");
return Extractor(T_type_info.Pointer.child);
}
pub fn extract(dest: anytype) ExtractorType(@TypeOf(dest)) {
comptime {
if (!isExtractor(ExtractorType(@TypeOf(dest))))
@compileError("isExtractor self check failed for " ++ @typeName(ExtractorType(@TypeOf(dest))));
}
return ExtractorType(@TypeOf(dest)).init(dest);
}
const CborExtractor = struct {
dest: *[]const u8,
const Self = @This();
pub const EXTRACTOR_TAG = struct {};
pub fn init(dest: *[]const u8) Self {
return .{ .dest = dest };
}
pub fn extract(self: Self, iter: *[]const u8) CborError!bool {
const b = iter.*;
try skipValue(iter);
self.dest.* = b[0..(b.len - iter.len)];
return true;
}
};
pub fn extract_cbor(dest: *[]const u8) CborExtractor {
return CborExtractor.init(dest);
}
pub fn JsonStream(comptime T: type) type {
return struct {
const Writer = T.Writer;
const JsonWriter = json.WriteStream(Writer, .{ .checked_to_fixed_depth = 256 });
fn jsonWriteArray(w: *JsonWriter, iter: *[]const u8, minor: u5) !void {
var count = try decodePInt(iter, minor);
try w.beginArray();
while (count > 0) : (count -= 1) {
try jsonWriteValue(w, iter);
}
try w.endArray();
}
fn jsonWriteMap(w: *JsonWriter, iter: *[]const u8, minor: u5) !void {
var count = try decodePInt(iter, minor);
try w.beginObject();
while (count > 0) : (count -= 1) {
const t = try decodeType(iter);
if (t.major != 3) return error.CborInvalidType;
try w.objectField(try decodeString(iter, t.minor));
try jsonWriteValue(w, iter);
}
try w.endObject();
}
pub fn jsonWriteValue(w: *JsonWriter, iter: *[]const u8) (CborJsonError || Writer.Error)!void {
const t = try decodeType(iter);
if (t.type == cbor_magic_false)
return w.write(false);
if (t.type == cbor_magic_true)
return w.write(true);
if (t.type == cbor_magic_null)
return w.write(null);
return switch (t.major) {
0 => w.write(try decodePInt(iter, t.minor)), // positive integer
1 => w.write(try decodeNInt(iter, t.minor)), // negative integer
2 => error.CborUnsupportedType, // bytes
3 => w.write(try decodeString(iter, t.minor)), // string
4 => jsonWriteArray(w, iter, t.minor), // array
5 => jsonWriteMap(w, iter, t.minor), // map
else => error.CborInvalidType,
};
}
};
}
pub fn toJson(cbor_buf: []const u8, json_buf: []u8) CborJsonError![]const u8 {
var fbs = fixedBufferStream(json_buf);
var s = json.writeStream(fbs.writer(), .{});
var iter: []const u8 = cbor_buf;
try JsonStream(@TypeOf(fbs)).jsonWriteValue(&s, &iter);
return fbs.getWritten();
}
pub fn toJsonPretty(cbor_buf: []const u8, json_buf: []u8) CborJsonError![]const u8 {
var fbs = fixedBufferStream(json_buf);
var s = json.writeStream(fbs.writer(), .{ .whitespace = .indent_1 });
var iter: []const u8 = cbor_buf;
try JsonStream(@TypeOf(fbs)).jsonWriteValue(&s, &iter);
return fbs.getWritten();
}
fn writeJsonValue(writer: anytype, value: json.Value) !void {
try switch (value) {
.array => |_| unreachable,
.object => |_| unreachable,
.null => writeNull(writer),
.float => |_| error.CborUnsupportedType,
inline else => |v| writeValue(writer, v),
};
}
fn jsonScanUntil(writer: anytype, scanner: *json.Scanner, end_token: anytype) CborJsonError!usize {
var partial = try std.BoundedArray(u8, 4096).init(0);
var count: usize = 0;
var token = try scanner.next();
while (token != end_token) : (token = try scanner.next()) {
count += 1;
switch (token) {
.object_begin => try writeJsonObject(writer, scanner),
.array_begin => try writeJsonArray(writer, scanner),
.true => try writeBool(writer, true),
.false => try writeBool(writer, false),
.null => try writeNull(writer),
.number => |v| {
try partial.appendSlice(v);
try writeJsonValue(writer, json.Value.parseFromNumberSlice(partial.slice()));
try partial.resize(0);
},
.partial_number => |v| {
try partial.appendSlice(v);
count -= 1;
},
.string => |v| {
try partial.appendSlice(v);
try writeString(writer, partial.slice());
try partial.resize(0);
},
.partial_string => |v| {
try partial.appendSlice(v);
count -= 1;
},
.partial_string_escaped_1 => |v| {
try partial.appendSlice(&v);
count -= 1;
},
.partial_string_escaped_2 => |v| {
try partial.appendSlice(&v);
count -= 1;
},
.partial_string_escaped_3 => |v| {
try partial.appendSlice(&v);
count -= 1;
},
.partial_string_escaped_4 => |v| {
try partial.appendSlice(&v);
count -= 1;
},
else => return error.SyntaxError,
}
}
return count;
}
pub const local_heap_size = 4096 * 16;
fn writeJsonArray(writer_: anytype, scanner: *json.Scanner) CborJsonError!void {
var buf: [local_heap_size]u8 = undefined;
var stream = fixedBufferStream(&buf);
const writer = stream.writer();
const count = try jsonScanUntil(writer, scanner, .array_end);
try writeArrayHeader(writer_, count);
try writer_.writeAll(stream.getWritten());
}
fn writeJsonObject(writer_: anytype, scanner: *json.Scanner) CborJsonError!void {
var buf: [local_heap_size]u8 = undefined;
var stream = fixedBufferStream(&buf);
const writer = stream.writer();
const count = try jsonScanUntil(writer, scanner, .object_end);
try writeMapHeader(writer_, count / 2);
try writer_.writeAll(stream.getWritten());
}
pub fn fromJson(json_buf: []const u8, cbor_buf: []u8) ![]const u8 {
var local_heap_: [local_heap_size]u8 = undefined;
var heap = fba.init(&local_heap_);
var stream = fixedBufferStream(cbor_buf);
const writer = stream.writer();
var scanner = json.Scanner.initCompleteInput(heap.allocator(), json_buf);
defer scanner.deinit();
_ = try jsonScanUntil(writer, &scanner, .end_of_document);
return stream.getWritten();
}

207
src/executor.hpp Normal file
View file

@ -0,0 +1,207 @@
#pragma once
#include <array>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <netinet/in.h>
#include <system_error>
#include <vector>
using port_t = unsigned short;
namespace thespian::executor {
struct strand;
struct context_impl;
using context_ref = std::shared_ptr<context_impl>;
struct context {
static auto create() -> context;
auto create_strand() -> strand;
void run();
auto pending_tasks() -> size_t;
auto pending_posts() -> size_t;
context_ref ref;
};
struct strand_impl;
using strand_ref = std::shared_ptr<strand_impl>;
struct strand {
void post(std::function<void()>);
strand_ref ref;
};
struct signal_impl;
using signal_dtor = void (*)(signal_impl *);
using signal_ref = std::unique_ptr<signal_impl, signal_dtor>;
struct signal {
using handler = std::function<void(const std::error_code &, int signum)>;
explicit signal(context &);
explicit signal(strand &);
void fires_on(int signum);
void on_fired(handler);
void cancel();
signal_ref ref;
};
struct timer_impl;
using timer_dtor = void (*)(timer_impl *);
using timer_ref = std::unique_ptr<timer_impl, timer_dtor>;
struct timer {
using handler = std::function<void(const std::error_code &)>;
explicit timer(context &);
explicit timer(strand &);
void expires_at(std::chrono::time_point<std::chrono::system_clock>);
void expires_after(std::chrono::microseconds);
void on_expired(handler);
void cancel();
timer_ref ref;
};
auto to_string(const in6_addr &ip) -> std::string;
constexpr auto receive_buffer_size{4L * 1024L};
struct endpoint {
in6_addr ip;
port_t port;
};
namespace udp {
struct socket_impl;
using socket_dtor = void (*)(socket_impl *);
using socket_ref = std::unique_ptr<socket_impl, socket_dtor>;
struct socket {
using handler =
std::function<void(const std::error_code &, std::size_t received)>;
explicit socket(strand &);
auto bind(const in6_addr &ip, port_t port) -> std::error_code;
[[nodiscard]] auto send_to(std::string_view data, in6_addr ip,
port_t port) const -> size_t;
void receive(handler);
void close();
[[nodiscard]] auto local_endpoint() const -> const endpoint &;
[[nodiscard]] auto remote_endpoint() const -> const endpoint &;
std::array<char, receive_buffer_size> receive_buffer{};
socket_ref ref;
};
} // namespace udp
namespace tcp {
struct socket_impl;
using socket_dtor = void (*)(socket_impl *);
using socket_ref = std::unique_ptr<socket_impl, socket_dtor>;
struct socket {
using connect_handler = std::function<void(const std::error_code &)>;
using read_handler =
std::function<void(const std::error_code &, std::size_t)>;
using write_handler =
std::function<void(const std::error_code &, std::size_t)>;
explicit socket(strand &);
explicit socket(strand &, int fd);
auto bind(const in6_addr &ip, port_t port) -> std::error_code;
void connect(const in6_addr &ip, port_t port, connect_handler);
void write(const std::vector<uint8_t> &data, write_handler);
void read(read_handler);
void close();
static void close(int fd);
auto release() -> int;
[[nodiscard]] auto local_endpoint() const -> const endpoint &;
std::array<char, receive_buffer_size> read_buffer{};
socket_ref ref;
};
struct acceptor_impl;
using acceptor_dtor = void (*)(acceptor_impl *);
using acceptor_ref = std::unique_ptr<acceptor_impl, acceptor_dtor>;
struct acceptor {
using handler = std::function<void(int fd, const std::error_code &)>;
explicit acceptor(strand &);
auto bind(const in6_addr &, port_t) -> std::error_code;
auto listen() -> std::error_code;
void accept(handler);
void close();
[[nodiscard]] auto local_endpoint() const -> endpoint;
acceptor_ref ref;
};
} // namespace tcp
namespace unx {
struct socket_impl;
using socket_dtor = void (*)(socket_impl *);
using socket_ref = std::unique_ptr<socket_impl, socket_dtor>;
struct socket {
using connect_handler = std::function<void(const std::error_code &)>;
using read_handler =
std::function<void(const std::error_code &, std::size_t)>;
using write_handler =
std::function<void(const std::error_code &, std::size_t)>;
explicit socket(strand &);
explicit socket(strand &, int fd);
auto bind(std::string_view path) -> std::error_code;
void connect(std::string_view path, connect_handler);
void write(const std::vector<uint8_t> &data, write_handler);
void read(read_handler);
void close();
auto release() -> int;
std::array<char, receive_buffer_size> read_buffer{};
socket_ref ref;
};
struct acceptor_impl;
using acceptor_dtor = void (*)(acceptor_impl *);
using acceptor_ref = std::unique_ptr<acceptor_impl, acceptor_dtor>;
struct acceptor {
using handler = std::function<void(int fd, const std::error_code &)>;
explicit acceptor(strand &);
auto bind(std::string_view path) -> std::error_code;
auto listen() -> std::error_code;
void accept(handler);
void close();
acceptor_ref ref;
};
} // namespace unx
namespace file_descriptor {
struct watcher_impl;
using watcher_dtor = void (*)(watcher_impl *);
using watcher_ref = std::unique_ptr<watcher_impl, watcher_dtor>;
struct watcher {
using handler = std::function<void(const std::error_code &)>;
explicit watcher(strand &, int fd);
void wait_read(handler);
void wait_write(handler);
void cancel();
watcher_ref ref;
};
} // namespace file_descriptor
} // namespace thespian::executor

645
src/executor_asio.cpp Normal file
View file

@ -0,0 +1,645 @@
#include "asio/error_code.hpp"
#include "asio/posix/descriptor_base.hpp"
#include "executor.hpp"
#include <cstddef>
#include <memory>
#include <asio/basic_socket_acceptor.hpp>
#include <asio/bind_executor.hpp>
#include <asio/io_context.hpp>
#include <asio/ip/address_v6.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/ip/udp.hpp>
#include <asio/local/stream_protocol.hpp>
#include <asio/posix/stream_descriptor.hpp>
#include <asio/signal_set.hpp>
#include <asio/socket_base.hpp>
#include <asio/strand.hpp>
#include <asio/system_timer.hpp>
#include <asio/thread.hpp>
#include <csignal>
#include <vector>
using asio::bind_executor;
using asio::buffer;
using asio::io_context;
using asio::string_view;
using asio::system_timer;
using asio::thread;
using asio::posix::stream_descriptor;
using std::atomic;
using std::error_code;
using std::function;
using std::make_shared;
using std::make_unique;
using std::max;
using std::memory_order_relaxed;
using std::min;
using std::move;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using std::weak_ptr;
using std::chrono::microseconds;
using std::chrono::system_clock;
using std::chrono::time_point;
namespace thespian::executor {
const char *MAX_THREAD_STR = getenv("MAX_THREAD"); // NOLINT
const auto MAX_THREAD =
static_cast<long>(atoi(MAX_THREAD_STR ? MAX_THREAD_STR : "64")); // NOLINT
const auto threads = min(sysconf(_SC_NPROCESSORS_ONLN), MAX_THREAD);
struct context_impl {
context_impl() : asio{make_unique<io_context>(threads)} {}
unique_ptr<io_context> asio;
atomic<size_t> pending;
atomic<size_t> posts;
};
auto context::create() -> context {
if (::signal(SIGPIPE, SIG_IGN) == SIG_ERR) // NOLINT
abort();
return {context_ref(new context_impl(), [](context_impl *p) { delete p; })};
}
struct strand_impl {
explicit strand_impl(const context_ref &ctx)
: ctx{ctx}, strand_{*ctx->asio} {}
void post(function<void()> f) {
ctx->posts.fetch_add(1);
strand_.post([&posts = ctx->posts, f = move(f)]() {
posts.fetch_sub(1);
f();
});
}
context_ref ctx;
io_context::strand strand_;
};
auto context::create_strand() -> strand {
return {make_shared<strand_impl>(ref)};
}
void strand::post(function<void()> f) { ref->post(move(f)); }
auto context::run() -> void {
const auto spawn_threads = max(threads - 1, 0L);
vector<thread *> running;
for (auto i = 0; i < spawn_threads; ++i) {
auto *t = new thread([ctx = ref]() { ctx->asio->run(); });
running.push_back(t);
}
ref->asio->run();
for (auto &t : running) {
t->join();
delete t;
}
}
auto context::pending_tasks() -> size_t { return ref->pending.load(); }
auto context::pending_posts() -> size_t { return ref->posts.load(); }
} // namespace thespian::executor
namespace {
static auto to_address(const in6_addr &ip) -> const asio::ip::address_v6 {
asio::ip::address_v6::bytes_type bytes;
memcpy(bytes.data(), ip.s6_addr, sizeof(ip.s6_addr));
return asio::ip::address_v6{bytes};
}
static auto to_in6_addr(const asio::ip::address_v6 &address) -> in6_addr {
auto bytes = address.to_bytes();
in6_addr ip{};
memcpy(ip.s6_addr, bytes.data(), sizeof(ip.s6_addr));
return ip;
}
static auto to_in6_addr(const asio::ip::udp::endpoint &ep) -> in6_addr {
return to_in6_addr(ep.address().to_v6());
}
static auto to_in6_addr(const asio::ip::tcp::endpoint &ep) -> in6_addr {
return to_in6_addr(ep.address().to_v6());
}
static auto to_endpoint_tcp(const in6_addr &ip, port_t port)
-> const asio::ip::tcp::endpoint {
return {to_address(ip), port};
}
static auto to_endpoint_udp(const in6_addr &ip, port_t port)
-> const asio::ip::udp::endpoint {
return {to_address(ip), port};
}
} // namespace
namespace thespian::executor {
auto to_string(const in6_addr &ip) -> string {
return to_address(ip).to_string();
}
struct signal_impl {
explicit signal_impl(const context_ref &ctx)
: ctx{ctx}, signals_{*ctx->asio} {}
explicit signal_impl(strand &strand)
: ctx{strand.ref->ctx}, signals_{*ctx->asio} {}
void fires_on(int signum) { signals_.add(signum); }
void on_fired(signal::handler f) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
signals_.async_wait([c_ = ctx, f = move(f), t = move(weak_token)](
const error_code &ec, int signum) {
c_->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock()) {
f(ec, signum);
}
});
}
void cancel() { signals_.cancel(); }
context_ref ctx;
asio::signal_set signals_;
shared_ptr<bool> token_{make_shared<bool>(true)};
};
signal::signal(context &ctx)
: ref{signal_ref(new signal_impl(ctx.ref),
[](signal_impl *p) { delete p; })} {}
signal::signal(strand &ref)
: ref{signal_ref(new signal_impl(ref), [](signal_impl *p) { delete p; })} {}
void signal::fires_on(int signum) { ref->fires_on(signum); }
void signal::on_fired(handler f) { ref->on_fired(move(f)); }
void signal::cancel() { ref->cancel(); }
struct timer_impl {
explicit timer_impl(const context_ref &ctx) : ctx{ctx}, timer_{*ctx->asio} {}
explicit timer_impl(strand &strand)
: ctx{strand.ref->ctx}, timer_{*ctx->asio} {}
void expires_at(time_point<system_clock> t) { timer_.expires_at(t); }
void expires_after(microseconds us) { timer_.expires_after(us); }
void on_expired(timer::handler f) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
timer_.async_wait(
[c_ = ctx, f = move(f), t = move(weak_token)](const error_code &ec) {
c_->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock()) {
f(ec);
}
});
}
void cancel() { timer_.cancel(); }
context_ref ctx;
system_timer timer_;
bool pending_{};
shared_ptr<bool> token_{make_shared<bool>(true)};
};
timer::timer(context &ctx)
: ref{timer_ref(new timer_impl(ctx.ref), [](timer_impl *p) { delete p; })} {
}
timer::timer(strand &ref)
: ref{timer_ref(new timer_impl(ref), [](timer_impl *p) { delete p; })} {}
void timer::expires_at(time_point<system_clock> t) { ref->expires_at(t); }
void timer::expires_after(microseconds us) { ref->expires_after(us); }
void timer::on_expired(handler f) { ref->on_expired(move(f)); }
void timer::cancel() { ref->cancel(); }
namespace udp {
struct socket_impl {
explicit socket_impl(strand &strand)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_},
socket_{*ctx->asio, ::asio::ip::udp::v6()} {}
auto bind(const in6_addr &ip, port_t port) -> error_code {
error_code ec;
socket_.bind(to_endpoint_udp(ip, port), ec);
return ec;
}
[[nodiscard]] auto send_to(string_view data, in6_addr ip, port_t port)
-> size_t {
return socket_.send_to(buffer(data.data(), data.size()),
to_endpoint_udp(ip, port));
}
void receive(socket &s, socket::handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
socket_.async_receive_from(
buffer(s.receive_buffer), asio_remote_endpoint_,
bind_executor(strand_,
[c = ctx, ep = &remote_endpoint_,
aep = &asio_remote_endpoint_, t = move(weak_token),
h = move(h)](const error_code &ec, size_t received) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock()) {
ep->ip = to_in6_addr(*aep);
ep->port = aep->port();
h(ec, received);
}
}));
}
void close() { socket_.close(); }
auto local_endpoint() -> const endpoint & {
auto ep = socket_.local_endpoint();
local_endpoint_.ip = to_in6_addr(ep);
local_endpoint_.port = ep.port();
return local_endpoint_;
}
auto remote_endpoint() -> const endpoint & { return remote_endpoint_; }
context_ref ctx;
io_context::strand strand_;
asio::ip::udp::socket socket_;
asio::ip::udp::endpoint asio_remote_endpoint_;
executor::endpoint local_endpoint_{};
executor::endpoint remote_endpoint_{};
shared_ptr<bool> token_{make_shared<bool>(true)};
};
socket::socket(strand &strand)
: ref{socket_ref(new socket_impl(strand),
[](socket_impl *p) { delete p; })} {}
auto socket::bind(const in6_addr &ip, port_t port) -> error_code {
return ref->bind(ip, port);
}
auto socket::send_to(string_view data, in6_addr ip, port_t port) const
-> size_t {
return ref->send_to(data, ip, port);
}
void socket::receive(handler h) { ref->receive(*this, move(h)); }
void socket::close() {}
auto socket::local_endpoint() const -> const endpoint & {
return ref->local_endpoint();
}
auto socket::remote_endpoint() const -> const endpoint & {
return ref->remote_endpoint();
}
} // namespace udp
namespace tcp {
struct socket_impl {
explicit socket_impl(strand &strand)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_},
socket_{*ctx->asio, asio::ip::tcp::v6()} {}
explicit socket_impl(strand &strand, int fd)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_},
socket_{*ctx->asio, asio::ip::tcp::v6(), fd} {}
auto bind(const in6_addr &ip, port_t port) -> error_code {
error_code ec;
socket_.bind(to_endpoint_tcp(ip, port), ec);
return ec;
}
void connect(const in6_addr &ip, port_t port, socket::connect_handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
socket_.async_connect(
to_endpoint_tcp(ip, port),
bind_executor(strand_, [c = ctx, h = move(h),
t = move(weak_token)](const error_code &ec) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock())
h(ec);
}));
}
void write(const vector<uint8_t> &data, socket::write_handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
socket_.async_write_some(
buffer(data.data(), data.size()),
bind_executor(strand_, [c = ctx, h = move(h), t = move(weak_token)](
const error_code &ec, size_t written) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock())
h(ec, written);
}));
}
void read(socket &s, socket::read_handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
socket_.async_read_some(
buffer(s.read_buffer.data(), s.read_buffer.size()),
bind_executor(strand_, [c = ctx, h = move(h), t = move(weak_token)](
const error_code &ec, size_t read) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock())
h(ec, read);
}));
}
void close() { socket_.close(); }
auto release() -> int { return socket_.release(); }
auto local_endpoint() -> const endpoint & {
auto ep = socket_.local_endpoint();
local_endpoint_.ip = to_in6_addr(ep);
local_endpoint_.port = ep.port();
return local_endpoint_;
}
context_ref ctx;
io_context::strand strand_;
asio::ip::tcp::socket socket_;
asio::ip::tcp::endpoint remote_endpoint_;
executor::endpoint local_endpoint_{};
shared_ptr<bool> token_{make_shared<bool>(true)};
};
socket::socket(strand &strand)
: ref{socket_ref(new socket_impl(strand),
[](socket_impl *p) { delete p; })} {}
socket::socket(strand &strand, int fd)
: ref{socket_ref(new socket_impl(strand, fd),
[](socket_impl *p) { delete p; })} {}
auto socket::bind(const in6_addr &ip, port_t port) -> error_code {
return ref->bind(ip, port);
}
void socket::connect(const in6_addr &ip, port_t port, connect_handler h) {
ref->connect(ip, port, move(h));
}
void socket::write(const vector<uint8_t> &data, write_handler h) {
ref->write(data, move(h));
}
void socket::read(read_handler h) { ref->read(*this, move(h)); }
void socket::close() { ref->close(); }
void socket::close(int fd) { ::close(fd); }
auto socket::release() -> int { return ref->release(); }
auto socket::local_endpoint() const -> const endpoint & {
return ref->local_endpoint();
}
struct acceptor_impl {
explicit acceptor_impl(strand &strand)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_},
acceptor_{*ctx->asio, asio::ip::tcp::v6()}, socket_{*ctx->asio} {}
auto bind(const in6_addr &ip, port_t port) -> error_code {
error_code ec;
acceptor_.bind(to_endpoint_tcp(ip, port), ec);
return ec;
}
auto listen() -> error_code {
error_code ec;
acceptor_.listen(asio::socket_base::max_listen_connections, ec);
return ec;
}
void accept(acceptor::handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
acceptor_.async_accept(
socket_,
bind_executor(strand_, [c = ctx, s = &socket_, h = move(h),
t = move(weak_token)](const error_code &ec) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock())
h(ec ? 0 : s->release(), ec);
}));
}
void close() { acceptor_.close(); }
auto local_endpoint() -> const endpoint & {
auto ep = acceptor_.local_endpoint();
local_endpoint_.ip = to_in6_addr(ep);
local_endpoint_.port = ep.port();
return local_endpoint_;
}
context_ref ctx;
io_context::strand strand_;
asio::ip::tcp::acceptor acceptor_;
asio::ip::tcp::socket socket_;
executor::endpoint local_endpoint_{};
shared_ptr<bool> token_{make_shared<bool>(true)};
};
acceptor::acceptor(strand &strand)
: ref{acceptor_ref(new acceptor_impl(strand),
[](acceptor_impl *p) { delete p; })} {}
auto acceptor::bind(const in6_addr &ip, port_t port) -> error_code {
return ref->bind(ip, port);
}
auto acceptor::listen() -> error_code { return ref->listen(); }
void acceptor::accept(acceptor::handler h) { ref->accept(move(h)); }
void acceptor::close() { ref->close(); }
[[nodiscard]] auto acceptor::local_endpoint() const -> endpoint {
return ref->local_endpoint();
}
} // namespace tcp
namespace unx {
struct socket_impl {
explicit socket_impl(strand &strand)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, socket_{
*ctx->asio} {}
explicit socket_impl(strand &strand, int fd)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, socket_{*ctx->asio,
fd} {}
auto bind(string_view path) -> error_code {
error_code ec;
socket_.bind(path, ec);
return ec;
}
void connect(string_view path, socket::connect_handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
socket_.async_connect(
path,
bind_executor(strand_, [c = ctx, h = move(h),
t = move(weak_token)](const error_code &ec) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock())
h(ec);
}));
}
void write(const vector<uint8_t> &data, socket::write_handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
socket_.async_write_some(
buffer(data.data(), data.size()),
bind_executor(strand_, [c = ctx, h = move(h), t = move(weak_token)](
const error_code &ec, size_t written) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock())
h(ec, written);
}));
}
void read(socket &s, socket::read_handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
socket_.async_read_some(
buffer(s.read_buffer.data(), s.read_buffer.size()),
bind_executor(strand_, [c = ctx, h = move(h), t = move(weak_token)](
const error_code &ec, size_t read) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock())
h(ec, read);
}));
}
void close() { socket_.close(); }
auto release() -> int { return socket_.release(); }
context_ref ctx;
io_context::strand strand_;
asio::local::stream_protocol::socket socket_;
shared_ptr<bool> token_{make_shared<bool>(true)};
};
socket::socket(strand &strand)
: ref{socket_ref(new socket_impl(strand),
[](socket_impl *p) { delete p; })} {}
socket::socket(strand &strand, int fd)
: ref{socket_ref(new socket_impl(strand, fd),
[](socket_impl *p) { delete p; })} {}
auto socket::bind(string_view path) -> error_code { return ref->bind(path); }
void socket::connect(string_view path, connect_handler h) {
ref->connect(path, move(h));
}
void socket::write(const vector<uint8_t> &data, write_handler h) {
ref->write(data, move(h));
}
void socket::read(read_handler h) { ref->read(*this, move(h)); }
void socket::close() { ref->close(); }
auto socket::release() -> int { return ref->release(); }
struct acceptor_impl {
explicit acceptor_impl(strand &strand)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_},
acceptor_{*ctx->asio, asio::local::stream_protocol()},
socket_{*ctx->asio} {}
auto bind(string_view path) -> error_code {
error_code ec;
acceptor_.bind(path, ec);
return ec;
}
auto listen() -> error_code {
error_code ec;
acceptor_.listen(asio::socket_base::max_listen_connections, ec);
return ec;
}
void accept(acceptor::handler h) {
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
acceptor_.async_accept(
socket_,
bind_executor(strand_, [c = ctx, s = &socket_, h = move(h),
t = move(weak_token)](const error_code &ec) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock())
h(ec ? 0 : s->release(), ec);
}));
}
void close() { acceptor_.close(); }
context_ref ctx;
io_context::strand strand_;
asio::local::stream_protocol::acceptor acceptor_;
asio::local::stream_protocol::socket socket_;
shared_ptr<bool> token_{make_shared<bool>(true)};
};
acceptor::acceptor(strand &strand)
: ref{acceptor_ref(new acceptor_impl(strand),
[](acceptor_impl *p) { delete p; })} {}
auto acceptor::bind(string_view path) -> error_code { return ref->bind(path); }
auto acceptor::listen() -> error_code { return ref->listen(); }
void acceptor::accept(acceptor::handler h) { ref->accept(move(h)); }
void acceptor::close() { ref->close(); }
} // namespace unx
namespace file_descriptor {
struct watcher_impl {
explicit watcher_impl(strand &strand, int fd)
: ctx{strand.ref->ctx}, strand_{strand.ref->strand_}, fd_{*ctx->asio,
fd} {}
void wait_read(watcher::handler h) {
if (!read_in_progress_) {
read_in_progress_ = true;
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
fd_.async_wait(
asio::posix::descriptor_base::wait_type::wait_read,
bind_executor(strand_, [c = ctx, b = &read_in_progress_, h = move(h),
t = move(weak_token)](const error_code &ec) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock()) {
*b = false;
h(ec);
}
}));
}
}
void wait_write(watcher::handler h) {
if (!write_in_progress_) {
write_in_progress_ = true;
ctx->pending.fetch_add(1, memory_order_relaxed);
weak_ptr<bool> weak_token{token_};
fd_.async_wait(
asio::posix::descriptor_base::wait_type::wait_write,
bind_executor(strand_, [c = ctx, b = &write_in_progress_, h = move(h),
t = move(weak_token)](const error_code &ec) {
c->pending.fetch_sub(1, memory_order_relaxed);
if (auto p = t.lock()) {
*b = false;
h(ec);
}
}));
}
}
void cancel() { fd_.cancel(); }
context_ref ctx;
io_context::strand strand_;
stream_descriptor fd_;
shared_ptr<bool> token_{make_shared<bool>(true)};
bool read_in_progress_{false};
bool write_in_progress_{false};
};
watcher::watcher(strand &strand, int fd)
: ref{watcher_ref(new watcher_impl(strand, fd),
[](watcher_impl *p) { delete p; })} {}
void watcher::wait_read(watcher::handler h) { ref->wait_read(move(h)); }
void watcher::wait_write(watcher::handler h) { ref->wait_write(move(h)); }
void watcher::cancel() { ref->cancel(); }
} // namespace file_descriptor
} // namespace thespian::executor

157
src/hub.cpp Normal file
View file

@ -0,0 +1,157 @@
#include <thespian/debug.hpp>
#include <thespian/endpoint.hpp>
#include <thespian/hub.hpp>
#include <thespian/instance.hpp>
#include <deque>
#include <mutex>
#include <set>
#include <vector>
using cbor::buffer;
using cbor::extract;
using cbor::more;
using cbor::type;
using std::deque;
using std::lock_guard;
using std::make_shared;
using std::move;
using std::mutex;
using std::pair;
using std::set;
using std::shared_ptr;
using std::string;
using std::string_view;
using std::vector;
namespace thespian {
using filter = hub::filter;
hub::pipe::pipe() = default;
hub::pipe::~pipe() = default;
struct subscribe_queue : public hub::pipe {
[[nodiscard]] auto push(const handle &from, filter f) {
lock_guard<mutex> lock(m_);
q_.push_back(move(f));
return from.send("subscribe_filtered");
}
auto pop() -> filter {
lock_guard<mutex> lock(m_);
filter f = move(q_.front());
q_.pop_front();
return f;
}
private:
deque<filter> q_;
mutex m_;
};
struct hub_impl {
hub_impl(const hub_impl &) = delete;
hub_impl(hub_impl &&) = delete;
auto operator=(const hub_impl &) -> hub_impl & = delete;
auto operator=(hub_impl &&) -> hub_impl & = delete;
explicit hub_impl(shared_ptr<subscribe_queue> q) : q_(move(q)) {}
~hub_impl() = default;
auto receive(handle from, buffer msg) {
if (msg("broadcast", type::array)) {
msg.erase(msg.raw_cbegin(), msg.raw_cbegin() + 11);
for (auto const &s : subscribers_) {
result ret = ok();
if (s.second) {
if (s.second(msg)) {
ret = s.first.send_raw(msg);
}
} else {
ret = s.first.send_raw(msg);
}
// if (not ret)
// FIXME: remove dead subscriptions from subscribers_
}
return ok();
}
if (msg("subscribe")) {
subscribers_.emplace_back(move(from), filter{});
return ok();
}
if (msg("subscribe_filtered")) {
subscribers_.emplace_back(move(from), q_->pop());
return ok();
}
buffer::range r;
if (msg("subscribe_messages", extract(r))) {
set<string> msgs;
for (const auto val : r)
msgs.insert(string(val));
subscribers_.emplace_back(move(from), [msgs](auto m) {
string tag;
return m(extract(tag), more) && (msgs.find(tag) != msgs.end());
});
return ok();
}
string_view desc;
if (msg("listen", extract(desc))) {
thespian::endpoint::unx::listen(desc);
}
if (msg("shutdown"))
return exit("shutdown");
return unexpected(msg);
}
using subscriber_list_t = vector<pair<handle, filter>>;
subscriber_list_t subscribers_;
shared_ptr<subscribe_queue> q_;
};
hub::hub(const handle &h, shared_ptr<pipe> p) : handle(h), pipe_(move(p)) {}
auto hub::subscribe() const -> result {
link(*this);
return send("subscribe");
}
auto hub::subscribe(filter f) const -> result {
link(*this);
return dynamic_cast<subscribe_queue &>(*pipe_).push(
static_cast<const handle &>(*this), move(f));
}
auto hub::listen(string_view desc) const -> result {
return send("listen", desc);
}
auto hub::create(string_view name) -> expected<hub, error> {
auto q = make_shared<subscribe_queue>();
auto ret = spawn_link(
[=]() {
receive([p{make_shared<hub_impl>(q)}](auto from, auto m) {
return p->receive(move(from), move(m));
});
return ok();
},
name);
if (not ret)
return to_error(ret.error());
return hub{ret.value(), q};
}
auto operator==(const handle &a, const hub &b) -> bool {
return a == static_cast<const handle &>(b);
}
auto operator==(const hub &a, const handle &b) -> bool {
return static_cast<const handle &>(a) == b;
}
} // namespace thespian

1988
src/instance.cpp Normal file

File diff suppressed because it is too large Load diff

233
src/subprocess.zig Normal file
View file

@ -0,0 +1,233 @@
const std = @import("std");
const cbor = @import("cbor");
const tp = @import("thespian.zig");
pid: ?tp.pid,
stdin_behavior: std.ChildProcess.StdIo,
const Self = @This();
pub const max_chunk_size = 4096 - 32;
pub const Writer = std.io.Writer(*Self, error{Exit}, write);
pub const BufferedWriter = std.io.BufferedWriter(max_chunk_size, Writer);
pub fn init(a: std.mem.Allocator, argv: tp.message, tag: [:0]const u8, stdin_behavior: std.ChildProcess.StdIo) !Self {
return .{
.pid = try Proc.create(a, argv, tag, stdin_behavior),
.stdin_behavior = stdin_behavior,
};
}
pub fn deinit(self: *Self) void {
if (self.pid) |pid| {
pid.deinit();
self.pid = null;
}
}
pub fn write(self: *Self, bytes: []const u8) error{Exit}!usize {
try self.send(bytes);
return bytes.len;
}
pub fn send(self: *const Self, bytes_: []const u8) tp.result {
if (self.stdin_behavior != .Pipe) return tp.exit("cannot send to closed stdin");
const pid = if (self.pid) |pid| pid else return tp.exit_error(error.Closed);
var bytes = bytes_;
while (bytes.len > 0)
bytes = loop: {
if (bytes.len > max_chunk_size) {
try pid.send(.{ "stdin", bytes[0..max_chunk_size] });
break :loop bytes[max_chunk_size..];
} else {
try pid.send(.{ "stdin", bytes });
break :loop &[_]u8{};
}
};
}
pub fn close(self: *Self) tp.result {
defer self.deinit();
if (self.stdin_behavior == .Pipe)
if (self.pid) |pid| try pid.send(.{"stdin_close"});
}
pub fn writer(self: *Self) Writer {
return .{ .context = self };
}
pub fn bufferedWriter(self: *Self) BufferedWriter {
return .{ .unbuffered_writer = self.writer() };
}
const Proc = struct {
a: std.mem.Allocator,
receiver: Receiver,
args: std.heap.ArenaAllocator,
parent: tp.pid,
child: std.ChildProcess,
tag: [:0]const u8,
stdin_buffer: std.ArrayList(u8),
fd_stdin: ?tp.file_descriptor = null,
fd_stdout: ?tp.file_descriptor = null,
fd_stderr: ?tp.file_descriptor = null,
write_pending: bool = false,
stdin_close_pending: bool = false,
const Receiver = tp.Receiver(*Proc);
fn create(a: std.mem.Allocator, argv: tp.message, tag: [:0]const u8, stdin_behavior: std.ChildProcess.StdIo) !tp.pid {
const self: *Proc = try a.create(Proc);
var args = std.heap.ArenaAllocator.init(a);
const args_a = args.allocator();
var iter = argv.buf;
var len = cbor.decodeArrayHeader(&iter) catch return error.InvalidArgument;
var argv_ = try args_a.alloc([]const u8, len);
var arg: []const u8 = undefined;
var i: usize = 0;
while (len > 0) : (len -= 1) {
if (!(cbor.matchString(&iter, &arg) catch return error.InvalidArgument))
return error.InvalidArgument;
argv_[i] = try args_a.dupe(u8, arg);
i += 1;
}
var child = std.ChildProcess.init(argv_, a);
child.stdin_behavior = stdin_behavior;
child.stdout_behavior = .Pipe;
child.stderr_behavior = .Pipe;
self.* = .{
.a = a,
.receiver = Receiver.init(receive, self),
.args = args,
.parent = tp.self_pid().clone(),
.child = child,
.tag = try a.dupeZ(u8, tag),
.stdin_buffer = std.ArrayList(u8).init(a),
};
return tp.spawn_link(a, self, Proc.start, tag);
}
fn deinit(self: *Proc) void {
self.args.deinit();
if (self.fd_stdin) |fd| fd.deinit();
if (self.fd_stdout) |fd| fd.deinit();
if (self.fd_stderr) |fd| fd.deinit();
self.stdin_buffer.deinit();
self.parent.deinit();
self.a.free(self.tag);
}
fn start(self: *Proc) tp.result {
errdefer self.deinit();
self.child.spawn() catch |e| {
try self.parent.send(.{ self.tag, "term", e, 1 });
return tp.exit_normal();
};
_ = self.args.reset(.free_all);
if (self.child.stdin_behavior == .Pipe)
self.fd_stdin = tp.file_descriptor.init("stdin", self.child.stdin.?.handle) catch |e| return tp.exit_error(e);
self.fd_stdout = tp.file_descriptor.init("stdout", self.child.stdout.?.handle) catch |e| return tp.exit_error(e);
self.fd_stderr = tp.file_descriptor.init("stderr", self.child.stderr.?.handle) catch |e| return tp.exit_error(e);
if (self.fd_stdout) |fd_stdout| fd_stdout.wait_read() catch |e| return tp.exit_error(e);
if (self.fd_stderr) |fd_stderr| fd_stderr.wait_read() catch |e| return tp.exit_error(e);
tp.receive(&self.receiver);
}
fn receive(self: *Proc, _: tp.pid_ref, m: tp.message) tp.result {
errdefer self.deinit();
var bytes: []u8 = "";
var err: i64 = 0;
var err_msg: []u8 = "";
if (try m.match(.{ "fd", "stdout", "read_ready" })) {
try self.dispatch_stdout();
if (self.fd_stdout) |fd_stdout| fd_stdout.wait_read() catch |e| return tp.exit_error(e);
} else if (try m.match(.{ "fd", "stderr", "read_ready" })) {
try self.dispatch_stderr();
if (self.fd_stderr) |fd_stderr| fd_stderr.wait_read() catch |e| return tp.exit_error(e);
} else if (try m.match(.{ "fd", "stdin", "write_ready" })) {
if (self.stdin_buffer.items.len > 0) {
if (self.child.stdin) |stdin| {
const written = stdin.write(self.stdin_buffer.items) catch |e| return tp.exit_error(e);
self.write_pending = false;
defer {
if (self.stdin_close_pending and !self.write_pending)
self.stdin_close();
}
if (written == self.stdin_buffer.items.len) {
self.stdin_buffer.clearRetainingCapacity();
} else {
std.mem.copyForwards(u8, self.stdin_buffer.items, self.stdin_buffer.items[written..]);
self.stdin_buffer.items.len = self.stdin_buffer.items.len - written;
if (self.fd_stdin) |fd_stdin| {
fd_stdin.wait_write() catch |e| return tp.exit_error(e);
self.write_pending = true;
}
}
}
}
} else if (try m.match(.{ "stdin", tp.extract(&bytes) })) {
if (self.fd_stdin) |fd_stdin| {
self.stdin_buffer.appendSlice(bytes) catch |e| return tp.exit_error(e);
fd_stdin.wait_write() catch |e| return tp.exit_error(e);
self.write_pending = true;
}
} else if (try m.match(.{"stdin_close"})) {
if (self.write_pending) {
self.stdin_close_pending = true;
} else {
self.stdin_close();
}
} else if (try m.match(.{"stdout_close"})) {
if (self.child.stdout) |*fd| {
fd.close();
self.child.stdout = null;
}
} else if (try m.match(.{"stderr_close"})) {
if (self.child.stderr) |*fd| {
fd.close();
self.child.stderr = null;
}
} else if (try m.match(.{ "fd", tp.any, "read_error", tp.extract(&err), tp.extract(&err_msg) })) {
return tp.exit(err_msg);
}
}
fn stdin_close(self: *Proc) void {
if (self.child.stdin) |*fd| {
fd.close();
self.child.stdin = null;
}
}
fn dispatch_stdout(self: *Proc) tp.result {
var buffer: [max_chunk_size]u8 = undefined;
const bytes = self.child.stdout.?.read(&buffer) catch |e| return tp.exit_error(e);
if (bytes == 0)
return self.handle_terminate();
try self.parent.send(.{ self.tag, "stdout", buffer[0..bytes] });
}
fn dispatch_stderr(self: *Proc) tp.result {
var buffer: [max_chunk_size]u8 = undefined;
const bytes = self.child.stderr.?.read(&buffer) catch |e| return tp.exit_error(e);
if (bytes == 0)
return;
try self.parent.send(.{ self.tag, "stderr", buffer[0..bytes] });
}
fn handle_terminate(self: *Proc) tp.result {
const term = self.child.wait() catch |e| return tp.exit_error(e);
(switch (term) {
.Exited => |val| self.parent.send(.{ self.tag, "term", "exited", val }),
.Signal => |val| self.parent.send(.{ self.tag, "term", "signal", val }),
.Stopped => |val| self.parent.send(.{ self.tag, "term", "stop", val }),
.Unknown => |val| self.parent.send(.{ self.tag, "term", "unknown", val }),
}) catch {};
return tp.exit_normal();
}
};

716
src/thespian.zig Normal file
View file

@ -0,0 +1,716 @@
const std = @import("std");
const c = @cImport({
@cInclude("thespian/c/file_descriptor.h");
@cInclude("thespian/c/instance.h");
@cInclude("thespian/c/metronome.h");
@cInclude("thespian/c/timeout.h");
@cInclude("thespian/c/signal.h");
@cInclude("thespian/backtrace.h");
});
const cbor = @import("cbor");
pub const subprocess = @import("subprocess.zig");
pub const install_debugger = c.install_debugger;
pub const max_message_size = 8 * 4096;
threadlocal var message_buffer: [max_message_size]u8 = undefined;
threadlocal var error_message_buffer: [256]u8 = undefined;
threadlocal var error_buffer_tl: c.thespian_error = .{
.base = null,
.len = 0,
};
pub const result = error{Exit}!void;
pub const extract = cbor.extract;
pub const extract_cbor = cbor.extract_cbor;
pub const number = cbor.number;
pub const bytes = cbor.bytes;
pub const string = cbor.string;
pub const array = cbor.array;
pub const map = cbor.map;
pub const tag = cbor.tag;
pub const boolean = cbor.boolean;
pub const null_ = cbor.null_;
pub const any = cbor.any;
pub const more = cbor.more;
pub const Ownership = enum {
Wrapped,
Owned,
};
fn Pid(comptime own: Ownership) type {
return struct {
h: c.thespian_handle,
const ownership = own;
const Self = @This();
pub fn clone(self: Self) Pid(.Owned) {
return .{ .h = c.thespian_handle_clone(self.h) };
}
pub fn ref(self: Self) Pid(.Wrapped) {
return .{ .h = self.h };
}
pub fn deinit(self: *const Self) void {
comptime if (ownership == Ownership.Wrapped)
@compileError("cannot call deinit on Pid(.Wrapped)");
c.thespian_handle_destroy(self.h);
}
pub fn send_raw(self: Self, m: message) result {
const ret = c.thespian_handle_send_raw(self.h, m.to(c.cbor_buffer));
if (ret) |err|
return set_error(err.*);
}
pub fn send(self: Self, m: anytype) result {
return self.send_raw(message.fmt(m));
}
pub fn call(self: Self, request: anytype) !message {
return CallContext.call(self.ref(), message.fmt(request));
}
pub fn link(self: Self) result {
return c.thespian_link(self.h);
}
pub fn expired(self: Self) bool {
return c.thespian_handle_is_expired(self.h);
}
pub fn wait_expired(self: Self, timeout_ns: isize) !void {
var max_sleep: isize = timeout_ns;
while (!self.expired()) {
if (max_sleep <= 0) return error.Timeout;
std.time.sleep(100);
max_sleep -= 100;
}
}
};
}
pub const pid = Pid(.Owned);
pub const pid_ref = Pid(.Wrapped);
fn wrap_handle(h: c.thespian_handle) pid_ref {
return .{ .h = h };
}
fn wrap_pid(h: c.thespian_handle) pid {
return .{ .h = h };
}
pub const message = struct {
buf: buffer_type = "",
const Self = @This();
pub const buffer_type: type = []const u8;
pub const c_buffer_type = c.cbor_buffer;
pub fn fmt(value: anytype) Self {
return fmtbuf(&message_buffer, value) catch unreachable;
}
pub fn fmtbuf(buf: []u8, value: anytype) !Self {
const f = comptime switch (@typeInfo(@TypeOf(value))) {
.Struct => |info| if (info.is_tuple)
fmtbufInternal
else
@compileError("thespian.message template should be a tuple: " ++ @typeName(@TypeOf(value))),
else => fmtbufInternalScalar,
};
return f(buf, value);
}
fn fmtbufInternalScalar(buf: []u8, value: anytype) !Self {
return fmtbufInternal(buf, .{value});
}
fn fmtbufInternal(buf: []u8, value: anytype) !Self {
var stream = std.io.fixedBufferStream(buf);
try cbor.writeValue(stream.writer(), value);
return .{ .buf = stream.getWritten() };
}
pub fn len(self: Self) usize {
return self.buf.len;
}
pub fn from(span: anytype) Self {
return .{ .buf = span.base[0..span.len] };
}
pub fn to(self: *const Self, comptime T: type) T {
return T{ .base = self.buf.ptr, .len = self.buf.len };
}
pub fn clone(self: *const Self, a: std.mem.Allocator) !Self {
return .{ .buf = try a.dupe(u8, self.buf) };
}
pub fn to_json(self: *const Self, buffer: []u8) cbor.CborJsonError![]const u8 {
return cbor.toJson(self.buf, buffer);
}
pub const json_string_view = c.c_string_view;
const json_callback = *const fn (json_string_view) callconv(.C) void;
pub fn to_json_cb(self: *const Self, callback: json_callback) void {
c.cbor_to_json(self.to(c_buffer_type), callback);
}
pub fn match(self: Self, m: anytype) error{Exit}!bool {
return if (cbor.match(self.buf, m)) |ret| ret else |e| exit_error(e);
}
};
pub fn exit_message(e: anytype) message {
return message.fmtbuf(&error_message_buffer, .{ "exit", e }) catch unreachable;
}
pub fn exit_normal() result {
return set_error_msg(exit_message("normal"));
}
pub fn exit(e: []const u8) error{Exit} {
return set_error_msg(exit_message(e));
}
pub fn exit_fmt(comptime fmt: anytype, args: anytype) result {
var buf: [1024]u8 = undefined;
const msg = std.fmt.bufPrint(&buf, fmt, args) catch "FMTERROR";
return set_error_msg(exit_message(msg));
}
pub fn exit_error(e: anyerror) error{Exit} {
return switch (e) {
error.Exit => error.Exit,
else => set_error_msg(exit_message(e)),
};
}
pub fn unexpected(b: message) error{Exit} {
const txt = "UNEXPECTED_MESSAGE: ";
var buf: [max_message_size]u8 = undefined;
@memcpy(buf[0..txt.len], txt);
const json = b.to_json(buf[txt.len..]) catch |e| return exit_error(e);
return set_error_msg(message.fmt(.{ "exit", buf[0..(txt.len + json.len)] }));
}
pub fn error_message() []const u8 {
if (error_buffer_tl.base) |base|
return base[0..error_buffer_tl.len];
set_error_msg(exit_message("NOERROR")) catch {};
return error_message();
}
pub fn error_text() []const u8 {
const msg_: message = .{ .buf = error_message() };
var msg__: []const u8 = undefined;
return if (msg_.match(.{ "exit", extract(&msg__) }) catch false)
msg__
else
&[_]u8{};
}
pub fn self_pid() pid_ref {
return wrap_handle(c.thespian_self());
}
pub const trace_channel = c.thespian_trace_channel;
pub const channel = struct {
pub const send = c.thespian_trace_channel_send;
pub const receive = c.thespian_trace_channel_receive;
pub const lifetime = c.thespian_trace_channel_lifetime;
pub const link = c.thespian_trace_channel_link;
pub const execute = c.thespian_trace_channel_execute;
pub const udp = c.thespian_trace_channel_udp;
pub const tcp = c.thespian_trace_channel_tcp;
pub const timer = c.thespian_trace_channel_timer;
pub const metronome = c.thespian_trace_channel_metronome;
pub const endpoint = c.thespian_trace_channel_endpoint;
pub const signal = c.thespian_trace_channel_signal;
pub const event: c.thespian_trace_channel = 2048;
pub const widget: c.thespian_trace_channel = 4096;
pub const input: c.thespian_trace_channel = 8192;
pub const all = c.thespian_trace_channel_all;
};
pub const env = struct {
env: *c.thespian_env_t,
owned: bool = false,
const Self = @This();
pub fn get() Self {
return .{ .env = c.thespian_env_get() orelse unreachable, .owned = false };
}
pub fn init() Self {
return .{ .env = c.thespian_env_new() orelse unreachable, .owned = true };
}
pub fn clone(self: *const Self) Self {
return .{ .env = c.thespian_env_clone(self.env) orelse unreachable, .owned = true };
}
pub fn deinit(self: Self) void {
if (self.owned) c.thespian_env_destroy(self.env);
}
pub fn enable_all_channels(self: *const Self) void {
c.thespian_env_enable_all_channels(self.env);
}
pub fn disable_all_channels(self: *const Self) void {
c.thespian_env_disable_all_channels(self.env);
}
pub fn enable(self: *const Self, chan: c.thespian_trace_channel) void {
c.thespian_env_enable(self.env, chan);
}
pub fn disable(self: *const Self, chan: c.thespian_trace_channel) void {
c.thespian_env_disable(self.env, chan);
}
pub fn enabled(self: *const Self, chan: c.thespian_trace_channel) bool {
return c.thespian_env_enabled(self.env, chan);
}
pub const trace_handler = *const fn (c.cbor_buffer) callconv(.C) void;
pub fn on_trace(self: *const Self, h: trace_handler) void {
c.thespian_env_on_trace(self.env, h);
}
pub fn trace(self: *const Self, m: c.cbor_buffer) void {
c.thespian_env_trace(self.env, m);
}
pub fn is(self: *const Self, key: []const u8) bool {
return c.thespian_env_is(self.env, .{ .base = key.ptr, .len = key.len });
}
pub fn set(self: *const Self, key: []const u8, value: bool) void {
c.thespian_env_set(self.env, .{ .base = key.ptr, .len = key.len }, value);
}
pub fn num(self: *const Self, key: []const u8) i64 {
return c.thespian_env_num(self.env, .{ .base = key.ptr, .len = key.len });
}
pub fn num_set(self: *const Self, key: []const u8, value: i64) void {
c.thespian_env_num_set(self.env, .{ .base = key.ptr, .len = key.len }, value);
}
pub fn str(self: *const Self, key: []const u8) []const u8 {
const ret = c.thespian_env_str(self.env, .{ .base = key.ptr, .len = key.len });
return ret.base[0..ret.len];
}
pub fn str_set(self: *const Self, key: []const u8, value: []const u8) void {
c.thespian_env_str(self.env, .{ .base = key.ptr, .len = key.len }, .{ .base = value.ptr, .len = value.len });
}
pub fn proc(self: *const Self, key: []const u8) pid_ref {
return wrap_handle(c.thespian_env_proc(self.env, .{ .base = key.ptr, .len = key.len }));
}
pub fn proc_set(self: *const Self, key: []const u8, value: pid_ref) void {
c.thespian_env_proc_set(self.env, .{ .base = key.ptr, .len = key.len }, value.h);
}
};
pub fn trace(chan: trace_channel, value: anytype) void {
if (env.get().enabled(chan)) {
if (@TypeOf(value) == message) {
env.get().trace(value.to(message.c_buffer_type));
} else {
var trace_buffer: [512]u8 = undefined;
const m = message.fmtbuf(&trace_buffer, value);
env.get().trace(m.to(message.c_buffer_type));
}
}
}
pub const context = struct {
a: std.mem.Allocator,
context: c.thespian_context,
context_destroy: *const fn (?*anyopaque) callconv(.C) void,
const Self = @This();
pub fn init(a: std.mem.Allocator) !Self {
var ctx_destroy: c.thespian_context_destroy = null;
const ctx = c.thespian_context_create(&ctx_destroy) orelse return error.ThespianContextCreateFailed;
return .{
.a = a,
.context = ctx,
.context_destroy = ctx_destroy orelse return error.ThespianContextCreateFailed,
};
}
pub fn run(self: *Self) void {
c.thespian_context_run(self.context);
}
pub fn spawn_link(
self: *const Self,
data: anytype,
f: Behaviour(@TypeOf(data)).FunT,
name: [:0]const u8,
eh: anytype,
env_: ?*const env,
) !pid {
const Tclosure = Behaviour(@TypeOf(data));
var handle_: c.thespian_handle = null;
try neg_to_error(c.thespian_context_spawn_link(
self.context,
Tclosure.run,
try Tclosure.create(self.a, f, data),
exitHandlerRun(@TypeOf(eh)),
eh,
name,
if (env_) |env__| env__.env else null,
&handle_,
), error.ThespianSpawnFailed);
return .{ .h = handle_ };
}
pub fn deinit(self: *Self) void {
self.context_destroy(self.context);
}
};
fn exitHandlerRun(comptime T: type) c.thespian_exit_handler {
if (T == @TypeOf(null)) return null;
return switch (@typeInfo(T)) {
.Optional => |info| switch (@typeInfo(info.child)) {
.Pointer => |ptr_info| switch (ptr_info.size) {
.One => ptr_info.child.run,
else => @compileError("expected single item pointer, found: '" ++ @typeName(T) ++ "'"),
},
},
.Pointer => |ptr_info| switch (ptr_info.size) {
.One => ptr_info.child.run,
else => @compileError("expected single item pointer, found: '" ++ @typeName(T) ++ "'"),
},
else => @compileError("expected optional pointer or pointer type, found: '" ++ @typeName(T) ++ "'"),
};
}
pub fn receive(r: anytype) void {
const T = switch (@typeInfo(@TypeOf(r))) {
.Pointer => |ptr_info| switch (ptr_info.size) {
.One => ptr_info.child,
else => @compileError("invalid receiver type"),
},
else => @compileError("invalid receiver type"),
};
c.thespian_receive(T.run, r);
}
pub fn Receiver(comptime T: type) type {
return struct {
f: FunT,
data: T,
const FunT: type = *const fn (T, from: pid_ref, m: message) result;
const Self = @This();
pub fn init(f: FunT, data: T) Self {
return .{ .f = f, .data = data };
}
pub fn run(ostate: c.thespian_behaviour_state, from: c.thespian_handle, m: c.cbor_buffer) callconv(.C) c.thespian_result {
const state: *Self = @ptrCast(@alignCast(ostate orelse unreachable));
reset_error();
return to_result(state.f(state.data, wrap_handle(from), message.from(m)));
}
};
}
pub fn get_trap() bool {
return c.thespian_get_trap();
}
pub fn set_trap(on: bool) bool {
return c.thespian_set_trap(on);
}
pub fn spawn_link(
a: std.mem.Allocator,
data: anytype,
f: Behaviour(@TypeOf(data)).FunT,
name: [:0]const u8,
) !pid {
return spawn_link_env(a, data, f, name, env.get());
}
pub fn spawn_link_env(
a: std.mem.Allocator,
data: anytype,
f: Behaviour(@TypeOf(data)).FunT,
name: [:0]const u8,
env_: ?env,
) !pid {
const Tclosure = Behaviour(@TypeOf(data));
var handle_: c.thespian_handle = null;
try neg_to_error(c.thespian_spawn_link(
Tclosure.run,
try Tclosure.create(a, f, data),
name,
if (env_) |env__| env__.env else null,
&handle_,
), error.ThespianSpawnFailed);
return wrap_pid(handle_);
}
pub fn neg_to_error(errcode: c_int, errortype: anyerror) !void {
if (errcode < 0) return errortype;
}
pub fn nonzero_to_error(errcode: c_int, errortype: anyerror) !void {
if (errcode != 0) return errortype;
}
pub fn wrap(comptime f: anytype, errortype: anyerror) fn (std.meta.ArgsTuple(@TypeOf(f))) @TypeOf(errortype)!void {
const Local = struct {
pub fn wrapped(args: std.meta.ArgsTuple(@TypeOf(f))) @TypeOf(errortype)!void {
return nonzero_to_error(@call(.auto, f, args), errortype);
}
};
return Local.wrapped;
}
fn Behaviour(comptime T: type) type {
return struct {
a: std.mem.Allocator,
f: FunT,
data: T,
const FunT: type = *const fn (T) result;
const Self = @This();
pub fn create(a: std.mem.Allocator, f: FunT, data: T) !*Self {
const self: *Self = try a.create(Self);
self.* = .{ .a = a, .f = f, .data = data };
return self;
}
pub fn destroy(self: *Self) void {
self.a.destroy(self);
}
pub fn run(state: c.thespian_behaviour_state) callconv(.C) c.thespian_result {
const self: *Self = @ptrCast(@alignCast(state orelse unreachable));
defer self.destroy();
reset_error();
return to_result(self.f(self.data));
}
};
}
fn to_result(ret: result) c.thespian_result {
if (ret) |_| {
return null;
} else |_| {
const msg = error_message();
if (!(cbor.match(msg, .{ "exit", "normal" }) catch false)) {
if (env.get().is("dump-stack-trace")) {
const trace_ = @errorReturnTrace();
if (trace_) |t| std.debug.dumpStackTrace(t.*);
}
}
return &error_buffer_tl;
}
}
fn set_error_msg(m: message) error{Exit} {
return set_error(m.to(c.thespian_error));
}
fn set_error(e: c.thespian_error) error{Exit} {
error_buffer_tl = e;
return error.Exit;
}
pub fn reset_error() void {
error_buffer_tl = .{ .base = null, .len = 0 };
}
pub fn make_exit_handler(data: anytype, f: ExitHandler(@TypeOf(data)).FunT) ExitHandler(@TypeOf(data)) {
return ExitHandler(@TypeOf(data)).init(f, data);
}
fn ExitHandler(comptime T: type) type {
return struct {
a: ?std.mem.Allocator = null,
f: FunT,
data: T,
const FunT: type = *const fn (T, []const u8) void;
const Self = @This();
pub fn init(f: FunT, data: T) Self {
return .{ .f = f, .data = data };
}
pub fn create(a: std.mem.Allocator, f: FunT, data: T) !*Self {
const self: *Self = try a.create(Self);
self.* = .{ .a = a, .f = f, .data = data };
return self;
}
pub fn destroy(self: *Self) void {
if (self.a) |a_| a_.destroy(self);
}
pub fn run(state: c.thespian_exit_handler_state, msg: [*c]const u8, len: usize) callconv(.C) void {
const self: *Self = @ptrCast(@alignCast(state orelse unreachable));
defer self.destroy();
self.f(self.data, msg[0..len]);
}
};
}
pub const signal = struct {
handle: *c.struct_thespian_signal_handle,
const Self = @This();
pub fn init(signum: c_int, m: message) !Self {
return .{ .handle = c.thespian_signal_create(signum, m.to(c.cbor_buffer)) orelse return error.ThespianSignalInitFailed };
}
pub fn cancel(self: *const Self) !void {
return neg_to_error(c.thespian_signal_cancel(self.handle), error.ThespianSignalCancelFailed);
}
pub fn deinit(self: *const Self) void {
c.thespian_signal_destroy(self.handle);
}
};
pub const metronome = struct {
handle: *c.struct_thespian_metronome_handle,
const Self = @This();
pub fn init(tick_time_us: u64) !Self {
return .{ .handle = c.thespian_metronome_create_us(tick_time_us) orelse return error.ThespianMetronomeInitFailed };
}
pub fn init_ms(tick_time_us: u64) !Self {
return .{ .handle = c.thespian_metronome_create_ms(tick_time_us) orelse return error.ThespianMetronomeInitFailed };
}
pub fn start(self: *const Self) !void {
return neg_to_error(c.thespian_metronome_start(self.handle), error.ThespianMetronomeStartFailed);
}
pub fn stop(self: *const Self) !void {
return neg_to_error(c.thespian_metronome_stop(self.handle), error.ThespianMetronomeStopFailed);
}
pub fn deinit(self: *const Self) void {
c.thespian_metronome_destroy(self.handle);
}
};
pub const timeout = struct {
handle: *c.struct_thespian_timeout_handle,
const Self = @This();
pub fn init(tick_time_us: u64, m: message) !Self {
return .{ .handle = c.thespian_timeout_create_us(tick_time_us, m.to(c.cbor_buffer)) orelse return error.ThespianTimeoutInitFailed };
}
pub fn init_ms(tick_time_us: u64, m: message) !Self {
return .{ .handle = c.thespian_timeout_create_ms(tick_time_us, m.to(c.cbor_buffer)) orelse return error.ThespianTimeoutInitFailed };
}
pub fn start(self: *const Self) !void {
return neg_to_error(c.thespian_timeout_start(self.handle), error.ThespianTimeoutStartFailed);
}
pub fn stop(self: *const Self) !void {
return neg_to_error(c.thespian_timeout_stop(self.handle), error.ThespianTimeoutStopFailed);
}
pub fn deinit(self: *const Self) void {
c.thespian_timeout_destroy(self.handle);
}
};
pub const file_descriptor = struct {
handle: *c.struct_thespian_file_descriptor_handle,
const Self = @This();
pub fn init(tag_: []const u8, fd: i32) !Self {
return .{ .handle = c.thespian_file_descriptor_create(tag_.ptr, fd) orelse return error.ThespianFileDescriptorInitFailed };
}
pub fn wait_write(self: *const Self) !void {
return neg_to_error(c.thespian_file_descriptor_wait_write(self.handle), error.ThespianFileDescriptorWaitWriteFailed);
}
pub fn wait_read(self: *const Self) !void {
return neg_to_error(c.thespian_file_descriptor_wait_read(self.handle), error.ThespianFileDescriptorWaitReadFailed);
}
pub fn cancel(self: *const Self) !void {
return neg_to_error(c.thespian_file_descriptor_cancel(self.handle), error.ThespianFileDescriptorCancelFailed);
}
pub fn deinit(self: *const Self) void {
c.thespian_file_descriptor_destroy(self.handle);
}
};
const CallContext = struct {
receiver: ReceiverT,
to: pid_ref,
request: message,
response: ?message,
a: std.mem.Allocator,
const Self = @This();
const ReceiverT = Receiver(*Self);
pub fn call(to: pid_ref, request: message) !message {
var heap: [32 + 1024]u8 = undefined;
var fba = std.heap.FixedBufferAllocator.init(&heap);
const a = fba.allocator();
var self: Self = undefined;
const rec = ReceiverT.init(receive_, &self);
self = .{
.receiver = rec,
.to = to,
.request = request,
.response = null,
.a = a,
};
const proc = try spawn_link(a, &self, start, @typeName(Self));
while (!proc.expired()) {
std.time.sleep(50);
}
if (self.response) |resp| {
const m = message_buffer[0..resp.buf.len];
@memcpy(m, resp.buf);
return .{ .buf = m };
} else {
return .{};
}
}
fn start(self: *Self) result {
_ = set_trap(true);
try self.to.link();
try self.to.send_raw(self.request);
receive(&self.receiver);
}
fn receive_(self: *Self, from: pid_ref, m: message) result {
_ = from;
self.response = m.clone(self.a) catch |e| return exit_error(e);
try exit_normal();
}
};

166
src/trace.cpp Normal file
View file

@ -0,0 +1,166 @@
#include <thespian/trace.hpp>
#include <atomic>
#include <fstream>
#include <memory>
#include <mutex>
#include <sstream>
using cbor::any;
using cbor::array;
using cbor::buffer;
using cbor::extract;
using cbor::more;
using std::atomic;
using std::fstream;
using std::ios_base;
using std::lock_guard;
using std::make_shared;
using std::ostream;
using std::recursive_mutex;
using std::string;
using std::string_view;
using std::stringstream;
namespace thespian {
constexpr auto trace_file_flags =
ios_base::binary | ios_base::out | ios_base::ate;
const auto trace_buf_size = 64U * 1024U;
template <typename T, typename Q> struct trace_file {
trace_file(const trace_file &) = delete;
trace_file(const trace_file &&) = delete;
auto operator=(const trace_file &) -> trace_file & = delete;
auto operator=(trace_file &&) -> trace_file & = delete;
struct node {
Q data{};
node *next{};
};
atomic<node *> head{nullptr};
recursive_mutex m;
fstream s;
char buf[trace_buf_size]{}; // NOLINT
explicit trace_file(const string &file_name)
: s(file_name, trace_file_flags) {
s.rdbuf()->pubsetbuf(buf, trace_buf_size);
}
~trace_file() {
while (do_work())
;
}
auto reverse(node *p) -> node * {
node *prev{nullptr};
node *next{nullptr};
while (p) {
next = p->next;
p->next = prev;
prev = p;
p = next;
}
return prev;
}
auto push(Q data) -> void {
auto p = new node{move(data), head};
while (!head.compare_exchange_weak(p->next, p))
;
while (do_work())
;
}
auto do_work() -> bool {
if (!m.try_lock())
return false;
lock_guard<recursive_mutex> lock(m);
m.unlock();
auto p = head.load();
while (p && !head.compare_exchange_weak(p, nullptr)) {
;
}
if (!p)
return false;
p = reverse(p);
while (p) {
static_cast<T *>(this)->on_trace(move(p->data));
auto p_ = p;
p = p->next;
delete p_;
}
return true;
}
};
auto to_mermaid(ostream &s, const buffer &m) -> void {
string_view from;
string_view typ;
if (not m(A(any, any, extract(from)), extract(typ), more))
return;
if (from.empty())
from = "UNKNOWN";
if (typ == "spawn") {
s << " Note right of " << from << ": SPAWN\n";
} else if (typ == "receive") {
// ignore
} else if (typ == "run") {
s << " activate " << from << '\n';
} else if (typ == "sleep") {
s << " deactivate " << from << '\n';
} else if (typ == "send") {
string_view to;
buffer::range msg;
if (m(any, any, A(any, any, extract(to)), extract(msg))) {
s << " " << from << "->>" << to << ": send ";
msg.to_json(s);
s << '\n';
}
} else if (typ == "link") {
string_view to;
if (m(any, any, A(any, any, extract(to))))
s << " " << from << "-->>" << to << ": create link\n";
} else if (typ == "exit") {
string_view msg;
if (m(any, any, A(any, extract(msg), more)))
s << " Note right of " << from << ": EXIT " << msg << '\n';
}
}
static const auto cbor_cr = array("\n");
auto trace_to_cbor_file(const string &file_name) -> void {
struct cbor_file : trace_file<cbor_file, buffer> {
using trace_file::trace_file;
auto on_trace(const buffer &msg) -> void {
s.write(reinterpret_cast<const char *>(msg.data()), msg.size()); // NOLINT
s.write(reinterpret_cast<const char *>(cbor_cr.data()), // NOLINT
cbor_cr.size()); // NOLINT
}
};
auto f = make_shared<cbor_file>(file_name);
on_trace([f](auto m) { f->push(move(m)); });
}
auto trace_to_json_file(const string &file_name) -> void {
struct json_file : trace_file<json_file, buffer> {
using trace_file::trace_file;
auto on_trace(const buffer &msg) -> void {
msg.to_json(s);
s << '\n';
}
};
auto f = make_shared<json_file>(file_name);
on_trace([f](auto m) { f->push(move(m)); });
}
auto trace_to_mermaid_file(const string &file_name) -> void {
struct mermaid_file : trace_file<mermaid_file, buffer> {
explicit mermaid_file(const string &_file_name) : trace_file(_file_name) {
s << "sequenceDiagram\n";
}
auto on_trace(const buffer &msg) -> void { to_mermaid(s, msg); }
};
auto f = make_shared<mermaid_file>(file_name);
on_trace([f](auto m) { f->push(move(m)); });
}
} // namespace thespian