Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
msgpack_client_async.cpp
Go to the documentation of this file.
3#include "napi.h"
4#include <cstdint>
5#include <vector>
6
7using namespace bb::nodejs::msgpack_client;
8
10 : ObjectWrap(info)
11{
12 Napi::Env env = info.Env();
13
14 // Arg 0: shared memory base name (string)
15 if (info.Length() < 1 || !info[0].IsString()) {
16 throw Napi::TypeError::New(env, "First argument must be a string (shared memory name)");
17 }
18 std::string shm_name = info[0].As<Napi::String>();
19
20 // Create shared memory client (SPSC-only, no max_clients needed)
22
23 // Connect to bb server
24 if (!client_->connect()) {
25 throw Napi::Error::New(env, "Failed to connect to shared memory server");
26 }
27}
28
29Napi::Value MsgpackClientAsync::setResponseCallback(const Napi::CallbackInfo& info)
30{
31 Napi::Env env = info.Env();
32
33 // Arg 0: JavaScript callback function
34 if (info.Length() < 1 || !info[0].IsFunction()) {
35 throw Napi::TypeError::New(env, "First argument must be a function");
36 }
37
38 // Store the callback for lazy TSFN creation
39 // Don't create TSFN yet - it will be created on first acquire()
40 js_callback_ = Napi::Persistent(info[0].As<Napi::Function>());
41
42 // Start background polling thread now that callback is registered
44
45 // Detach the thread - it will run until process exits
46 // No need for explicit shutdown or join
47 poll_thread_.detach();
48
49 return env.Undefined();
50}
51
53{
54 constexpr uint64_t TIMEOUT_NS = 1000000000; // 1s
55
56 while (true) { // Run forever until process exits
57 // Poll for response (blocks with timeout using futex)
58 std::span<const uint8_t> response = client_->receive(TIMEOUT_NS);
59
60 if (response.empty()) {
61 // Timeout - just continue polling
62 continue;
63 }
64
65 // Copy response data before releasing (span is invalidated by release())
66 auto* response_data = new std::vector<uint8_t>(response.begin(), response.end());
67
68 // Release the message in ring buffer to free space
69 client_->release(response.size());
70
71 // Lock mutex to safely access TSFN
72 {
74
75 // TSFN is active - invoke JavaScript callback
76 // The callback will handle matching this response to the correct promise
77 auto status = tsfn_.NonBlockingCall(
78 response_data, [](Napi::Env env, Napi::Function js_callback, std::vector<uint8_t>* data) {
79 // This lambda runs on the JavaScript main thread!
80 // Safe to create JS objects and call functions here
81
82 // Create Buffer with response data
83 auto js_buffer = Napi::Buffer<uint8_t>::Copy(env, data->data(), data->size());
84
85 // Call the registered JavaScript callback with the response
86 // TypeScript will pop its queue and resolve the appropriate promise
87 js_callback.Call({ js_buffer });
88
89 // Clean up response data
90 delete data;
91 });
92
93 if (status != napi_ok) {
94 // Failed to queue callback - likely process is exiting
95 // Just clean up and continue (process will exit soon anyway)
96 delete response_data;
97 }
98 }
99 }
100}
101
102Napi::Value MsgpackClientAsync::call(const Napi::CallbackInfo& info)
103{
104 Napi::Env env = info.Env();
105
106 // Arg 0: msgpack buffer to send
107 if (info.Length() < 1 || !info[0].IsBuffer()) {
108 throw Napi::TypeError::New(env, "First argument must be a Buffer");
109 }
110
111 auto input_buffer = info[0].As<Napi::Buffer<uint8_t>>();
112 const uint8_t* input_data = input_buffer.Data();
113 size_t input_len = input_buffer.Length();
114
115 // Send request (non-blocking write to ring buffer with no timeout)
116 // TypeScript will handle promise creation and queueing
117 if (!client_->send(input_data, input_len, 0)) {
118 throw Napi::Error::New(env, "Failed to send request, ring buffer full. Make it bigger?");
119 }
120
121 // Return undefined - TypeScript manages promises
122 return env.Undefined();
123}
124
125Napi::Value MsgpackClientAsync::acquire(const Napi::CallbackInfo& info)
126{
127 Napi::Env env = info.Env();
128
130
131 if (ref_count_ == 0) {
132 // Lazily create TSFN when first needed (0 → 1)
133 tsfn_ = Napi::ThreadSafeFunction::New(env,
134 js_callback_.Value(), // The actual JS function to call
135 "ShmResponseCallback", // Resource name for debugging
136 0, // Unlimited queue size
137 1 // Initial thread count (must be >= 1)
138 );
139 }
140
141 ref_count_++;
142 return env.Undefined();
143}
144
145Napi::Value MsgpackClientAsync::release(const Napi::CallbackInfo& info)
146{
148
149 ref_count_--;
150
151 if (ref_count_ == 0) {
152 // Destroy TSFN when no longer needed (1 → 0)
153 // This releases the initial reference, bringing ref count to 0
154 tsfn_.Release();
155 }
156
157 return info.Env().Undefined();
158}
159
160Napi::Function MsgpackClientAsync::get_class(Napi::Env env)
161{
162 return DefineClass(
163 env,
164 "MsgpackClientAsync",
165 {
166 MsgpackClientAsync::InstanceMethod("setResponseCallback", &MsgpackClientAsync::setResponseCallback),
167 MsgpackClientAsync::InstanceMethod("call", &MsgpackClientAsync::call),
168 MsgpackClientAsync::InstanceMethod("acquire", &MsgpackClientAsync::acquire),
169 MsgpackClientAsync::InstanceMethod("release", &MsgpackClientAsync::release),
170 });
171}
static std::unique_ptr< IpcClient > create_shm(const std::string &base_name)
static Napi::Function get_class(Napi::Env env)
void poll_responses()
Background thread function that polls for responses.
Napi::Value call(const Napi::CallbackInfo &info)
Send a msgpack buffer asynchronously.
Napi::Value release(const Napi::CallbackInfo &info)
Release a reference to allow the event loop to exit Called by TypeScript when there are no pending ca...
Napi::Value setResponseCallback(const Napi::CallbackInfo &info)
Set the JavaScript callback to be invoked when responses arrive.
std::unique_ptr< bb::ipc::IpcClient > client_
Napi::Value acquire(const Napi::CallbackInfo &info)
Acquire a reference to keep the event loop alive Called by TypeScript when there are pending callback...
void info(Args... args)
Definition log.hpp:75
const std::vector< MemoryValue > data
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13