24 : socket_path_(
std::move(socket_path))
25 , initial_max_clients_(initial_max_clients)
27 const size_t reserve_size = initial_max_clients > 0 ?
static_cast<size_t>(initial_max_clients) : 10;
73 return static_cast<int>(i);
83 if (client_id < 0 ||
static_cast<size_t>(client_id) >=
client_fds_.size() ||
89 int fd =
client_fds_[
static_cast<size_t>(client_id)];
92 auto msg_len =
static_cast<uint32_t
>(
len);
93 ssize_t n =
::send(fd, &msg_len,
sizeof(msg_len), 0);
94 if (n < 0 ||
static_cast<size_t>(n) !=
sizeof(msg_len)) {
103 const auto bytes_sent =
static_cast<size_t>(n);
104 return bytes_sent ==
len;
116 if (client_id < 0 ||
static_cast<size_t>(client_id) >=
client_fds_.size() ||
121 int fd =
client_fds_[
static_cast<size_t>(client_id)];
122 const auto client_idx =
static_cast<size_t>(client_id);
130 uint32_t msg_len = 0;
131 size_t total_read = 0;
132 while (total_read <
sizeof(msg_len)) {
133 ssize_t n = ::recv(fd,
reinterpret_cast<uint8_t*
>(&msg_len) + total_read,
sizeof(msg_len) - total_read, 0);
135 if (errno == EINTR) {
145 total_read +=
static_cast<size_t>(n);
149 size_t total_size =
sizeof(uint32_t) + msg_len;
159 while (total_read < msg_len) {
161 ::recv(fd,
recv_buffers_[client_idx].
data() +
sizeof(uint32_t) + total_read, msg_len - total_read, 0);
163 if (errno == EINTR) {
174 total_read +=
static_cast<size_t>(n);
202 if (flags < 0 || fcntl(
listen_fd_, F_SETFL, flags | O_NONBLOCK) < 0) {
209 struct sockaddr_un addr;
211 addr.sun_family = AF_UNIX;
214 if (bind(
listen_fd_,
reinterpret_cast<struct sockaddr*
>(&addr),
sizeof(addr)) < 0) {
240 EV_SET(&ev,
listen_fd_, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0,
nullptr);
241 if (kevent(
fd_, &ev, 1,
nullptr, 0,
nullptr) < 0) {
262 int last_client_id = -1;
269 if (errno == EAGAIN || errno == EWOULDBLOCK) {
274 if (last_client_id >= 0) {
283 int flags = fcntl(client_fd, F_GETFL, 0);
285 fcntl(client_fd, F_SETFL, flags & ~O_NONBLOCK);
292 const auto client_id_unsigned =
static_cast<size_t>(client_id);
296 client_fds_[
static_cast<size_t>(client_id)] = client_fd;
302 EV_SET(&kev, client_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0,
nullptr);
303 if (kevent(
fd_, &kev, 1,
nullptr, 0,
nullptr) < 0) {
309 last_client_id = client_id;
312 return last_client_id;
323 struct timespec timeout;
324 struct timespec* timeout_ptr =
nullptr;
326 if (timeout_ns > 0) {
327 timeout.tv_sec =
static_cast<time_t
>(timeout_ns / 1000000000ULL);
328 timeout.tv_nsec =
static_cast<long>(timeout_ns % 1000000000ULL);
329 timeout_ptr = &timeout;
330 }
else if (timeout_ns == 0) {
333 timeout_ptr = &timeout;
336 int n = kevent(
fd_,
nullptr, 0, &ev, 1, timeout_ptr);
341 int ready_fd =
static_cast<int>(ev.ident);
361 if (client_id < 0 ||
static_cast<size_t>(client_id) >=
client_fds_.size()) {
365 int fd =
client_fds_[
static_cast<size_t>(client_id)];
370 EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0,
nullptr);
371 kevent(
fd_, &ev, 1,
nullptr, 0,
nullptr);
403 if (flags < 0 || fcntl(
listen_fd_, F_SETFL, flags | O_NONBLOCK) < 0) {
410 struct sockaddr_un addr;
412 addr.sun_family = AF_UNIX;
415 if (bind(
listen_fd_,
reinterpret_cast<struct sockaddr*
>(&addr),
sizeof(addr)) < 0) {
431 fd_ = epoll_create1(0);
440 struct epoll_event ev;
464 int last_client_id = -1;
471 if (errno == EAGAIN || errno == EWOULDBLOCK) {
476 if (last_client_id >= 0) {
485 int flags = fcntl(client_fd, F_GETFL, 0);
487 fcntl(client_fd, F_SETFL, flags & ~O_NONBLOCK);
494 const auto client_id_unsigned =
static_cast<size_t>(client_id);
498 client_fds_[
static_cast<size_t>(client_id)] = client_fd;
503 struct epoll_event client_ev;
504 client_ev.events = EPOLLIN;
505 client_ev.data.fd = client_fd;
506 if (epoll_ctl(
fd_, EPOLL_CTL_ADD, client_fd, &client_ev) < 0) {
512 last_client_id = client_id;
515 return last_client_id;
525 struct epoll_event ev;
526 int timeout_ms = timeout_ns > 0 ?
static_cast<int>(timeout_ns / 1000000) : -1;
527 int n = epoll_wait(
fd_, &ev, 1, timeout_ms);
550 if (client_id < 0 ||
static_cast<size_t>(client_id) >=
client_fds_.size()) {
554 int fd =
client_fds_[
static_cast<size_t>(client_id)];
556 epoll_ctl(
fd_, EPOLL_CTL_DEL, fd,
nullptr);
int accept() override
Accept a new client connection (optional for some transports)
SocketServer(std::string socket_path, int initial_max_clients)
std::vector< std::vector< uint8_t > > recv_buffers_
bool listen() override
Start listening for client connections.
std::vector< int > client_fds_
void close() override
Close the server and all client connections.
std::unordered_map< int, int > fd_to_client_id_
void disconnect_client(int client_id)
bool send(int client_id, const void *data, size_t len) override
Send a message to a specific client.
std::span< const uint8_t > receive(int client_id) override
Receive next message from a specific client.
void release(int client_id, size_t message_size) override
Release/consume the previously received message.
int wait_for_data(uint64_t timeout_ns) override
Wait for data from any connected client.
const std::vector< MemoryValue > data
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept