Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
api_msgpack.cpp
Go to the documentation of this file.
#include <cerrno>
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
10
11#ifndef __wasm__
13#include <csignal>
14#include <thread>
15#include <unistd.h>
16#ifdef __linux__
17#include <sys/prctl.h>
18#elif defined(__APPLE__)
19#include <sys/event.h>
20#endif
21#endif
22
23namespace bb {
24
25int process_msgpack_commands(std::istream& input_stream)
26{
27 // Redirect std::cout to stderr to prevent accidental writes to stdout
28 auto* original_cout_buf = std::cout.rdbuf();
29 std::cout.rdbuf(std::cerr.rdbuf());
30
31 // Create an ostream that writes directly to stdout
32 std::ostream stdout_stream(original_cout_buf);
33
34 // Process length-encoded msgpack buffers
35 while (!input_stream.eof()) {
36 // Read 4-byte length prefix in little-endian format
37 uint32_t length = 0;
38 input_stream.read(reinterpret_cast<char*>(&length), sizeof(length));
39
40 if (input_stream.gcount() != sizeof(length)) {
41 // End of stream or incomplete length
42 break;
43 }
44
45 // Read the msgpack buffer
46 std::vector<uint8_t> buffer(length);
47 input_stream.read(reinterpret_cast<char*>(buffer.data()), static_cast<std::streamsize>(length));
48
49 if (input_stream.gcount() != static_cast<std::streamsize>(length)) {
50 std::cerr << "Error: Incomplete msgpack buffer read" << '\n';
51 // Restore original cout buffer before returning
52 std::cout.rdbuf(original_cout_buf);
53 return 1;
54 }
55
56 // Deserialize the msgpack buffer
57 // The buffer should contain a tuple of arguments (array) matching the bbapi function signature.
58 // Since bbapi(Command) takes one argument, we expect a 1-element array containing the Command.
59 auto unpacked = msgpack::unpack(reinterpret_cast<const char*>(buffer.data()), buffer.size());
60 auto obj = unpacked.get();
61
62 // First, expect an array (the tuple of arguments)
63 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
64 if (obj.type != msgpack::type::ARRAY || obj.via.array.size != 1) {
65 throw_or_abort("Expected an array of size 1 (tuple of arguments) for bbapi command deserialization");
66 }
67
68 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
69 auto& tuple_arr = obj.via.array;
70 auto& command_obj = tuple_arr.ptr[0];
71
72 // Now access the Command itself, which should be an array of size 2 [command-name, payload]
73 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
74 if (command_obj.type != msgpack::type::ARRAY || command_obj.via.array.size != 2) {
75 throw_or_abort("Expected Command to be an array of size 2 [command-name, payload]");
76 }
77
78 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
79 auto& command_arr = command_obj.via.array;
80 if (command_arr.ptr[0].type != msgpack::type::STR) {
81 throw_or_abort("Expected first element of Command to be a string (type name)");
82 }
83
84 // Convert to Command (which is a NamedUnion)
85 bb::bbapi::Command command;
86 command_obj.convert(command);
87
88 // Execute the command
89 auto response = bbapi::bbapi(std::move(command));
90
91 // Serialize the response
92 msgpack::sbuffer response_buffer;
93 msgpack::pack(response_buffer, response);
94
95 // Write length-encoded response directly to stdout
96 uint32_t response_length = static_cast<uint32_t>(response_buffer.size());
97 stdout_stream.write(reinterpret_cast<const char*>(&response_length), sizeof(response_length));
98 stdout_stream.write(response_buffer.data(), static_cast<std::streamsize>(response_buffer.size()));
99 stdout_stream.flush();
100 }
101
102 // Restore original cout buffer
103 std::cout.rdbuf(original_cout_buf);
104 return 0;
105}
106
107#ifndef __wasm__
108// Set up platform-specific parent death monitoring
109// This ensures the bb process exits when the parent (Node.js) dies
/**
 * @brief Arrange for this process to terminate when its parent process exits.
 *
 * - Linux: asks the kernel (prctl PR_SET_PDEATHSIG) to deliver SIGTERM when the parent dies.
 * - macOS: spawns a detached thread that waits on a kqueue for the parent's NOTE_EXIT event and then calls
 *   std::exit(0).
 * - Other platforms: no-op.
 *
 * Failures are reported as warnings on stderr; the process keeps running without monitoring.
 */
static void setup_parent_death_monitoring()
{
#ifdef __linux__
    // Linux: Use prctl to request SIGTERM when parent dies
    if (prctl(PR_SET_PDEATHSIG, SIGTERM) == -1) {
        std::cerr << "Warning: Could not set parent death signal" << '\n';
    }
#elif defined(__APPLE__)
    // macOS: Use kqueue to monitor parent process
    // Spawn a dedicated thread that blocks waiting for parent to exit
    const pid_t parent_pid = getppid();
    std::thread([parent_pid]() {
        const int kq = kqueue();
        if (kq == -1) {
            std::cerr << "Warning: Could not create kqueue for parent monitoring" << '\n';
            return;
        }

        struct kevent change;
        EV_SET(&change, parent_pid, EVFILT_PROC, EV_ADD | EV_ENABLE, NOTE_EXIT, 0, nullptr);
        if (kevent(kq, &change, 1, nullptr, 0, nullptr) == -1) {
            std::cerr << "Warning: Could not monitor parent process" << '\n';
            close(kq);
            return;
        }

        // Block until parent exits. kevent() fails with EINTR when a handled signal (e.g. SIGINT/SIGTERM)
        // is delivered to this thread; that is not a parent exit, so retry instead of shutting down.
        struct kevent event;
        int received = 0;
        do {
            received = kevent(kq, nullptr, 0, &event, 1, nullptr);
        } while (received == -1 && errno == EINTR);

        if (received != 1) {
            // Any other failure: give up monitoring rather than killing the process on a false positive
            std::cerr << "Warning: Could not monitor parent process" << '\n';
            close(kq);
            return;
        }

        std::cerr << "Parent process exited, shutting down..." << '\n';
        close(kq);
        std::exit(0);
    }).detach();
#endif
}
147
149{
150 // Store server pointer for signal handler cleanup (works for both socket and shared memory)
151 // MUST be set before listen() since SIGBUS can occur during listen()
152 static ipc::IpcServer* global_server = server.get();
153
154 // Register signal handlers for graceful cleanup
155 // MUST be registered before listen() since SIGBUS can occur during initialization
156 // SIGTERM: Sent by processes/test frameworks on shutdown
157 // SIGINT: Sent by Ctrl+C
158 auto graceful_shutdown_handler = [](int signal) {
159 std::cerr << "\nReceived signal " << signal << ", shutting down gracefully..." << '\n';
160 if (global_server) {
161 global_server->request_shutdown();
162 }
163 };
164
165 // Register handlers for fatal memory errors (SIGBUS, SIGSEGV)
166 // These occur when shared memory exhaustion happens during initialization
167 auto fatal_error_handler = [](int signal) {
168 const char* signal_name = "UNKNOWN";
169 if (signal == SIGBUS) {
170 signal_name = "SIGBUS";
171 } else if (signal == SIGSEGV) {
172 signal_name = "SIGSEGV";
173 }
174 std::cerr << "\nFatal error: received " << signal_name << " during initialization" << '\n';
175 std::cerr << "This likely means shared memory exhaustion (try reducing --max-clients)" << '\n';
176
177 // Clean up IPC resources before exiting
178 if (global_server) {
179 global_server->close();
180 }
181
182 std::exit(1);
183 };
184
185 (void)std::signal(SIGTERM, graceful_shutdown_handler);
186 (void)std::signal(SIGINT, graceful_shutdown_handler);
187 (void)std::signal(SIGBUS, fatal_error_handler);
188 (void)std::signal(SIGSEGV, fatal_error_handler);
189
190 // Set up parent death monitoring (kills this process when parent dies)
191 setup_parent_death_monitoring();
192
193 if (!server->listen()) {
194 std::cerr << "Error: Could not start IPC server" << '\n';
195 return 1;
196 }
197
198 std::cerr << "IPC server ready" << '\n';
199
200 // Run server with msgpack handler
201 server->run([](int client_id, std::span<const uint8_t> request) -> std::vector<uint8_t> {
202 try {
203 // Deserialize msgpack command
204 // The buffer should contain a tuple of arguments (array) matching the bbapi function signature.
205 // Since bbapi(Command) takes one argument, we expect a 1-element array containing the Command.
206 auto unpacked = msgpack::unpack(reinterpret_cast<const char*>(request.data()), request.size());
207 auto obj = unpacked.get();
208
209 // First, expect an array (the tuple of arguments)
210 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
211 if (obj.type != msgpack::type::ARRAY || obj.via.array.size != 1) {
212 std::cerr << "Error: Expected an array of size 1 (tuple of arguments) from client " << client_id
213 << '\n';
214 return {}; // Return empty to skip response
215 }
216
217 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
218 auto& tuple_arr = obj.via.array;
219 auto& command_obj = tuple_arr.ptr[0];
220
221 // Now access the Command itself, which should be an array of size 2 [command-name, payload]
222 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
223 if (command_obj.type != msgpack::type::ARRAY || command_obj.via.array.size != 2) {
224 std::cerr << "Error: Expected Command to be an array of size 2 [command-name, payload] from client "
225 << client_id << '\n';
226 return {};
227 }
228
229 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
230 auto& command_arr = command_obj.via.array;
231 if (command_arr.ptr[0].type != msgpack::type::STR) {
232 std::cerr << "Error: Expected first element of Command to be a string (type name) from client "
233 << client_id << '\n';
234 return {};
235 }
236
237 // Check if this is a Shutdown command
238 // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
239 std::string command_name(command_arr.ptr[0].via.str.ptr, command_arr.ptr[0].via.str.size);
240 bool is_shutdown = (command_name == "Shutdown");
241
242 // Convert to Command and execute
243 bb::bbapi::Command command;
244 command_obj.convert(command);
245 auto response = bbapi::bbapi(std::move(command));
246
247 // Serialize response
248 msgpack::sbuffer response_buffer;
249 msgpack::pack(response_buffer, response);
250 std::vector<uint8_t> result(response_buffer.data(), response_buffer.data() + response_buffer.size());
251
252 // If this was a shutdown command, throw exception with response
253 // This signals the server to send the response and then exit gracefully
254 if (is_shutdown) {
255 throw ipc::ShutdownRequested(std::move(result));
256 }
257
258 return result;
259 } catch (const ipc::ShutdownRequested&) {
260 // Re-throw shutdown request
261 throw;
262 } catch (const std::exception& e) {
263 // Log error to stderr for debugging (goes to log file if logger enabled)
264 std::cerr << "Error processing request from client " << client_id << ": " << e.what() << '\n';
265 std::cerr.flush();
266
267 // Create error response with exception message
268 bb::bbapi::ErrorResponse error_response{ .message = std::string(e.what()) };
269 bb::bbapi::CommandResponse response = error_response;
270
271 // Serialize and return error response to client
272 msgpack::sbuffer response_buffer;
273 msgpack::pack(response_buffer, response);
274 return std::vector<uint8_t>(response_buffer.data(), response_buffer.data() + response_buffer.size());
275 }
276 });
277
278 server->close();
279 return 0;
280}
281#endif
282
283int execute_msgpack_run(const std::string& msgpack_input_file,
284 [[maybe_unused]] int max_clients,
285 [[maybe_unused]] size_t request_ring_size,
286 [[maybe_unused]] size_t response_ring_size)
287{
288#ifndef __wasm__
289 // Check if this is a shared memory path (ends with .shm)
290 if (!msgpack_input_file.empty() && msgpack_input_file.size() >= 4 &&
291 msgpack_input_file.substr(msgpack_input_file.size() - 4) == ".shm") {
292 // Strip .shm suffix to get base name
293 std::string base_name = msgpack_input_file.substr(0, msgpack_input_file.size() - 4);
294 auto server = ipc::IpcServer::create_shm(base_name, request_ring_size, response_ring_size);
295 std::cerr << "Shared memory server at " << base_name << '\n';
297 }
298
299 // Check if this is a Unix domain socket path (ends with .sock)
300 if (!msgpack_input_file.empty() && msgpack_input_file.size() >= 5 &&
301 msgpack_input_file.substr(msgpack_input_file.size() - 5) == ".sock") {
302 // Socket server still supports max_clients (multiple clients via MPSC)
303 auto server = ipc::IpcServer::create_socket(msgpack_input_file, max_clients);
304 std::cerr << "Socket server at " << msgpack_input_file << '\n';
306 }
307#endif
308
309 // Process msgpack API commands from stdin or file
310 std::istream* input_stream = &std::cin;
311 std::ifstream file_stream;
312
313 if (!msgpack_input_file.empty()) {
314 file_stream.open(msgpack_input_file, std::ios::binary);
315 if (!file_stream.is_open()) {
316 std::cerr << "Error: Could not open input file: " << msgpack_input_file << '\n';
317 return 1;
318 }
319 input_stream = &file_stream;
320 }
321
322 return process_msgpack_commands(*input_stream);
323}
324
325} // namespace bb
A wrapper around std::variant that provides msgpack serialization based on type names.
Abstract interface for IPC server.
virtual void close()=0
Close the server and all client connections.
static std::unique_ptr< IpcServer > create_socket(const std::string &socket_path, int max_clients)
static std::unique_ptr< IpcServer > create_shm(const std::string &base_name, size_t request_ring_size=static_cast< size_t >(1024 *1024), size_t response_ring_size=static_cast< size_t >(1024 *1024))
virtual void request_shutdown()
Request graceful shutdown.
Exception thrown by handler to signal graceful shutdown.
uint8_t const size_t length
Definition data_store.hpp:9
uint8_t buffer[RANDOM_BUFFER_SIZE]
Definition engine.cpp:34
CommandResponse bbapi(Command &&command)
Main API function that processes commands and returns responses.
Definition c_bind.cpp:24
Entry point for Barretenberg command-line interface.
Definition api.hpp:5
int execute_msgpack_ipc_server(std::unique_ptr< ipc::IpcServer > server)
Execute msgpack commands over IPC.
int execute_msgpack_run(const std::string &msgpack_input_file, int max_clients, size_t request_ring_size, size_t response_ring_size)
Execute msgpack run command.
int process_msgpack_commands(std::istream &input_stream)
Process msgpack API commands from an input stream.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
simulation::PublicDataTreeReadWriteEvent event
Error response returned when a command fails.
void throw_or_abort(std::string const &err)