Barretenberg
The ZK-SNARK library at the core of Aztec
spsc_shm.cpp
#include "spsc_shm.hpp"
#include "futex.hpp"
#include "utilities.hpp"
#include <atomic>
#include <cassert>
#include <cerrno>
#include <climits>
#include <cstdint>
#include <cstring>
#include <fcntl.h>
#include <iostream>
#include <stdexcept>
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

namespace bb::ipc {

namespace {

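// Round x up to the next power of two (minimum 2). The shift-or cascade below
// smears the highest set bit of x - 1 into every lower bit position, so the
// final increment lands exactly on the next power of two.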
inline uint64_t pow2_ceil_u64(uint64_t x)
{
    if (x < 2) {
        return 2;
    }
    x--;
    x |= x >> 1;
    x |= x >> 2;
    x |= x >> 4;
    x |= x >> 8;
    x |= x >> 16;
    x |= x >> 32;
    return x + 1;
}

} // anonymous namespace

// ----- SpscShm Implementation -----

SpscShm::SpscShm(int fd, size_t map_len, SpscCtrl* ctrl, uint8_t* buf)
    : fd_(fd)
    , map_len_(map_len)
    , ctrl_(ctrl)
    , buf_(buf)
{}

SpscShm::SpscShm(SpscShm&& other) noexcept
    : fd_(other.fd_)
    , map_len_(other.map_len_)
    , ctrl_(other.ctrl_)
    , buf_(other.buf_)
{
    other.fd_ = -1;
    other.map_len_ = 0;
    other.ctrl_ = nullptr;
    other.buf_ = nullptr;
}

SpscShm& SpscShm::operator=(SpscShm&& other) noexcept
{
    if (this != &other) {
        // Clean up current resources
        if (ctrl_ != nullptr) {
            munmap(ctrl_, map_len_);
        }
        if (fd_ >= 0) {
            ::close(fd_);
        }

        // Move from other
        fd_ = other.fd_;
        map_len_ = other.map_len_;
        ctrl_ = other.ctrl_;
        buf_ = other.buf_;

        // Clear other
        other.fd_ = -1;
        other.map_len_ = 0;
        other.ctrl_ = nullptr;
        other.buf_ = nullptr;
    }
    return *this;
}

SpscShm::~SpscShm()
{
    if (ctrl_ != nullptr) {
        munmap(ctrl_, map_len_);
    }
    if (fd_ >= 0) {
        ::close(fd_);
    }
}

SpscShm SpscShm::create(const std::string& name, size_t min_capacity)
{
    if (name.empty()) {
        throw std::runtime_error("SpscShm::create: empty name");
    }

    size_t cap = pow2_ceil_u64(min_capacity);
    size_t map_len = sizeof(SpscCtrl) + cap;

    int fd = shm_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0600);
    if (fd < 0) {
        std::string error_msg = "SpscShm::create: shm_open failed for '" + name + "': " + std::strerror(errno);
        if (errno == ENOSPC || errno == ENOMEM) {
            error_msg += " (likely /dev/shm is full - check df -h /dev/shm)";
        }
        throw std::runtime_error(error_msg);
    }

    if (ftruncate(fd, static_cast<off_t>(map_len)) != 0) {
        int e = errno;
        std::string error_msg = "SpscShm::create: ftruncate failed for '" + name +
                                "' (size=" + std::to_string(map_len) + "): " + std::strerror(e);
        if (e == ENOSPC || e == ENOMEM) {
            error_msg += " (likely /dev/shm is full - check df -h /dev/shm)";
        }
        ::close(fd);
        shm_unlink(name.c_str());
        throw std::runtime_error(error_msg);
    }

    void* mem = mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (mem == MAP_FAILED) {
        int e = errno;
        std::string error_msg = "SpscShm::create: mmap failed for '" + name + "' (size=" + std::to_string(map_len) +
                                "): " + std::strerror(e);
        if (e == ENOSPC || e == ENOMEM) {
            error_msg += " (likely /dev/shm is full - check df -h /dev/shm)";
        }
        ::close(fd);
        shm_unlink(name.c_str());
        throw std::runtime_error(error_msg);
    }

    std::memset(mem, 0, map_len);
    auto* ctrl = static_cast<SpscCtrl*>(mem);

    // Initialize non-atomic fields first
    ctrl->capacity = cap;
    ctrl->mask = cap - 1;
    ctrl->wrap_head = UINT64_MAX;

    // Initialize atomics with release ordering to ensure capacity/mask/wrap_head are visible
    ctrl->head.store(0ULL, std::memory_order_release);
    ctrl->tail.store(0ULL, std::memory_order_release);
    ctrl->consumer_blocked.store(false, std::memory_order_release);
    ctrl->producer_blocked.store(false, std::memory_order_release);

    auto* buf = reinterpret_cast<uint8_t*>(ctrl + 1);
    return SpscShm(fd, map_len, ctrl, buf);
}

SpscShm SpscShm::connect(const std::string& name)
{
    if (name.empty()) {
        throw std::runtime_error("SpscShm::connect: empty name");
    }

    int fd = shm_open(name.c_str(), O_RDWR, 0600);
    if (fd < 0) {
        throw std::runtime_error("SpscShm::connect: shm_open failed: " + std::string(std::strerror(errno)));
    }

    struct stat st;
    if (fstat(fd, &st) != 0) {
        int e = errno;
        ::close(fd);
        throw std::runtime_error("SpscShm::connect: fstat failed: " + std::string(std::strerror(e)));
    }
    size_t map_len = static_cast<size_t>(st.st_size);

    void* mem = mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (mem == MAP_FAILED) {
        int e = errno;
        ::close(fd);
        throw std::runtime_error("SpscShm::connect: mmap failed: " + std::string(std::strerror(e)));
    }

    auto* ctrl = static_cast<SpscCtrl*>(mem);
    auto* buf = reinterpret_cast<uint8_t*>(ctrl + 1);

    // Ensure initialization is visible before use (pairs with release in create)
    (void)ctrl->head.load(std::memory_order_acquire);

    return SpscShm(fd, map_len, ctrl, buf);
}

bool SpscShm::unlink(const std::string& name)
{
    return shm_unlink(name.c_str()) == 0;
}

uint64_t SpscShm::available() const
{
    uint64_t head = ctrl_->head.load(std::memory_order_acquire);
    uint64_t tail = ctrl_->tail.load(std::memory_order_acquire);
    return head - tail;
}

void* SpscShm::claim(size_t want, uint32_t timeout_ns)
{
    // Wait for contiguous space to be available
    if (!wait_for_space(want, timeout_ns)) {
        return nullptr; // Timeout
    }

    uint64_t cap = ctrl_->capacity;
    uint64_t mask = ctrl_->mask;
    uint64_t head = ctrl_->head.load(std::memory_order_relaxed);
    uint64_t pos = head & mask;
    uint64_t till_end = cap - pos;

    // Check if it fits contiguously without wrapping
    if (want <= till_end) {
        // Fits contiguously - no wrap
        return buf_ + pos;
    }

    // Needs to wrap
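    // (e.g. capacity=16, head at offset 14, want=4: only 2 bytes remain before
    // the end of the ring, so the message is written at offset 0 and publish()
    // later accounts for the 2 skipped padding bytes)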
    return buf_; // Return pointer to beginning of ring
}

void SpscShm::publish(size_t n)
{
    uint64_t head = ctrl_->head.load(std::memory_order_relaxed);
    uint64_t cap = ctrl_->capacity;
    uint64_t mask = ctrl_->mask;
    uint64_t pos = head & mask;
    uint64_t till_end = cap - pos;

    // Detect if we published wrapped data
    // If at current head position we can't fit n bytes, it must have wrapped
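    // (e.g. capacity=16, head at offset 14, n=4: till_end is 2, so head must
    // advance past 2 padding bytes plus the 4 data bytes written at offset 0)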
    uint64_t total_advance = n;
    if (n > till_end) {
        // We wrote at the beginning after wrapping - skip padding and our data
        total_advance += till_end;
        ctrl_->wrap_head = head;
    }

    // Advance head atomically with release - synchronizes wrap_head write
    ctrl_->head.store(head + total_advance, std::memory_order_release);

    // Ensure that head update is visible before waking consumer.
    if (ctrl_->consumer_blocked.load(std::memory_order_seq_cst)) {
        futex_wake(reinterpret_cast<volatile uint32_t*>(&ctrl_->head), 1);
    }
}

void* SpscShm::peek(size_t want, uint32_t timeout_ns)
{
    // Wait for contiguous data to be available
    if (!wait_for_data(want, timeout_ns)) {
        return nullptr; // Timeout
    }

    // Read head with acquire to synchronize wrap_head
    (void)ctrl_->head.load(std::memory_order_acquire);

    uint64_t tail = ctrl_->tail.load(std::memory_order_relaxed);

    // Check if we're at the position where a message wrapped
    // If tail == wrap_head, the message starts at position 0
    if (tail == ctrl_->wrap_head) {
        return buf_;
    }

    uint64_t cap = ctrl_->capacity;
    uint64_t mask = ctrl_->mask;
    uint64_t pos = tail & mask;
    [[maybe_unused]] uint64_t till_end = cap - pos;

    // At this point wait_for_data() has guaranteed contiguity from tail
    // (or we would have wrapped via wrap_head), so want must fit here.
    assert(want <= till_end);

    // Data fits contiguously at current position
    return buf_ + pos;
}

void SpscShm::release(size_t n)
{
    uint64_t tail = ctrl_->tail.load(std::memory_order_relaxed);
    uint64_t cap = ctrl_->capacity;
    uint64_t mask = ctrl_->mask;
    uint64_t pos = tail & mask;
    uint64_t till_end = cap - pos;

    uint64_t total_release = 0;
    if (tail == ctrl_->wrap_head) {
        // We're releasing data from a wrapped message - skip padding
        total_release = till_end + n;
    } else {
        assert(n <= till_end);
        // Normal case: data was contiguous
        total_release = n;
    }

    uint64_t new_tail = tail + total_release;
    ctrl_->tail.store(new_tail, std::memory_order_release);

    // Ensure that tail update is visible before waking producer.
    if (ctrl_->producer_blocked.load(std::memory_order_seq_cst)) {
        futex_wake(reinterpret_cast<volatile uint32_t*>(&ctrl_->tail), 1);
    }
}

bool SpscShm::wait_for_data(size_t need, uint32_t timeout_ns)
{
    uint64_t cap = ctrl_->capacity;
    uint64_t mask = ctrl_->mask;

    // Check if we need contiguous data that would wrap
    auto check_available = [this, cap, mask, need]() -> bool {
        uint64_t head = ctrl_->head.load(std::memory_order_acquire);
        uint64_t tail = ctrl_->tail.load(std::memory_order_relaxed);
        uint64_t avail = head - tail;

        if (avail < need) {
            return false; // Not enough total data
        }

        // Check if data is contiguous
        uint64_t pos = tail & mask;
        uint64_t till_end = cap - pos;

        if (need <= till_end) {
            return true; // Fits contiguously
        }

        // Would wrap - need padding + actual data available
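        // (a wrapping producer advances head by till_end padding bytes plus the
        // message length, so the wrapped message is complete only once avail
        // covers both)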
        return avail >= (till_end + need);
    };

    if (check_available()) {
        previous_had_data_ = true; // Found data - enable spinning on next call
        return true;
    }

    // Adaptive spinning: only spin if previous call found data
    constexpr uint64_t SPIN_NS = 100000; // 100us
    uint64_t spin_duration;
    uint64_t remaining_timeout;

    if (previous_had_data_) {
        // Previous call found data - do full spin (optimistic)
        spin_duration = (timeout_ns < SPIN_NS) ? timeout_ns : SPIN_NS;
        remaining_timeout = (timeout_ns > SPIN_NS) ? (timeout_ns - SPIN_NS) : 0;
    } else {
        // Previous call timed out - skip spinning (idle channel)
        spin_duration = 0;
        remaining_timeout = timeout_ns;
    }

    // Spin phase (only if previous call found data)
    if (spin_duration > 0) {
        uint64_t start = mono_ns_now();
        constexpr uint32_t TIME_CHECK_INTERVAL = 256; // Check time every 256 iterations
        uint32_t iterations = 0;

        // NOLINTNEXTLINE(cppcoreguidelines-avoid-do-while)
        do {
            if (check_available()) {
                previous_had_data_ = true; // Found data during spin
                return true;
            }
            IPC_PAUSE();

            // Only check time periodically to avoid syscall overhead
            iterations++;
            if (iterations >= TIME_CHECK_INTERVAL) {
                if ((mono_ns_now() - start) >= spin_duration) {
                    break;
                }
                iterations = 0;
            }
        } while (true);

        // Check after spin
        if (check_available()) {
            previous_had_data_ = true; // Found data after spin
            return true;
        }
    }

    // No more time or didn't spin - check if we can block
    if (remaining_timeout == 0) {
        previous_had_data_ = false; // Timeout - disable spinning on next call
        return false;
    }

    // About to block - load seq, final check, then block
    uint32_t head_now = static_cast<uint32_t>(ctrl_->head.load(std::memory_order_acquire));
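    // (the futex compares a 32-bit word: on little-endian x86-64 this waits on
    // the low half of the 64-bit head counter, matching the truncated value)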

    ctrl_->consumer_blocked.store(true, std::memory_order_seq_cst);

    if (check_available()) {
        ctrl_->consumer_blocked.store(false, std::memory_order_relaxed);
        previous_had_data_ = true; // Found data before blocking
        return true;
    }

    // Wait on futex for producer to signal new data
    futex_wait_timeout(reinterpret_cast<volatile uint32_t*>(&ctrl_->head), head_now, remaining_timeout);
    ctrl_->consumer_blocked.store(false, std::memory_order_relaxed);

    bool result = check_available();
    previous_had_data_ = result; // Update flag based on final result
    return result;
}

bool SpscShm::wait_for_space(size_t need, uint32_t timeout_ns)
{
    uint64_t cap = ctrl_->capacity;
    uint64_t mask = ctrl_->mask;

    // Check if we need contiguous space that would wrap
    auto check_space = [this, cap, mask, need]() -> bool {
        uint64_t head = ctrl_->head.load(std::memory_order_relaxed);
        uint64_t tail = ctrl_->tail.load(std::memory_order_acquire);
        uint64_t freeb = cap - (head - tail);

        if (freeb < need) {
            return false; // Not enough total free space
        }

        // Check if space is contiguous
        uint64_t pos = head & mask;
        uint64_t till_end = cap - pos;

        if (need <= till_end) {
            return true; // Fits contiguously
        }

        // Would wrap - just check if we have enough total space
        // If we have till_end + need bytes free, the ring buffer invariant
        // guarantees the beginning is available for writing
        return freeb >= (till_end + need);
    };

    if (check_space()) {
        previous_had_space_ = true; // Found space - enable spinning on next call
        return true;
    }

    // Adaptive spinning: only spin if previous call found space
    constexpr uint64_t SPIN_NS = 100000; // 100us
    uint64_t spin_duration = 0;
    uint64_t remaining_timeout = timeout_ns;

    if (previous_had_space_) {
        // Previous call found space - do full spin (optimistic)
        spin_duration = (timeout_ns < SPIN_NS) ? timeout_ns : SPIN_NS;
        remaining_timeout = (timeout_ns > SPIN_NS) ? (timeout_ns - SPIN_NS) : 0;
    }

    // Spin phase (only if previous call found space)
    if (spin_duration > 0) {
        uint64_t start = mono_ns_now();
        constexpr uint32_t TIME_CHECK_INTERVAL = 256; // Check time every 256 iterations
        uint32_t iterations = 0;

        // NOLINTNEXTLINE(cppcoreguidelines-avoid-do-while)
        do {
            if (check_space()) {
                previous_had_space_ = true; // Found space during spin
                return true;
            }
            IPC_PAUSE();

            // Only check time periodically to avoid syscall overhead
            iterations++;
            if (iterations >= TIME_CHECK_INTERVAL) {
                if ((mono_ns_now() - start) >= spin_duration) {
                    break;
                }
                iterations = 0;
            }
        } while (true);

        // Check after spin
        if (check_space()) {
            previous_had_space_ = true; // Found space after spin
            return true;
        }
    }

    // No more time or didn't spin - check if we can block
    if (remaining_timeout == 0) {
        previous_had_space_ = false; // Timeout - disable spinning on next call
        return false;
    }

    // About to block - load seq, final check, then block
    uint32_t tail_now = static_cast<uint32_t>(ctrl_->tail.load(std::memory_order_acquire));

    // Wait on futex for consumer to signal freed space
    ctrl_->producer_blocked.store(true, std::memory_order_seq_cst);

    if (check_space()) {
        ctrl_->producer_blocked.store(false, std::memory_order_relaxed);
        previous_had_space_ = true; // Found space before blocking
        return true;
    }

    futex_wait_timeout(reinterpret_cast<volatile uint32_t*>(&ctrl_->tail), tail_now, remaining_timeout);
    ctrl_->producer_blocked.store(false, std::memory_order_relaxed);

    bool result = check_space();
    previous_had_space_ = result; // Update flag based on final result
    return result;
}

void SpscShm::wakeup_all()
{
    futex_wake(reinterpret_cast<volatile uint32_t*>(&ctrl_->head), INT_MAX);
    futex_wake(reinterpret_cast<volatile uint32_t*>(&ctrl_->tail), INT_MAX);
}

void SpscShm::debug_dump(const char* prefix) const
{
    uint64_t head = ctrl_->head.load(std::memory_order_acquire);
    uint64_t tail = ctrl_->tail.load(std::memory_order_acquire);
    uint64_t cap = ctrl_->capacity;
    uint64_t mask = ctrl_->mask;
    uint64_t wrap_head = ctrl_->wrap_head;

    uint64_t head_pos = head & mask;
    uint64_t tail_pos = tail & mask;
    uint64_t used = head - tail;
    uint64_t free = cap - used;

    std::cerr << "[" << prefix << "] head=" << head << " tail=" << tail << " | head_pos=" << head_pos
              << " tail_pos=" << tail_pos << " | used=" << used << " free=" << free << " cap=" << cap
              << " | wrap_head=" << (wrap_head == UINT64_MAX ? "NONE" : std::to_string(wrap_head)) << '\n';
}

} // namespace bb::ipc
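
A minimal usage sketch (not part of the file above): one process creates the ring and publishes a message, another connects and drains it. The name "/bb_demo", the capacity, and the 1 ms timeouts are illustrative assumptions, and a real protocol would carry a length prefix so the consumer knows how many bytes to peek.

#include "spsc_shm.hpp"
#include <cstring>

using bb::ipc::SpscShm;

void produce()
{
    // Capacity is rounded up to a power of two by create().
    auto ring = SpscShm::create("/bb_demo", 1 << 20);
    const char msg[] = "hello";
    // Claim a contiguous region, waiting up to 1 ms for free space.
    if (void* dst = ring.claim(sizeof(msg), 1'000'000)) {
        std::memcpy(dst, msg, sizeof(msg));
        ring.publish(sizeof(msg)); // advance head and wake a blocked consumer
    }
}

void consume()
{
    auto ring = SpscShm::connect("/bb_demo");
    // Peek a contiguous region of the expected size, waiting up to 1 ms.
    if (void* src = ring.peek(6, 1'000'000)) {
        char out[6];
        std::memcpy(out, src, sizeof(out));
        ring.release(sizeof(out)); // advance tail and wake a blocked producer
    }
    SpscShm::unlink("/bb_demo"); // remove the shm object once both sides are done
}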