4#include <spdlog/spdlog.h>
7#include "synapse-app-sdk/middleware/conversions.hpp"
9#include "api/datatype.pb.h"
10#include "api/tap.pb.h"
15const std::string kDefaultTapZMQIPC =
"ipc:///tmp/tap_registry";
46 ConsumerTap(
const std::string& name,
const std::string& message_type_name,
47 const std::function<
void(zmq::message_t message)>& callback,
48 zmq::context_t& zmq_context)
49 : callback_(callback), zmq_context_(zmq_context) {
50 socket_ = zmq::socket_t(zmq_context_, zmq::socket_type::sub);
51 socket_.set(zmq::sockopt::subscribe,
"");
52 socket_.bind(
"tcp://*:0");
53 connection_.set_name(name);
54 connection_.set_message_type(message_type_name);
55 connection_.set_endpoint(socket_.get(zmq::sockopt::last_endpoint));
56 connection_.set_tap_type(synapse::TapType::TAP_TYPE_CONSUMER);
58 spdlog::info(
"Consumer tap {} <{}> created at {}", name, message_type_name,
59 connection_.endpoint());
69 synapse::TapConnection
connection()
const {
return connection_; }
79 read_thread_ = std::thread([
this]() {
81 zmq::message_t message;
83 auto ret = socket_.recv(message, zmq::recv_flags::dontwait);
84 if (!ret.has_value()) {
85 std::this_thread::sleep_for(std::chrono::milliseconds(100));
88 callback_(std::move(message));
89 }
catch (
const std::exception& e) {
90 spdlog::error(
"Error receiving message from consumer tap {}: {}", connection_.name(),
92 std::this_thread::sleep_for(std::chrono::milliseconds(100));
106 if (read_thread_.joinable()) {
113 zmq::socket_t socket_;
115 zmq::context_t& zmq_context_;
117 std::function<void(zmq::message_t message)> callback_;
120 synapse::TapConnection connection_;
123 std::thread read_thread_;
126 std::atomic<bool> running_{
false};
141 explicit TapManager(zmq::context_t& zmq_context,
142 const std::string& registry_endpoint = kDefaultTapZMQIPC);
155 std::optional<synapse::TapConnection>
add_tap(
const std::string& name,
156 const std::string& message_type_name);
165 const std::string& name,
const std::string& message_type_name,
166 const std::function<
void(zmq::message_t message)>& callback) {
167 if (name.empty() || message_type_name.empty()) {
171 std::lock_guard<std::mutex> lock(consumer_tap_mutex_);
172 if (consumer_taps_.count(name)) {
175 spdlog::info(
"Trying to add consumer tap: {} <{}>", name, message_type_name);
177 std::make_shared<ConsumerTap>(name, message_type_name, callback, zmq_context_);
178 consumer_tap->start();
179 consumer_taps_[name] = consumer_tap;
180 return consumer_tap->connection();
191 template <
typename T>
198 if (!taps_.count(name)) {
202 auto out_message = protobuf_to_zmq_message(message);
203 const auto ret = taps_.at(name)->socket.send(out_message, zmq::send_flags::none);
206 return ret.has_value();
216 std::atomic<bool> running_{
false};
219 zmq::context_t& zmq_context_;
222 std::thread tap_thread_;
225 zmq::socket_t req_socket_;
228 std::mutex tap_mutex_;
231 std::unordered_map<std::string, std::shared_ptr<TapSocket>> taps_;
233 std::mutex consumer_tap_mutex_;
235 std::unordered_map<std::string, std::shared_ptr<ConsumerTap>> consumer_taps_;
240 void run_tap_thread();
ConsumerTap(const std::string &name, const std::string &message_type_name, const std::function< void(zmq::message_t message)> &callback, zmq::context_t &zmq_context)
Constructor for ConsumerTap.
Definition taps.hpp:46
~ConsumerTap()
Destructor - stops the consumer tap.
Definition taps.hpp:63
void stop()
Stops the consumer tap.
Definition taps.hpp:104
synapse::TapConnection connection() const
Gets the connection information for this tap.
Definition taps.hpp:69
void start()
Starts the consumer tap.
Definition taps.hpp:77
TapManager(zmq::context_t &zmq_context, const std::string ®istry_endpoint=kDefaultTapZMQIPC)
Constructor for TapManager.
Definition taps.cpp:5
bool send_message(const std::string &name, const T &message)
Sends a message through a named tap.
Definition taps.hpp:192
void stop()
Stops the tap manager and releases resources.
Definition taps.cpp:46
std::optional< synapse::TapConnection > add_tap(const std::string &name, const std::string &message_type_name)
Adds a new tap with the specified name and message type.
Definition taps.cpp:14
std::optional< synapse::TapConnection > add_consumer_tap(const std::string &name, const std::string &message_type_name, const std::function< void(zmq::message_t message)> &callback)
Adds a new consumer tap with the specified name and message type.
Definition taps.hpp:164
~TapManager()
Destructor - ensures all resources are cleaned up.
Definition taps.cpp:12
Class representing a socket connection for a tap.
Definition taps.hpp:22
synapse::TapConnection connection
Connection information for the tap.
Definition taps.hpp:28
zmq::socket_t socket
ZMQ socket for the tap.
Definition taps.hpp:25