Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
ipc_server.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <atomic>
4#include <cstddef>
5#include <cstdint>
6#include <exception>
7#include <functional>
8#include <memory>
9#include <span>
10#include <string>
11#include <sys/types.h>
12#include <utility>
13#include <vector>
14
15namespace bb::ipc {
16
/**
 * @brief Exception thrown by a request handler to signal graceful shutdown.
 *
 * The exception carries the bytes of a final response. IpcServer::run() catches
 * this type, sends the carried response to the requesting client when it is
 * non-empty, and then returns from its event loop.
 */
class ShutdownRequested : public std::exception {
  public:
    /**
     * @brief Construct the shutdown signal.
     * @param response Final response bytes for the client; may be empty.
     *                 Taken by value and moved into the exception.
     */
    explicit ShutdownRequested(std::vector<uint8_t> response)
        : response_(std::move(response))
    {}

    /// @return The final response bytes carried by this exception.
    [[nodiscard]] const std::vector<uint8_t>& response() const noexcept { return response_; }

    /// @return A fixed, static description string.
    const char* what() const noexcept override { return "Server shutdown requested"; }

  private:
    std::vector<uint8_t> response_;
};
32
39class IpcServer {
40 public:
41 IpcServer() = default;
42 virtual ~IpcServer() = default;
43
44 // Abstract interface - no copy or move
45 IpcServer(const IpcServer&) = delete;
46 IpcServer& operator=(const IpcServer&) = delete;
47 IpcServer(IpcServer&&) = delete;
49
54 virtual bool listen() = 0;
55
62 virtual int wait_for_data(uint64_t timeout_ns) = 0;
63
76 virtual std::span<const uint8_t> receive(int client_id) = 0;
77
88 virtual void release(int client_id, size_t message_size) = 0;
89
97 virtual bool send(int client_id, const void* data, size_t len) = 0;
98
102 virtual void close() = 0;
103
111 virtual void request_shutdown()
112 {
114 wakeup_all();
115 }
116
124
132 virtual int accept() { return -1; }
133
152 virtual void run(const Handler& handler)
153 {
155 // Try to accept new clients (non-blocking for socket servers)
156 accept();
157
158 int client_id = wait_for_data(100000000); // 100ms timeout
159 if (client_id < 0) {
160 // Timeout or error - check shutdown flag on next iteration
161 continue;
162 }
163
164 // Receive message (blocks until complete message available, zero-copy for SHM)
165 auto request = receive(client_id);
166 if (request.empty()) {
167 continue;
168 }
169
170 try {
171 auto response = handler(client_id, request);
172 if (!response.empty()) {
173 send(client_id, response.data(), response.size());
174 }
175
176 // Explicitly release/consume the message
177 release(client_id, request.size());
178 } catch (const ShutdownRequested& shutdown) {
179 // Release message before shutting down
180 release(client_id, request.size());
181
182 // Send final response before shutting down
183 if (!shutdown.response().empty()) {
184 send(client_id, shutdown.response().data(), shutdown.response().size());
185 }
186 // Graceful shutdown - exit loop and let destructors run
187 return;
188 }
189 }
190 }
191
192 // Factory methods
193 static std::unique_ptr<IpcServer> create_socket(const std::string& socket_path, int max_clients);
194 static std::unique_ptr<IpcServer> create_shm(const std::string& base_name,
195 size_t request_ring_size = static_cast<size_t>(1024 * 1024),
196 size_t response_ring_size = static_cast<size_t>(1024 * 1024));
197
198 protected:
199 std::atomic<bool> shutdown_requested_{ false };
200
207 virtual void wakeup_all() {};
208};
209
210} // namespace bb::ipc
Abstract interface for IPC server.
virtual int accept()
Accept a new client connection (optional for some transports)
IpcServer(const IpcServer &)=delete
virtual int wait_for_data(uint64_t timeout_ns)=0
Wait for data from any connected client.
virtual void close()=0
Close the server and all client connections.
static std::unique_ptr< IpcServer > create_socket(const std::string &socket_path, int max_clients)
IpcServer & operator=(IpcServer &&)=delete
static std::unique_ptr< IpcServer > create_shm(const std::string &base_name, size_t request_ring_size=static_cast< size_t >(1024 *1024), size_t response_ring_size=static_cast< size_t >(1024 *1024))
virtual bool send(int client_id, const void *data, size_t len)=0
Send a message to a specific client.
virtual void wakeup_all()
Wake all blocked threads (for graceful shutdown)
std::function< std::vector< uint8_t >(int client_id, std::span< const uint8_t > request)> Handler
High-level request handler function type.
virtual bool listen()=0
Start listening for client connections.
virtual ~IpcServer()=default
virtual void release(int client_id, size_t message_size)=0
Release/consume the previously received message.
virtual std::span< const uint8_t > receive(int client_id)=0
Receive next message from a specific client.
virtual void request_shutdown()
Request graceful shutdown.
IpcServer(IpcServer &&)=delete
std::atomic< bool > shutdown_requested_
IpcServer & operator=(const IpcServer &)=delete
virtual void run(const Handler &handler)
Run server event loop with handler.
Exception thrown by handler to signal graceful shutdown.
ShutdownRequested(std::vector< uint8_t > response)
const std::vector< uint8_t > & response() const
const char * what() const noexcept override
std::vector< uint8_t > response_
const std::vector< MemoryValue > data
STL namespace.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
uint8_t len