synapse-app-sdk
C++ SDK for Synapse Apps
Loading...
Searching...
No Matches
taps.hpp
1#pragma once
2#include <thread>
3
4#include <spdlog/spdlog.h>
5#include <zmq.hpp>
6
7#include "synapse-app-sdk/middleware/conversions.hpp"
8
9#include "api/datatype.pb.h"
10#include "api/tap.pb.h"
11
12namespace synapse {
13
15const std::string kDefaultTapZMQIPC = "ipc:///tmp/tap_registry";
16
22class TapSocket {
23 public:
25 zmq::socket_t socket;
26
28 synapse::TapConnection connection;
29};
30
38 public:
46 ConsumerTap(const std::string& name, const std::string& message_type_name,
47 const std::function<void(zmq::message_t message)>& callback,
48 zmq::context_t& zmq_context)
49 : callback_(callback), zmq_context_(zmq_context) {
50 socket_ = zmq::socket_t(zmq_context_, zmq::socket_type::sub);
51 socket_.set(zmq::sockopt::subscribe, "");
52 socket_.bind("tcp://*:0");
53 connection_.set_name(name);
54 connection_.set_message_type(message_type_name);
55 connection_.set_endpoint(socket_.get(zmq::sockopt::last_endpoint));
56 connection_.set_tap_type(synapse::TapType::TAP_TYPE_CONSUMER);
57
58 spdlog::info("Consumer tap {} <{}> created at {}", name, message_type_name,
59 connection_.endpoint());
60 }
61
64
69 synapse::TapConnection connection() const { return connection_; }
70
77 void start() {
78 running_ = true;
79 read_thread_ = std::thread([this]() {
80 while (running_) {
81 zmq::message_t message;
82 try {
83 auto ret = socket_.recv(message, zmq::recv_flags::dontwait);
84 if (!ret.has_value()) {
85 std::this_thread::sleep_for(std::chrono::milliseconds(100));
86 continue;
87 }
88 callback_(std::move(message));
89 } catch (const std::exception& e) {
90 spdlog::error("Error receiving message from consumer tap {}: {}", connection_.name(),
91 e.what());
92 std::this_thread::sleep_for(std::chrono::milliseconds(100));
93 continue;
94 }
95 }
96 });
97 }
98
104 void stop() {
105 running_ = false;
106 if (read_thread_.joinable()) {
107 read_thread_.join();
108 }
109 }
110
111 private:
113 zmq::socket_t socket_;
115 zmq::context_t& zmq_context_;
117 std::function<void(zmq::message_t message)> callback_;
118
120 synapse::TapConnection connection_;
121
123 std::thread read_thread_;
124
126 std::atomic<bool> running_{false};
127};
128
135 public:
141 explicit TapManager(zmq::context_t& zmq_context,
142 const std::string& registry_endpoint = kDefaultTapZMQIPC);
143
147 ~TapManager();
148
155 std::optional<synapse::TapConnection> add_tap(const std::string& name,
156 const std::string& message_type_name);
157
164 std::optional<synapse::TapConnection> add_consumer_tap(
165 const std::string& name, const std::string& message_type_name,
166 const std::function<void(zmq::message_t message)>& callback) {
167 if (name.empty() || message_type_name.empty()) {
168 return std::nullopt;
169 }
170
171 std::lock_guard<std::mutex> lock(consumer_tap_mutex_);
172 if (consumer_taps_.count(name)) {
173 return std::nullopt;
174 }
175 spdlog::info("Trying to add consumer tap: {} <{}>", name, message_type_name);
176 auto consumer_tap =
177 std::make_shared<ConsumerTap>(name, message_type_name, callback, zmq_context_);
178 consumer_tap->start();
179 consumer_taps_[name] = consumer_tap;
180 return consumer_tap->connection();
181 }
182
191 template <typename T>
192 bool send_message(const std::string& name, const T& message) {
193 if (name.empty()) {
194 return false;
195 }
196
197 // Make sure we have this tap
198 if (!taps_.count(name)) {
199 return false;
200 }
201
202 auto out_message = protobuf_to_zmq_message(message);
203 const auto ret = taps_.at(name)->socket.send(out_message, zmq::send_flags::none);
204
205 // Non block send
206 return ret.has_value();
207 }
208
212 void stop();
213
214 private:
216 std::atomic<bool> running_{false};
217
219 zmq::context_t& zmq_context_;
220
222 std::thread tap_thread_;
223
225 zmq::socket_t req_socket_;
226
228 std::mutex tap_mutex_;
229
231 std::unordered_map<std::string, std::shared_ptr<TapSocket>> taps_;
232
233 std::mutex consumer_tap_mutex_;
234
235 std::unordered_map<std::string, std::shared_ptr<ConsumerTap>> consumer_taps_;
236
240 void run_tap_thread();
241};
242} // namespace synapse
ConsumerTap(const std::string &name, const std::string &message_type_name, const std::function< void(zmq::message_t message)> &callback, zmq::context_t &zmq_context)
Constructor for ConsumerTap.
Definition taps.hpp:46
~ConsumerTap()
Destructor - stops the consumer tap.
Definition taps.hpp:63
void stop()
Stops the consumer tap.
Definition taps.hpp:104
synapse::TapConnection connection() const
Gets the connection information for this tap.
Definition taps.hpp:69
void start()
Starts the consumer tap.
Definition taps.hpp:77
TapManager(zmq::context_t &zmq_context, const std::string &registry_endpoint=kDefaultTapZMQIPC)
Constructor for TapManager.
Definition taps.cpp:5
bool send_message(const std::string &name, const T &message)
Sends a message through a named tap.
Definition taps.hpp:192
void stop()
Stops the tap manager and releases resources.
Definition taps.cpp:46
std::optional< synapse::TapConnection > add_tap(const std::string &name, const std::string &message_type_name)
Adds a new tap with the specified name and message type.
Definition taps.cpp:14
std::optional< synapse::TapConnection > add_consumer_tap(const std::string &name, const std::string &message_type_name, const std::function< void(zmq::message_t message)> &callback)
Adds a new consumer tap with the specified name and message type.
Definition taps.hpp:164
~TapManager()
Destructor - ensures all resources are cleaned up.
Definition taps.cpp:12
Class representing a socket connection for a tap.
Definition taps.hpp:22
synapse::TapConnection connection
Connection information for the tap.
Definition taps.hpp:28
zmq::socket_t socket
ZMQ socket for the tap.
Definition taps.hpp:25