Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
shm.test.cpp
Go to the documentation of this file.
6#include <array>
7#include <atomic>
8#include <chrono>
9#include <csignal>
10#include <cstddef>
11#include <cstring>
12#include <functional>
13#include <gtest/gtest.h>
14#include <iostream>
15#include <random>
16#include <sstream>
17#include <thread>
18#include <unistd.h>
19#include <vector>
20
21using namespace bb::ipc;
22
23namespace {
24
28TEST(ShmTest, SingleClientSmallRingHighVolume)
29{
30 constexpr size_t RING_SIZE = 2UL * 1024;
31 constexpr size_t NUM_ITERATIONS = 10000000;
32 // Sizing ensures that no matter that state of the internal ring buffer, we can't deadlock.
33 constexpr size_t MAX_MSG_SIZE = (RING_SIZE / 2) - 4;
34
35 // Use short name for macOS compatibility (31-char limit)
36 std::string wrap_test_shm = "shm_wrap_" + std::to_string(getpid());
37 auto server = IpcServer::create_shm(wrap_test_shm, RING_SIZE, RING_SIZE);
38 ASSERT_TRUE(server->listen()) << "Wrap test server failed to listen";
39
40 std::atomic<bool> server_running{ true };
41 std::atomic<size_t> corruptions{ 0 };
42
43 // Echo server with validation
44 std::thread server_thread([&]() {
45 size_t iter = 0;
46 while (server_running.load(std::memory_order_acquire)) {
47 server->accept();
48
49 int client_id = server->wait_for_data(10000000); // 10ms
50 if (client_id < 0) {
51 continue;
52 }
53
54 auto request_buf = server->receive(client_id);
55 // std::cerr << "Server received " << request.size() << " bytes" << '\n';
56
57 if (request_buf.empty()) {
58 continue;
59 }
60
61 // Take a copy of the request so we can release.
62 std::vector<uint8_t> request(request_buf.begin(), request_buf.end());
63 server->release(client_id, request.size());
64
65 // Validate pattern: first byte should be XOR with offsets
66 // Check a few bytes to detect corruption without slowing down too much
67 if (request.size() > 0) {
68 uint8_t first = request[0];
69 for (size_t i = 0; i < std::min(request.size(), size_t(16)); i++) {
70 uint8_t expected = static_cast<uint8_t>((first ^ i) & 0xFF);
71 if (request[i] != expected) {
72 corruptions.fetch_add(1);
73 std::cerr << "Pattern mismatch at offset " << i << ": expected=" << (int)expected
74 << " actual=" << (int)request[i] << '\n';
75 break;
76 }
77 }
78 }
79
80 // Retry send until success.
81 while (!server->send(client_id, request.data(), request.size())) {
82 // Timeout - retry (response ring might be full)
83 std::cerr << iter << " Server send size " << request.size() << " timeout, retrying..." << '\n';
84 dynamic_cast<ShmServer*>(server.get())->debug_dump();
85 }
86 // std::cerr << "Server sent response of " << request.size() << " bytes" << '\n';
87 iter++;
88 }
89 });
90
91 std::this_thread::sleep_for(std::chrono::milliseconds(300));
92
93 auto client = IpcClient::create_shm(wrap_test_shm);
94 ASSERT_TRUE(client->connect());
95
96 // Random message sizes.
98 std::mt19937 gen(rd());
99 std::uniform_int_distribution<size_t> size_dist(1, MAX_MSG_SIZE);
100
101 // Store sizes for each iteration so receiver knows what to expect
102 std::vector<size_t> iteration_sizes(NUM_ITERATIONS);
103 for (size_t i = 0; i < NUM_ITERATIONS; i++) {
104 iteration_sizes[i] = size_dist(gen);
105 // iteration_sizes[i] = MAX_MSG_SIZE - 1;
106 }
107
108 // Sender thread: continuously send requests
109 std::thread sender_thread([&]() {
110 std::vector<uint8_t> send_buffer(MAX_MSG_SIZE);
111
112 for (size_t iter = 0; iter < NUM_ITERATIONS; iter++) {
113 size_t size = iteration_sizes[iter];
114 // std::cerr << "Client: Iteration " << iter << ": sending " << size << " bytes" << '\n';
115
116 // Fill buffer with iteration-specific pattern
117 // First byte is iteration number (mod 256), rest is XOR pattern with offset
118 uint8_t iter_byte = static_cast<uint8_t>(iter & 0xFF);
119 for (size_t i = 0; i < size; i++) {
120 send_buffer[i] = static_cast<uint8_t>((iter_byte ^ i) & 0xFF);
121 }
122
123 // Retry send until success - timeouts are expected under extreme load
124 while (!client->send(send_buffer.data(), size, 100000000)) {
125 // Timeout - retry (ring might be full, server might be slow)
126 std::cerr << iter << " Client send size " << size << " timeout, retrying..." << '\n';
127 dynamic_cast<ShmClient*>(client.get())->debug_dump();
128 }
129 }
130 });
131
132 // Receiver thread: continuously receive and validate responses
133 std::thread receiver_thread([&]() {
134 for (size_t iter = 0; iter < NUM_ITERATIONS; iter++) {
135 size_t expected_size = iteration_sizes[iter];
136
137 // Retry recv until success - timeouts are expected under extreme load
139 while ((response = client->receive(100000000)).empty()) {
140 std::cerr << iter << " Client receive timeout, retrying..." << '\n';
141 // Timeout - retry
142 }
143 // std::cerr << "Client received response of " << response.size() << " bytes" << '\n';
144
145 ASSERT_EQ(response.size(), expected_size) << "Size mismatch at iteration " << iter;
146
147 // Validate entire response - check iteration byte and pattern
148 uint8_t iter_byte = static_cast<uint8_t>(iter & 0xFF);
149 if (response.size() > 0) {
150 ASSERT_EQ(response[0], iter_byte) << "Iteration byte mismatch at iteration " << iter;
151 for (size_t i = 0; i < response.size(); i++) {
152 uint8_t expected = static_cast<uint8_t>((iter_byte ^ i) & 0xFF);
153 if (response[i] != expected) {
154 FAIL() << "Data corruption at iteration " << iter << " offset " << i
155 << ": expected=" << (int)expected << " actual=" << (int)response[i];
156 }
157 }
158 }
159
160 client->release(response.size());
161 }
162 });
163
164 sender_thread.join();
165 receiver_thread.join();
166
167 client->close();
168
169 server_running.store(false);
170 server->request_shutdown();
171 server_thread.join();
172 server->close();
173
174 EXPECT_EQ(corruptions.load(), 0) << "Corruptions detected in single-threaded wrap test";
175}
176
182// TEST(ShmTest, DeadlockReproduction)
183// {
184// constexpr size_t RING_SIZE = 8UL * 1024; // 8KB rings
185// // Max message size is half capacity minus 4 bytes (length prefix)
186// constexpr size_t MAX_MSG_SIZE = RING_SIZE / 2 - 4;
187
188// std::string test_shm = "shm_deadlock_" + std::to_string(getpid());
189// auto server = IpcServer::create_shm(test_shm, RING_SIZE, RING_SIZE);
190// ASSERT_TRUE(server->listen()) << "Deadlock test server failed to listen";
191
192// auto client = IpcClient::create_shm(test_shm);
193// ASSERT_TRUE(client->connect());
194
195// #define snd(s) \
196// { \
197// ASSERT_TRUE(client->send(std::vector<uint8_t>(s, 0).data(), s, 0)); \
198// dynamic_cast<ShmClient*>(client.get())->debug_dump(); \
199// }
200// #define rcv() \
201// { \
202// auto request = server->receive(0); \
203// ASSERT_FALSE(request.empty()); \
204// server->release(0, request.size()); \
205// dynamic_cast<ShmServer*>(server.get())->debug_dump(); \
206// }
207
208// snd(MAX_MSG_SIZE - 1);
209// snd(MAX_MSG_SIZE);
210// rcv();
211// rcv();
212// snd(MAX_MSG_SIZE);
213
214// client->close();
215// server->close();
216// }
217
218} // namespace
TEST(acir_formal_proofs, uint_terms_add)
Tests 128-bit unsigned addition Verifies that the ACIR implementation of addition is correct Executio...
static std::unique_ptr< IpcClient > create_shm(const std::string &base_name)
static std::unique_ptr< IpcServer > create_shm(const std::string &base_name, size_t request_ring_size=static_cast< size_t >(1024 *1024), size_t response_ring_size=static_cast< size_t >(1024 *1024))
IPC client implementation using shared memory.
IPC server implementation using shared memory.
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
std::string to_string(bb::avm2::ValueTag tag)
Single-producer/single-consumer shared-memory ring buffer (Linux, x86-64 optimized)