Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
shm_server.hpp
Go to the documentation of this file.
1#pragma once
2
4#include "ipc_server.hpp"
5#include "shm/spsc_shm.hpp"
6#include "shm_common.hpp"
7#include <cstdint>
8#include <cstring>
9#include <fcntl.h>
10#include <iostream>
11#include <optional>
12#include <string>
13#include <sys/mman.h>
14#include <sys/types.h>
15#include <unistd.h>
16#include <utility>
17
18namespace bb::ipc {
19
26class ShmServer : public IpcServer {
27 public:
28 static constexpr size_t DEFAULT_RING_SIZE = 1 << 20; // 1MB
29
30 ShmServer(std::string base_name,
31 size_t request_ring_size = DEFAULT_RING_SIZE,
32 size_t response_ring_size = DEFAULT_RING_SIZE)
33 : base_name_(std::move(base_name))
34 , request_ring_size_(request_ring_size)
35 , response_ring_size_(response_ring_size)
36 {}
37
38 ~ShmServer() override { close(); }
39
40 // Non-copyable, non-movable (owns shared memory resources)
41 ShmServer(const ShmServer&) = delete;
42 ShmServer& operator=(const ShmServer&) = delete;
43 ShmServer(ShmServer&&) = delete;
45
46 bool listen() override
47 {
48 if (request_ring_.has_value()) {
49 return true; // Already listening
50 }
51
52 // Clean up any leftover shared memory
53 std::string req_name = base_name_ + "_request";
54 std::string resp_name = base_name_ + "_response";
55 SpscShm::unlink(req_name);
56 SpscShm::unlink(resp_name);
57
58 try {
59 // Create SPSC ring for requests (client writes, server reads)
61
62 // Create SPSC ring for responses (server writes, client reads)
64
65 return true;
66 } catch (...) {
67 close(); // Cleanup on failure
68 return false;
69 }
70 }
71
72 int wait_for_data(uint64_t timeout_ns) override
73 {
74 assert(request_ring_);
75 if (!request_ring_.has_value()) {
76 return -1;
77 }
78
79 // Wait for data in request ring, return client ID 0 (always single client)
80 if (request_ring_->wait_for_data(sizeof(uint32_t), static_cast<uint32_t>(timeout_ns))) {
81 return 0; // Single client, always ID 0
82 }
83 return -1; // Timeout
84 }
85
86 std::span<const uint8_t> receive([[maybe_unused]] int client_id) override
87 {
88 if (!request_ring_.has_value()) {
89 return {};
90 }
91 // TODO: Plumb timeout.
92 return ring_receive_msg(request_ring_.value(), 100000000); // 100ms timeout
93 }
94
95 void release([[maybe_unused]] int client_id, size_t message_size) override
96 {
97 if (!request_ring_.has_value()) {
98 return;
99 }
100 request_ring_->release(sizeof(uint32_t) + message_size);
101 }
102
103 bool send([[maybe_unused]] int client_id, const void* data, size_t len) override
104 {
105 if (!response_ring_.has_value()) {
106 return false;
107 }
108 return ring_send_msg(response_ring_.value(), data, len, 100000000);
109 }
110
111 void close() override
112 {
113 // Close rings
114 request_ring_.reset();
115 response_ring_.reset();
116
117 // Clean up shared memory
118 std::string req_name = base_name_ + "_request";
119 std::string resp_name = base_name_ + "_response";
120 SpscShm::unlink(req_name);
121 SpscShm::unlink(resp_name);
122 }
123
124 void wakeup_all() override
125 {
126 // Wake any threads blocked in wait/peek/claim
127 if (request_ring_.has_value()) {
128 request_ring_->wakeup_all();
129 }
130 if (response_ring_.has_value()) {
131 response_ring_->wakeup_all();
132 }
133 }
134
135 void debug_dump() const
136 {
137 if (request_ring_.has_value()) {
138 request_ring_->debug_dump("Server REQ");
139 }
140 if (response_ring_.has_value()) {
141 response_ring_->debug_dump("Server RESP");
142 }
143 }
144
145 private:
146 std::string base_name_;
149 std::optional<SpscShm> request_ring_; // Server reads from this
150 std::optional<SpscShm> response_ring_; // Server writes to this
151};
152
153} // namespace bb::ipc
Abstract interface for IPC server.
IPC server implementation using shared memory.
void close() override
Close the server and all client connections.
ShmServer(std::string base_name, size_t request_ring_size=DEFAULT_RING_SIZE, size_t response_ring_size=DEFAULT_RING_SIZE)
int wait_for_data(uint64_t timeout_ns) override
Wait for data from any connected client.
ShmServer(const ShmServer &)=delete
std::span< const uint8_t > receive(int client_id) override
Receive next message from a specific client.
~ShmServer() override
std::string base_name_
void debug_dump() const
std::optional< SpscShm > request_ring_
void wakeup_all() override
Wake all blocked threads (for graceful shutdown)
ShmServer & operator=(const ShmServer &)=delete
static constexpr size_t DEFAULT_RING_SIZE
ShmServer & operator=(ShmServer &&)=delete
bool send(int client_id, const void *data, size_t len) override
Send a message to a specific client.
void release(int client_id, size_t message_size) override
Release/consume the previously received message.
std::optional< SpscShm > response_ring_
bool listen() override
Start listening for client connections.
ShmServer(ShmServer &&)=delete
static bool unlink(const std::string &name)
Unlink shared memory object (cleanup after close)
Definition spsc_shm.cpp:192
static SpscShm create(const std::string &name, size_t min_capacity)
Create a new SPSC ring buffer.
Definition spsc_shm.cpp:96
const std::vector< MemoryValue > data
bool ring_send_msg(SpscShm &ring, const void *data, size_t len, uint64_t timeout_ns)
std::span< const uint8_t > ring_receive_msg(SpscShm &ring, uint64_t timeout_ns)
STL namespace.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
uint8_t len
Single-producer/single-consumer shared-memory ring buffer (Linux, x86-64 optimized)