22 : rings_(
std::move(rings))
23 , doorbell_fd_(doorbell_fd)
24 , doorbell_len_(doorbell_len)
30 , doorbell_fd_(other.doorbell_fd_)
31 , doorbell_len_(other.doorbell_len_)
32 , doorbell_(other.doorbell_)
33 , last_served_(other.last_served_)
35 other.doorbell_fd_ = -1;
36 other.doorbell_len_ = 0;
37 other.doorbell_ =
nullptr;
38 other.last_served_ = 0;
45 if (doorbell_ !=
nullptr) {
46 munmap(doorbell_, doorbell_len_);
48 if (doorbell_fd_ >= 0) {
49 ::close(doorbell_fd_);
54 doorbell_fd_ = other.doorbell_fd_;
55 doorbell_len_ = other.doorbell_len_;
56 doorbell_ = other.doorbell_;
57 last_served_ = other.last_served_;
60 other.doorbell_fd_ = -1;
61 other.doorbell_len_ = 0;
62 other.doorbell_ =
nullptr;
63 other.last_served_ = 0;
80 if (name.empty() || num_producers == 0) {
81 throw std::runtime_error(
"MpscConsumer::create: invalid arguments");
85 std::string doorbell_name = name +
"_doorbell";
88 int doorbell_fd = shm_open(doorbell_name.c_str(), O_RDWR | O_CREAT | O_EXCL, 0600);
89 if (doorbell_fd < 0) {
90 throw std::runtime_error(
"MpscConsumer::create: shm_open doorbell failed: " +
94 if (ftruncate(doorbell_fd,
static_cast<off_t
>(doorbell_len)) != 0) {
97 shm_unlink(doorbell_name.c_str());
98 throw std::runtime_error(
"MpscConsumer::create: ftruncate doorbell failed: " + std::string(
std::strerror(e)));
102 static_cast<MpscDoorbell*
>(mmap(
nullptr, doorbell_len, PROT_READ | PROT_WRITE, MAP_SHARED, doorbell_fd, 0));
103 if (doorbell == MAP_FAILED) {
105 ::close(doorbell_fd);
106 shm_unlink(doorbell_name.c_str());
107 throw std::runtime_error(
"MpscConsumer::create: mmap doorbell failed: " + std::string(
std::strerror(e)));
116 rings.reserve(num_producers);
119 for (
size_t i = 0; i < num_producers; i++) {
125 for (
size_t i = 0; i < rings.size(); i++) {
129 munmap(doorbell, doorbell_len);
130 ::close(doorbell_fd);
131 shm_unlink(doorbell_name.c_str());
140 std::string doorbell_name = name +
"_doorbell";
141 shm_unlink(doorbell_name.c_str());
143 for (
size_t i = 0; i < num_producers; i++) {
153 size_t num_rings =
rings_.size();
156 for (
size_t i = 0; i < num_rings; i++) {
158 if (
rings_[idx].available() > 0) {
161 return static_cast<int>(idx);
166 constexpr uint64_t SPIN_NS = 100000;
167 uint64_t spin_duration;
168 uint64_t remaining_timeout;
172 spin_duration = (timeout_ns < SPIN_NS) ? timeout_ns : SPIN_NS;
173 remaining_timeout = (timeout_ns > SPIN_NS) ? (timeout_ns - SPIN_NS) : 0;
177 remaining_timeout = timeout_ns;
181 if (spin_duration > 0) {
185 for (
size_t i = 0; i < num_rings; i++) {
187 if (
rings_[idx].available() > 0) {
190 return static_cast<int>(idx);
197 for (
size_t i = 0; i < num_rings; i++) {
199 if (
rings_[idx].available() > 0) {
202 return static_cast<int>(idx);
208 if (remaining_timeout == 0) {
217 for (
size_t i = 0; i < num_rings; i++) {
219 if (
rings_[idx].available() > 0) {
222 return static_cast<int>(idx);
233 for (
size_t i = 0; i < num_rings; i++) {
235 if (
rings_[idx].available() > 0) {
238 return static_cast<int>(idx);
248 if (ring_idx >=
rings_.size()) {
251 return rings_[ring_idx].peek(want, timeout_ns);
256 if (ring_idx <
rings_.size()) {
257 rings_[ring_idx].release(n);
267 for (
auto& ring :
rings_) {
275 SpscShm&& ring,
int doorbell_fd,
size_t doorbell_len,
MpscDoorbell* doorbell,
size_t producer_id)
276 : ring_(
std::move(ring))
277 , doorbell_fd_(doorbell_fd)
278 , doorbell_len_(doorbell_len)
279 , doorbell_(doorbell)
280 , producer_id_(producer_id)
285 , doorbell_fd_(other.doorbell_fd_)
286 , doorbell_len_(other.doorbell_len_)
287 , doorbell_(other.doorbell_)
288 , producer_id_(other.producer_id_)
290 other.doorbell_fd_ = -1;
291 other.doorbell_len_ = 0;
292 other.doorbell_ =
nullptr;
293 other.producer_id_ = 0;
298 if (
this != &other) {
300 if (doorbell_ !=
nullptr) {
301 munmap(doorbell_, doorbell_len_);
303 if (doorbell_fd_ >= 0) {
304 ::close(doorbell_fd_);
309 doorbell_fd_ = other.doorbell_fd_;
310 doorbell_len_ = other.doorbell_len_;
311 doorbell_ = other.doorbell_;
312 producer_id_ = other.producer_id_;
315 other.doorbell_fd_ = -1;
316 other.doorbell_len_ = 0;
317 other.doorbell_ =
nullptr;
318 other.producer_id_ = 0;
336 throw std::runtime_error(
"MpscProducer::connect: empty name");
340 std::string doorbell_name = name +
"_doorbell";
343 int doorbell_fd = shm_open(doorbell_name.c_str(), O_RDWR, 0600);
344 if (doorbell_fd < 0) {
345 throw std::runtime_error(
"MpscProducer::connect: shm_open doorbell failed: " +
350 static_cast<MpscDoorbell*
>(mmap(
nullptr, doorbell_len, PROT_READ | PROT_WRITE, MAP_SHARED, doorbell_fd, 0));
351 if (doorbell == MAP_FAILED) {
353 ::close(doorbell_fd);
354 throw std::runtime_error(
"MpscProducer::connect: mmap doorbell failed: " + std::string(
std::strerror(e)));
358 std::string ring_name = name +
"_ring_" +
std::to_string(producer_id);
Multi-producer single-consumer - consumer side.
MpscConsumer(MpscConsumer &&other) noexcept
int wait_for_data(uint32_t timeout_ns)
Wait for data on any ring.
static MpscConsumer create(const std::string &name, size_t num_producers, size_t ring_capacity)
Create MPSC consumer.
MpscConsumer & operator=(MpscConsumer &&other) noexcept
void * peek(size_t ring_idx, size_t want, uint32_t timeout_ns)
Peek data from specific ring.
void release(size_t ring_idx, size_t n)
Release data from specific ring.
void wakeup_all()
Wake all blocked threads (for graceful shutdown). Wakes the consumer blocked on the doorbell and all producers...
static bool unlink(const std::string &name, size_t num_producers)
Unlink all shared memory for this MPSC system.
std::vector< SpscShm > rings_
Multi-producer single-consumer - producer side.
MpscProducer & operator=(MpscProducer &&other) noexcept
MpscProducer(MpscProducer &&other) noexcept
static MpscProducer connect(const std::string &name, size_t producer_id)
Connect to MPSC system as a producer.
void publish(size_t n)
Publish data to producer's ring (rings doorbell)
void * claim(size_t want, uint32_t timeout_ns)
Claim space in producer's ring.
Lock-free single-producer single-consumer shared memory ring buffer.
static bool unlink(const std::string &name)
Unlink shared memory object (cleanup after close)
void * claim(size_t want, uint32_t timeout_ns)
Claim contiguous space in the ring buffer (blocks until available)
void publish(size_t n)
Publish n bytes previously claimed.
static SpscShm create(const std::string &name, size_t min_capacity)
Create a new SPSC ring buffer.
static SpscShm connect(const std::string &name)
Connect to existing SPSC ring buffer.
Cross-platform futex-like synchronization primitives.
Multi-Producer Single-Consumer via SPSC rings + doorbell futex.
int futex_wake(volatile uint32_t *addr, int n)
Wake waiters blocked on an address.
uint64_t mono_ns_now()
Get current monotonic time in nanoseconds.
int futex_wait_timeout(volatile uint32_t *addr, uint32_t expect, uint64_t timeout_ns)
Atomic compare-and-wait operation with timeout.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
std::string to_string(bb::avm2::ValueTag tag)
Shared doorbell for waking consumer.
std::atomic< uint32_t > seq
std::atomic< bool > consumer_blocked
Common utilities for IPC shared memory implementation.