22inline uint64_t pow2_ceil_u64(uint64_t x)
50 , map_len_(other.map_len_)
56 other.ctrl_ =
nullptr;
64 if (ctrl_ !=
nullptr) {
65 munmap(ctrl_, map_len_);
73 map_len_ = other.map_len_;
80 other.ctrl_ =
nullptr;
88 if (
ctrl_ !=
nullptr) {
99 throw std::runtime_error(
"SpscShm::create: empty name");
102 size_t cap = pow2_ceil_u64(min_capacity);
103 size_t map_len =
sizeof(
SpscCtrl) + cap;
105 int fd = shm_open(name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0600);
107 std::string error_msg =
"SpscShm::create: shm_open failed for '" + name +
"': " +
std::strerror(errno);
108 if (errno == ENOSPC || errno == ENOMEM) {
109 error_msg +=
" (likely /dev/shm is full - check df -h /dev/shm)";
111 throw std::runtime_error(error_msg);
114 if (ftruncate(fd,
static_cast<off_t
>(map_len)) != 0) {
116 std::string error_msg =
"SpscShm::create: ftruncate failed for '" + name +
118 if (e == ENOSPC || e == ENOMEM) {
119 error_msg +=
" (likely /dev/shm is full - check df -h /dev/shm)";
122 shm_unlink(name.c_str());
123 throw std::runtime_error(error_msg);
126 void*
mem = mmap(
nullptr, map_len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
127 if (
mem == MAP_FAILED) {
129 std::string error_msg =
"SpscShm::create: mmap failed for '" + name +
"' (size=" +
std::to_string(map_len) +
131 if (e == ENOSPC || e == ENOMEM) {
132 error_msg +=
" (likely /dev/shm is full - check df -h /dev/shm)";
135 shm_unlink(name.c_str());
136 throw std::runtime_error(error_msg);
144 ctrl->mask = cap - 1;
145 ctrl->wrap_head = UINT64_MAX;
153 auto*
buf =
reinterpret_cast<uint8_t*
>(ctrl + 1);
160 throw std::runtime_error(
"SpscShm::connect: empty name");
163 int fd = shm_open(name.c_str(), O_RDWR, 0600);
165 throw std::runtime_error(
"SpscShm::connect: shm_open failed: " + std::string(
std::strerror(errno)));
169 if (fstat(fd, &st) != 0) {
172 throw std::runtime_error(
"SpscShm::connect: fstat failed: " + std::string(
std::strerror(e)));
174 size_t map_len =
static_cast<size_t>(st.st_size);
176 void*
mem = mmap(
nullptr, map_len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
177 if (
mem == MAP_FAILED) {
180 throw std::runtime_error(
"SpscShm::connect: mmap failed: " + std::string(
std::strerror(e)));
184 auto*
buf =
reinterpret_cast<uint8_t*
>(ctrl + 1);
194 return shm_unlink(name.c_str()) == 0;
214 uint64_t pos = head & mask;
215 uint64_t till_end = cap - pos;
218 if (want <= till_end) {
232 uint64_t pos = head & mask;
233 uint64_t till_end = cap - pos;
237 uint64_t total_advance = n;
240 total_advance += till_end;
274 uint64_t pos = tail & mask;
275 [[maybe_unused]] uint64_t till_end = cap - pos;
279 assert(want <= till_end);
290 uint64_t pos = tail & mask;
291 uint64_t till_end = cap - pos;
293 uint64_t total_release = 0;
296 total_release = till_end + n;
298 assert(n <= till_end);
303 uint64_t new_tail = tail + total_release;
319 auto check_available = [
this, cap, mask, need]() ->
bool {
322 uint64_t avail = head - tail;
329 uint64_t pos = tail & mask;
330 uint64_t till_end = cap - pos;
332 if (need <= till_end) {
337 return avail >= (till_end + need);
340 if (check_available()) {
346 constexpr uint64_t SPIN_NS = 100000;
347 uint64_t spin_duration;
348 uint64_t remaining_timeout;
352 spin_duration = (timeout_ns < SPIN_NS) ? timeout_ns : SPIN_NS;
353 remaining_timeout = (timeout_ns > SPIN_NS) ? (timeout_ns - SPIN_NS) : 0;
357 remaining_timeout = timeout_ns;
361 if (spin_duration > 0) {
363 constexpr uint32_t TIME_CHECK_INTERVAL = 256;
364 uint32_t iterations = 0;
368 if (check_available()) {
376 if (iterations >= TIME_CHECK_INTERVAL) {
385 if (check_available()) {
392 if (remaining_timeout == 0) {
402 if (check_available()) {
412 bool result = check_available();
423 auto check_space = [
this, cap, mask, need]() ->
bool {
426 uint64_t freeb = cap - (head - tail);
435 uint64_t pos = head & mask;
436 uint64_t till_end = cap - pos;
438 if (need <= till_end) {
445 return freeb >= (till_end + need);
454 constexpr uint64_t SPIN_NS = 100000;
455 uint64_t spin_duration = 0;
456 uint64_t remaining_timeout = timeout_ns;
460 spin_duration = (timeout_ns < SPIN_NS) ? timeout_ns : SPIN_NS;
461 remaining_timeout = (timeout_ns > SPIN_NS) ? (timeout_ns - SPIN_NS) : 0;
465 if (spin_duration > 0) {
467 constexpr uint32_t TIME_CHECK_INTERVAL = 256;
468 uint32_t iterations = 0;
480 if (iterations >= TIME_CHECK_INTERVAL) {
496 if (remaining_timeout == 0) {
516 bool result = check_space();
535 uint64_t head_pos = head & mask;
536 uint64_t tail_pos = tail & mask;
537 uint64_t used = head - tail;
538 uint64_t free = cap - used;
540 std::cerr <<
"[" << prefix <<
"] head=" << head <<
" tail=" << tail <<
" | head_pos=" << head_pos
541 <<
" tail_pos=" << tail_pos <<
" | used=" << used <<
" free=" << free <<
" cap=" << cap
542 <<
" | wrap_head=" << (wrap_head == UINT64_MAX ?
"NONE" :
std::to_string(wrap_head)) <<
'\n';
Lock-free single-producer single-consumer shared memory ring buffer.
static bool unlink(const std::string &name)
Unlink shared memory object (cleanup after close)
void * peek(size_t want, uint32_t timeout_ns)
Peek contiguous readable region (blocks until available)
void wakeup_all()
Wake all blocked threads (for graceful shutdown)
SpscShm(SpscShm &&other) noexcept
uint64_t available() const
void release(size_t n)
Release n bytes previously peeked.
void * claim(size_t want, uint32_t timeout_ns)
Claim contiguous space in the ring buffer (blocks until available)
bool wait_for_data(size_t need, uint32_t spin_ns)
void publish(size_t n)
Publish n bytes previously claimed.
bool wait_for_space(size_t need, uint32_t spin_ns)
SpscShm & operator=(SpscShm &&other) noexcept
void debug_dump(const char *prefix) const
Dump internal ring buffer state for debugging.
static SpscShm create(const std::string &name, size_t min_capacity)
Create a new SPSC ring buffer.
static SpscShm connect(const std::string &name)
Connect to existing SPSC ring buffer.
Cross-platform futex-like synchronization primitives.
int futex_wake(volatile uint32_t *addr, int n)
Wake waiters blocked on an address.
uint64_t mono_ns_now()
Get current monotonic time in nanoseconds.
int futex_wait_timeout(volatile uint32_t *addr, uint32_t expect, uint64_t timeout_ns)
Atomic compare-and-wait operation with timeout.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
std::string to_string(bb::avm2::ValueTag tag)
Single-producer/single-consumer shared-memory ring buffer (Linux, x86-64 optimized)
Control structure for SPSC ring buffer.
std::atomic< uint64_t > tail
std::atomic< bool > consumer_blocked
std::atomic< uint64_t > head
std::atomic< bool > producer_blocked
Common utilities for IPC shared memory implementation.