18#elif defined(__APPLE__)
28 auto* original_cout_buf =
std::cout.rdbuf();
32 std::ostream stdout_stream(original_cout_buf);
35 while (!input_stream.eof()) {
38 input_stream.read(
reinterpret_cast<char*
>(&
length),
sizeof(
length));
40 if (input_stream.gcount() !=
sizeof(
length)) {
50 std::cerr <<
"Error: Incomplete msgpack buffer read" <<
'\n';
59 auto unpacked = msgpack::unpack(
reinterpret_cast<const char*
>(
buffer.data()),
buffer.size());
60 auto obj = unpacked.get();
64 if (obj.type != msgpack::type::ARRAY || obj.via.array.size != 1) {
65 throw_or_abort(
"Expected an array of size 1 (tuple of arguments) for bbapi command deserialization");
69 auto& tuple_arr = obj.via.array;
70 auto& command_obj = tuple_arr.ptr[0];
74 if (command_obj.type != msgpack::type::ARRAY || command_obj.via.array.size != 2) {
75 throw_or_abort(
"Expected Command to be an array of size 2 [command-name, payload]");
79 auto& command_arr = command_obj.via.array;
80 if (command_arr.ptr[0].type != msgpack::type::STR) {
81 throw_or_abort(
"Expected first element of Command to be a string (type name)");
86 command_obj.convert(command);
92 msgpack::sbuffer response_buffer;
93 msgpack::pack(response_buffer, response);
96 uint32_t response_length =
static_cast<uint32_t
>(response_buffer.size());
97 stdout_stream.write(
reinterpret_cast<const char*
>(&response_length),
sizeof(response_length));
98 stdout_stream.write(response_buffer.data(),
static_cast<std::streamsize>(response_buffer.size()));
99 stdout_stream.flush();
110static void setup_parent_death_monitoring()
115 if (prctl(PR_SET_PDEATHSIG, SIGTERM) == -1) {
116 std::cerr <<
"Warning: Could not set parent death signal" <<
'\n';
118#elif defined(__APPLE__)
121 pid_t parent_pid = getppid();
122 std::thread([parent_pid]() {
125 std::cerr <<
"Warning: Could not create kqueue for parent monitoring" <<
'\n';
129 struct kevent change;
130 EV_SET(&change, parent_pid, EVFILT_PROC, EV_ADD | EV_ENABLE, NOTE_EXIT, 0,
nullptr);
131 if (kevent(kq, &change, 1,
nullptr, 0,
nullptr) == -1) {
132 std::cerr <<
"Warning: Could not monitor parent process" <<
'\n';
139 kevent(kq,
nullptr, 0, &
event, 1,
nullptr);
141 std::cerr <<
"Parent process exited, shutting down..." <<
'\n';
158 auto graceful_shutdown_handler = [](
int signal) {
159 std::cerr <<
"\nReceived signal " << signal <<
", shutting down gracefully..." <<
'\n';
167 auto fatal_error_handler = [](
int signal) {
168 const char* signal_name =
"UNKNOWN";
169 if (signal == SIGBUS) {
170 signal_name =
"SIGBUS";
171 }
else if (signal == SIGSEGV) {
172 signal_name =
"SIGSEGV";
174 std::cerr <<
"\nFatal error: received " << signal_name <<
" during initialization" <<
'\n';
175 std::cerr <<
"This likely means shared memory exhaustion (try reducing --max-clients)" <<
'\n';
179 global_server->
close();
185 (void)
std::signal(SIGTERM, graceful_shutdown_handler);
186 (void)
std::signal(SIGINT, graceful_shutdown_handler);
191 setup_parent_death_monitoring();
193 if (!server->listen()) {
194 std::cerr <<
"Error: Could not start IPC server" <<
'\n';
206 auto unpacked = msgpack::unpack(
reinterpret_cast<const char*
>(request.data()), request.size());
207 auto obj = unpacked.get();
211 if (obj.type != msgpack::type::ARRAY || obj.via.array.size != 1) {
212 std::cerr <<
"Error: Expected an array of size 1 (tuple of arguments) from client " << client_id
218 auto& tuple_arr = obj.via.array;
219 auto& command_obj = tuple_arr.ptr[0];
223 if (command_obj.type != msgpack::type::ARRAY || command_obj.via.array.size != 2) {
224 std::cerr <<
"Error: Expected Command to be an array of size 2 [command-name, payload] from client "
225 << client_id <<
'\n';
230 auto& command_arr = command_obj.via.array;
231 if (command_arr.ptr[0].type != msgpack::type::STR) {
232 std::cerr <<
"Error: Expected first element of Command to be a string (type name) from client "
233 << client_id <<
'\n';
239 std::string command_name(command_arr.ptr[0].via.str.ptr, command_arr.ptr[0].via.str.size);
240 bool is_shutdown = (command_name ==
"Shutdown");
244 command_obj.convert(command);
248 msgpack::sbuffer response_buffer;
249 msgpack::pack(response_buffer, response);
250 std::vector<uint8_t> result(response_buffer.data(), response_buffer.data() + response_buffer.size());
262 }
catch (
const std::exception& e) {
264 std::cerr <<
"Error processing request from client " << client_id <<
": " << e.what() <<
'\n';
272 msgpack::sbuffer response_buffer;
273 msgpack::pack(response_buffer, response);
274 return std::vector<uint8_t>(response_buffer.data(), response_buffer.data() + response_buffer.size());
284 [[maybe_unused]]
int max_clients,
285 [[maybe_unused]]
size_t request_ring_size,
286 [[maybe_unused]]
size_t response_ring_size)
290 if (!msgpack_input_file.empty() && msgpack_input_file.size() >= 4 &&
291 msgpack_input_file.substr(msgpack_input_file.size() - 4) ==
".shm") {
293 std::string base_name = msgpack_input_file.substr(0, msgpack_input_file.size() - 4);
295 std::cerr <<
"Shared memory server at " << base_name <<
'\n';
300 if (!msgpack_input_file.empty() && msgpack_input_file.size() >= 5 &&
301 msgpack_input_file.substr(msgpack_input_file.size() - 5) ==
".sock") {
304 std::cerr <<
"Socket server at " << msgpack_input_file <<
'\n';
310 std::istream* input_stream = &
std::cin;
311 std::ifstream file_stream;
313 if (!msgpack_input_file.empty()) {
314 file_stream.open(msgpack_input_file, std::ios::binary);
315 if (!file_stream.is_open()) {
316 std::cerr <<
"Error: Could not open input file: " << msgpack_input_file <<
'\n';
319 input_stream = &file_stream;
A wrapper around std::variant that provides msgpack serialization based on type names.
Abstract interface for IPC server.
virtual void close()=0
Close the server and all client connections.
static std::unique_ptr< IpcServer > create_socket(const std::string &socket_path, int max_clients)
static std::unique_ptr< IpcServer > create_shm(const std::string &base_name, size_t request_ring_size=static_cast< size_t >(1024 *1024), size_t response_ring_size=static_cast< size_t >(1024 *1024))
virtual void request_shutdown()
Request graceful shutdown.
Exception thrown by handler to signal graceful shutdown.
uint8_t const size_t length
uint8_t buffer[RANDOM_BUFFER_SIZE]
CommandResponse bbapi(Command &&command)
Main API function that processes commands and returns responses.
Entry point for Barretenberg command-line interface.
int execute_msgpack_ipc_server(std::unique_ptr< ipc::IpcServer > server)
Execute msgpack commands over IPC.
int execute_msgpack_run(const std::string &msgpack_input_file, int max_clients, size_t request_ring_size, size_t response_ring_size)
Execute msgpack run command.
int process_msgpack_commands(std::istream &input_stream)
Process msgpack API commands from an input stream.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
simulation::PublicDataTreeReadWriteEvent event
Error response returned when a command fails.
void throw_or_abort(std::string const &err)