synapse-app-sdk
C++ SDK for Synapse Apps
Loading...
Searching...
No Matches
data_publisher.hpp
1#pragma once
2
3#include <atomic>
4#include <optional>
5#include <thread>
6
7#include "spdlog/spdlog.h"
8#include "zmq.hpp"
9
10#include "synapse-app-sdk/middleware/conversions.hpp"
11
12namespace synapse {
13
/// Identifies the kind of publisher a DataPublisher is constructed with.
/// Stored in a single byte (underlying type uint8_t).
enum class PublisherType : uint8_t {
  kUnknown = 0,  ///< Zero value; no transport named.
  ZMQ_IPC = 1,   ///< NOTE(review): presumably ZeroMQ over ipc:// - confirm with callers.
  ZMQ_TCP = 2,   ///< NOTE(review): presumably ZeroMQ over tcp:// - confirm with callers.
  UDP = 3,       ///< NOTE(review): presumably plain UDP - confirm with callers.
};
23
30 public:
36 DataPublisher(const PublisherType& publisher_type, zmq::context_t& zmq_context)
37 : publisher_type_(publisher_type),
38 zmq_context_(zmq_context),
39 publisher_socket_(zmq_context_, zmq::socket_type::pub) {}
40
45 if (connected_) {
46 publisher_socket_.close();
47 }
48 }
49
55 bool setup(const std::string& endpoint) {
56 try {
57 publisher_socket_.bind(endpoint);
58 connected_ = true;
59 return true;
60 } catch (const std::exception& e) {
61 spdlog::error("Failed to bind data publisher to: {}, why: {}", endpoint, e.what());
62 return false;
63 }
64 }
65
75 template <typename T>
76 bool try_publish(const T& message) {
77 if (!connected_) {
78 spdlog::warn("Could not publish, did you connect?");
79 return false;
80 }
81 // Send the topic data first
82 const std::string kControllerZMQTopic = "controller/output";
83 zmq::message_t topic_message(kControllerZMQTopic.begin(), kControllerZMQTopic.end());
84 const auto topic_send_ret = publisher_socket_.send(topic_message, zmq::send_flags::sndmore);
85 if (!topic_send_ret.has_value() || topic_send_ret.value() == 0) {
86 spdlog::error("Failed to publish topic data for controller node");
87 return false;
88 }
89
90 // TODO: We will need to abstract this out for different publishers
91 zmq::message_t out_message = protobuf_to_zmq_message(message);
92 if (out_message.size() == 0) {
93 spdlog::error("Failed to convert protobuf to zmq message for sending");
94 return false;
95 }
96
97 auto result = publisher_socket_.send(out_message, zmq::send_flags::dontwait);
98 return result.has_value();
99 }
100
101 private:
103 PublisherType publisher_type_;
104
106 std::atomic<bool> connected_{false};
107
109 zmq::context_t& zmq_context_;
110
112 zmq::socket_t publisher_socket_;
113};
114} // namespace synapse
~DataPublisher()
Destructor - ensures socket is closed.
Definition data_publisher.hpp:44
bool setup(const std::string &endpoint)
Sets up the publisher with the specified endpoint.
Definition data_publisher.hpp:55
DataPublisher(const PublisherType &publisher_type, zmq::context_t &zmq_context)
Constructor for DataPublisher.
Definition data_publisher.hpp:36
bool try_publish(const T &message)
Non-blocking publish of a message.
Definition data_publisher.hpp:76