Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
mpsc_shm.cpp
Go to the documentation of this file.
1#include "mpsc_shm.hpp"
2#include "futex.hpp"
3#include "utilities.hpp"
4#include <atomic>
5#include <cerrno>
6#include <climits>
7#include <cstdint>
8#include <cstring>
9#include <fcntl.h>
10#include <stdexcept>
11#include <string>
12#include <sys/mman.h>
13#include <unistd.h>
14#include <utility>
15#include <vector>
16
17namespace bb::ipc {
18
19// ----- MpscConsumer Implementation -----
20
21MpscConsumer::MpscConsumer(std::vector<SpscShm>&& rings, int doorbell_fd, size_t doorbell_len, MpscDoorbell* doorbell)
22 : rings_(std::move(rings))
23 , doorbell_fd_(doorbell_fd)
24 , doorbell_len_(doorbell_len)
25 , doorbell_(doorbell)
26{}
27
29 : rings_(std::move(other.rings_))
30 , doorbell_fd_(other.doorbell_fd_)
31 , doorbell_len_(other.doorbell_len_)
32 , doorbell_(other.doorbell_)
33 , last_served_(other.last_served_)
34{
35 other.doorbell_fd_ = -1;
36 other.doorbell_len_ = 0;
37 other.doorbell_ = nullptr;
38 other.last_served_ = 0;
39}
40
42{
43 if (this != &other) {
44 // Clean up current resources
45 if (doorbell_ != nullptr) {
46 munmap(doorbell_, doorbell_len_);
47 }
48 if (doorbell_fd_ >= 0) {
49 ::close(doorbell_fd_);
50 }
51
52 // Move from other
53 rings_ = std::move(other.rings_);
54 doorbell_fd_ = other.doorbell_fd_;
55 doorbell_len_ = other.doorbell_len_;
56 doorbell_ = other.doorbell_;
57 last_served_ = other.last_served_;
58
59 // Clear other
60 other.doorbell_fd_ = -1;
61 other.doorbell_len_ = 0;
62 other.doorbell_ = nullptr;
63 other.last_served_ = 0;
64 }
65 return *this;
66}
67
69{
70 if (doorbell_ != nullptr) {
71 munmap(doorbell_, doorbell_len_);
72 }
73 if (doorbell_fd_ >= 0) {
74 ::close(doorbell_fd_);
75 }
76}
77
78MpscConsumer MpscConsumer::create(const std::string& name, size_t num_producers, size_t ring_capacity)
79{
80 if (name.empty() || num_producers == 0) {
81 throw std::runtime_error("MpscConsumer::create: invalid arguments");
82 }
83
84 // Create doorbell shared memory
85 std::string doorbell_name = name + "_doorbell";
86 size_t doorbell_len = sizeof(MpscDoorbell);
87
88 int doorbell_fd = shm_open(doorbell_name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0600);
89 if (doorbell_fd < 0) {
90 throw std::runtime_error("MpscConsumer::create: shm_open doorbell failed: " +
91 std::string(std::strerror(errno)));
92 }
93
94 if (ftruncate(doorbell_fd, static_cast<off_t>(doorbell_len)) != 0) {
95 int e = errno;
96 ::close(doorbell_fd);
97 shm_unlink(doorbell_name.c_str());
98 throw std::runtime_error("MpscConsumer::create: ftruncate doorbell failed: " + std::string(std::strerror(e)));
99 }
100
101 auto* doorbell =
102 static_cast<MpscDoorbell*>(mmap(nullptr, doorbell_len, PROT_READ | PROT_WRITE, MAP_SHARED, doorbell_fd, 0));
103 if (doorbell == MAP_FAILED) {
104 int e = errno;
105 ::close(doorbell_fd);
106 shm_unlink(doorbell_name.c_str());
107 throw std::runtime_error("MpscConsumer::create: mmap doorbell failed: " + std::string(std::strerror(e)));
108 }
109
110 // Initialize doorbell (use placement new to avoid memset on non-trivial type)
111 new (doorbell) MpscDoorbell{};
112 doorbell->consumer_blocked.store(false, std::memory_order_release);
113
114 // Create all SPSC rings
116 rings.reserve(num_producers);
117
118 try {
119 for (size_t i = 0; i < num_producers; i++) {
120 std::string ring_name = name + "_ring_" + std::to_string(i);
121 rings.push_back(SpscShm::create(ring_name, ring_capacity));
122 }
123 } catch (...) {
124 // Cleanup on failure
125 for (size_t i = 0; i < rings.size(); i++) {
126 std::string ring_name = name + "_ring_" + std::to_string(i);
127 SpscShm::unlink(ring_name);
128 }
129 munmap(doorbell, doorbell_len);
130 ::close(doorbell_fd);
131 shm_unlink(doorbell_name.c_str());
132 throw;
133 }
134
135 return MpscConsumer(std::move(rings), doorbell_fd, doorbell_len, doorbell);
136}
137
138bool MpscConsumer::unlink(const std::string& name, size_t num_producers)
139{
140 std::string doorbell_name = name + "_doorbell";
141 shm_unlink(doorbell_name.c_str());
142
143 for (size_t i = 0; i < num_producers; i++) {
144 std::string ring_name = name + "_ring_" + std::to_string(i);
145 SpscShm::unlink(ring_name);
146 }
147
148 return true;
149}
150
151int MpscConsumer::wait_for_data(uint32_t timeout_ns)
152{
153 size_t num_rings = rings_.size();
154
155 // Phase 1: Quick poll - check if data already available
156 for (size_t i = 0; i < num_rings; i++) {
157 size_t idx = (last_served_ + 1 + i) % num_rings;
158 if (rings_[idx].available() > 0) {
159 last_served_ = idx;
160 previous_had_data_ = true; // Found data - enable spinning on next call
161 return static_cast<int>(idx);
162 }
163 }
164
165 // Adaptive spinning: only spin if previous call found data
166 constexpr uint64_t SPIN_NS = 100000; // 100us
167 uint64_t spin_duration;
168 uint64_t remaining_timeout;
169
170 if (previous_had_data_) {
171 // Previous call found data - do full spin (optimistic)
172 spin_duration = (timeout_ns < SPIN_NS) ? timeout_ns : SPIN_NS;
173 remaining_timeout = (timeout_ns > SPIN_NS) ? (timeout_ns - SPIN_NS) : 0;
174 } else {
175 // Previous call timed out - skip spinning (idle channel)
176 spin_duration = 0;
177 remaining_timeout = timeout_ns;
178 }
179
180 // Phase 2: Spin phase (only if previous call found data)
181 if (spin_duration > 0) {
182 uint64_t start = mono_ns_now();
183 // NOLINTNEXTLINE(cppcoreguidelines-avoid-do-while)
184 do {
185 for (size_t i = 0; i < num_rings; i++) {
186 size_t idx = (last_served_ + 1 + i) % num_rings;
187 if (rings_[idx].available() > 0) {
188 last_served_ = idx;
189 previous_had_data_ = true; // Found data during spin
190 return static_cast<int>(idx);
191 }
192 }
193 IPC_PAUSE();
194 } while ((mono_ns_now() - start) < spin_duration);
195
196 // Check after spin
197 for (size_t i = 0; i < num_rings; i++) {
198 size_t idx = (last_served_ + 1 + i) % num_rings;
199 if (rings_[idx].available() > 0) {
200 last_served_ = idx;
201 previous_had_data_ = true; // Found data after spin
202 return static_cast<int>(idx);
203 }
204 }
205 }
206
207 // No more time or didn't spin - check if we can block
208 if (remaining_timeout == 0) {
209 previous_had_data_ = false; // Timeout - disable spinning on next call
210 return -1;
211 }
212
213 // About to block - load seq, final check, then block
214 uint32_t seq = doorbell_->seq.load(std::memory_order_acquire);
215
216 // Final check before blocking
217 for (size_t i = 0; i < num_rings; i++) {
218 size_t idx = (last_served_ + 1 + i) % num_rings;
219 if (rings_[idx].available() > 0) {
220 last_served_ = idx;
221 previous_had_data_ = true; // Found data before blocking
222 return static_cast<int>(idx);
223 }
224 }
225
226 // Set blocked flag RIGHT BEFORE futex_wait
228 futex_wait_timeout(reinterpret_cast<volatile uint32_t*>(&doorbell_->seq), seq, remaining_timeout);
229 // Clear blocked flag RIGHT AFTER futex_wait returns
231
232 // After waking, poll again
233 for (size_t i = 0; i < num_rings; i++) {
234 size_t idx = (last_served_ + 1 + i) % num_rings;
235 if (rings_[idx].available() > 0) {
236 last_served_ = idx;
237 previous_had_data_ = true; // Found data after waking
238 return static_cast<int>(idx);
239 }
240 }
241
242 previous_had_data_ = false; // Timeout or spurious wakeup - disable spinning on next call
243 return -1; // No data available (timeout or spurious wakeup)
244}
245
246void* MpscConsumer::peek(size_t ring_idx, size_t want, uint32_t timeout_ns)
247{
248 if (ring_idx >= rings_.size()) {
249 return nullptr;
250 }
251 return rings_[ring_idx].peek(want, timeout_ns);
252}
253
254void MpscConsumer::release(size_t ring_idx, size_t n)
255{
256 if (ring_idx < rings_.size()) {
257 rings_[ring_idx].release(n);
258 }
259}
260
262{
263 // Wake consumer blocked on doorbell
264 futex_wake(reinterpret_cast<volatile uint32_t*>(&doorbell_->seq), INT_MAX);
265
266 // Wake all producers blocked on their rings
267 for (auto& ring : rings_) {
268 ring.wakeup_all();
269 }
270}
271
272// ----- MpscProducer Implementation -----
273
275 SpscShm&& ring, int doorbell_fd, size_t doorbell_len, MpscDoorbell* doorbell, size_t producer_id)
276 : ring_(std::move(ring))
277 , doorbell_fd_(doorbell_fd)
278 , doorbell_len_(doorbell_len)
279 , doorbell_(doorbell)
280 , producer_id_(producer_id)
281{}
282
284 : ring_(std::move(other.ring_))
285 , doorbell_fd_(other.doorbell_fd_)
286 , doorbell_len_(other.doorbell_len_)
287 , doorbell_(other.doorbell_)
288 , producer_id_(other.producer_id_)
289{
290 other.doorbell_fd_ = -1;
291 other.doorbell_len_ = 0;
292 other.doorbell_ = nullptr;
293 other.producer_id_ = 0;
294}
295
297{
298 if (this != &other) {
299 // Clean up current resources
300 if (doorbell_ != nullptr) {
301 munmap(doorbell_, doorbell_len_);
302 }
303 if (doorbell_fd_ >= 0) {
304 ::close(doorbell_fd_);
305 }
306
307 // Move from other
308 ring_ = std::move(other.ring_);
309 doorbell_fd_ = other.doorbell_fd_;
310 doorbell_len_ = other.doorbell_len_;
311 doorbell_ = other.doorbell_;
312 producer_id_ = other.producer_id_;
313
314 // Clear other
315 other.doorbell_fd_ = -1;
316 other.doorbell_len_ = 0;
317 other.doorbell_ = nullptr;
318 other.producer_id_ = 0;
319 }
320 return *this;
321}
322
324{
325 if (doorbell_ != nullptr) {
326 munmap(doorbell_, doorbell_len_);
327 }
328 if (doorbell_fd_ >= 0) {
329 ::close(doorbell_fd_);
330 }
331}
332
333MpscProducer MpscProducer::connect(const std::string& name, size_t producer_id)
334{
335 if (name.empty()) {
336 throw std::runtime_error("MpscProducer::connect: empty name");
337 }
338
339 // Connect to doorbell
340 std::string doorbell_name = name + "_doorbell";
341 size_t doorbell_len = sizeof(MpscDoorbell);
342
343 int doorbell_fd = shm_open(doorbell_name.c_str(), O_RDWR, 0600);
344 if (doorbell_fd < 0) {
345 throw std::runtime_error("MpscProducer::connect: shm_open doorbell failed: " +
346 std::string(std::strerror(errno)));
347 }
348
349 auto* doorbell =
350 static_cast<MpscDoorbell*>(mmap(nullptr, doorbell_len, PROT_READ | PROT_WRITE, MAP_SHARED, doorbell_fd, 0));
351 if (doorbell == MAP_FAILED) {
352 int e = errno;
353 ::close(doorbell_fd);
354 throw std::runtime_error("MpscProducer::connect: mmap doorbell failed: " + std::string(std::strerror(e)));
355 }
356
357 // Connect to assigned ring
358 std::string ring_name = name + "_ring_" + std::to_string(producer_id);
359 SpscShm ring = SpscShm::connect(ring_name);
360
361 return MpscProducer(std::move(ring), doorbell_fd, doorbell_len, doorbell, producer_id);
362}
363
364void* MpscProducer::claim(size_t want, uint32_t timeout_ns)
365{
366 return ring_.claim(want, timeout_ns);
367}
368
370{
371 // Publish to ring first
372 ring_.publish(n);
373
374 // Ring doorbell to wake consumer
375 // Always increment seq (for futex synchronization)
377
378 // Conditional wake: Only wake if consumer is blocked on futex
380 futex_wake(reinterpret_cast<volatile uint32_t*>(&doorbell_->seq), 1);
381 }
382}
383
384} // namespace bb::ipc
Multi-producer single-consumer - consumer side.
Definition mpsc_shm.hpp:41
MpscConsumer(MpscConsumer &&other) noexcept
Definition mpsc_shm.cpp:28
int wait_for_data(uint32_t timeout_ns)
Wait for data on any ring.
Definition mpsc_shm.cpp:151
static MpscConsumer create(const std::string &name, size_t num_producers, size_t ring_capacity)
Create MPSC consumer.
Definition mpsc_shm.cpp:78
MpscDoorbell * doorbell_
Definition mpsc_shm.hpp:103
MpscConsumer & operator=(MpscConsumer &&other) noexcept
Definition mpsc_shm.cpp:41
void * peek(size_t ring_idx, size_t want, uint32_t timeout_ns)
Peek data from specific ring.
Definition mpsc_shm.cpp:246
void release(size_t ring_idx, size_t n)
Release data from specific ring.
Definition mpsc_shm.cpp:254
void wakeup_all()
Wake all blocked threads (for graceful shutdown) Wakes consumer blocked on doorbell and all producers...
Definition mpsc_shm.cpp:261
static bool unlink(const std::string &name, size_t num_producers)
Unlink all shared memory for this MPSC system.
Definition mpsc_shm.cpp:138
std::vector< SpscShm > rings_
Definition mpsc_shm.hpp:100
Multi-producer single-consumer - producer side.
Definition mpsc_shm.hpp:113
MpscProducer & operator=(MpscProducer &&other) noexcept
Definition mpsc_shm.cpp:296
MpscProducer(MpscProducer &&other) noexcept
Definition mpsc_shm.cpp:283
static MpscProducer connect(const std::string &name, size_t producer_id)
Connect to MPSC system as a producer.
Definition mpsc_shm.cpp:333
void publish(size_t n)
Publish data to producer's ring (rings doorbell)
Definition mpsc_shm.cpp:369
MpscDoorbell * doorbell_
Definition mpsc_shm.hpp:151
void * claim(size_t want, uint32_t timeout_ns)
Claim space in producer's ring.
Definition mpsc_shm.cpp:364
Lock-free single-producer single-consumer shared memory ring buffer.
Definition spsc_shm.hpp:70
static bool unlink(const std::string &name)
Unlink shared memory object (cleanup after close)
Definition spsc_shm.cpp:192
void * claim(size_t want, uint32_t timeout_ns)
Claim contiguous space in the ring buffer (blocks until available)
Definition spsc_shm.cpp:204
void publish(size_t n)
Publish n bytes previously claimed.
Definition spsc_shm.cpp:227
static SpscShm create(const std::string &name, size_t min_capacity)
Create a new SPSC ring buffer.
Definition spsc_shm.cpp:96
static SpscShm connect(const std::string &name)
Connect to existing SPSC ring buffer.
Definition spsc_shm.cpp:157
Cross-platform futex-like synchronization primitives.
Multi-Producer Single-Consumer via SPSC rings + doorbell futex.
int futex_wake(volatile uint32_t *addr, int n)
Wake waiters blocked on an address.
Definition futex.hpp:98
uint64_t mono_ns_now()
Get current monotonic time in nanoseconds.
Definition utilities.hpp:31
int futex_wait_timeout(volatile uint32_t *addr, uint32_t expect, uint64_t timeout_ns)
Atomic compare-and-wait operation with timeout.
Definition futex.hpp:69
STL namespace.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
std::string to_string(bb::avm2::ValueTag tag)
Shared doorbell for waking consumer.
Definition mpsc_shm.hpp:26
std::atomic< uint32_t > seq
Definition mpsc_shm.hpp:28
std::atomic< bool > consumer_blocked
Definition mpsc_shm.hpp:32
Common utilities for IPC shared memory implementation.
#define IPC_PAUSE()
Definition utilities.hpp:16