synapse-app-sdk
C++ SDK for Synapse Apps
Loading...
Searching...
No Matches
app.hpp
1#pragma once
2
#include "synapse-app-sdk/middleware/data_publisher.hpp"
#include "synapse-app-sdk/middleware/data_reader.hpp"
#include "synapse-app-sdk/middleware/taps.hpp"
#include "synapse-app-sdk/utils/performance/function_profiler.hpp"
#include "synapse-app-sdk/utils/time/time.hpp"
#include "synapse-app-sdk/utils/config/config.hpp"
#include "synapse-app-sdk/utils/logging/synapse_log_sink.hpp"
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <type_traits>

#include "api/synapse.pb.h"
18
19namespace synapse {
/// Default location of the device configuration file.
/// Declared `inline` so that every translation unit including this header
/// shares a single object instead of getting its own internal-linkage copy.
inline const std::string kDefaultDeviceConfigPath = "/opt/scifi/config/device.json";
22
29class App {
30 public:
32 App();
33
38 explicit App(const std::string& configuration_path);
39
41 virtual ~App() = default;
42
49 template <typename T>
50 bool create_tap(const std::string& name) {
51 static_assert(std::is_base_of<google::protobuf::Message, T>::value,
52 "T must be a protobuf message type");
53 // Get the type name before creating the tap
54 try {
55 const google::protobuf::Descriptor* descriptor = T::descriptor();
56 if (descriptor == nullptr) {
57 spdlog::warn("Nullptr while creating tap descriptor: {}", name);
58 return false;
59 }
60 const std::string type_name = descriptor->full_name();
61 spdlog::info("Creating tap: {}, type: {}", name, type_name);
62 return tap_manager_.add_tap(name, type_name).has_value();
63 } catch (const std::exception& e) {
64 spdlog::warn("Failed to create tap with name: {}, why: {}", name, e.what());
65 return false;
66 }
67 }
68
76 template <typename T>
77 bool publish_tap(const std::string& name, const T& message) {
78 static_assert(std::is_base_of<google::protobuf::Message, T>::value,
79 "T must be a protobuf message type");
80 return tap_manager_.send_message(name, message);
81 }
82
92 template <typename T>
93 bool create_consumer_tap(const std::string& name,
94 const std::function<void(const T& message)>& callback) {
95 static_assert(std::is_base_of<google::protobuf::Message, T>::value,
96 "T must be a protobuf message type");
97
98 auto callback_wrapper = [callback, name](zmq::message_t message) {
99 auto message_proto = parse_protobuf_message<T>(std::move(message));
100 if (message_proto.has_value() && callback) {
101 callback(message_proto.value());
102 } else {
103 spdlog::warn("Failed to parse protobuf message for consumer tap: {}", name);
104 }
105 };
106 return tap_manager_.add_consumer_tap(name, T::descriptor()->full_name(), callback_wrapper)
107 .has_value();
108 }
109
114 virtual bool setup() = 0;
115
120 virtual bool start();
121
125 virtual void stop();
126
127 protected:
129 std::atomic<bool> node_running_{false};
130
132 zmq::context_t zmq_context_;
133
135 std::shared_ptr<synapse::ZMQDataReader> data_reader_{nullptr};
136
138 std::shared_ptr<synapse::DataPublisher> data_publisher_{nullptr};
139
142
144 std::shared_ptr<synapse::log::SynapseLogSink<std::mutex>> log_sink_{nullptr};
145
147 std::optional<uint64_t> log_subscription_id_{std::nullopt};
148
154 virtual bool setup_reader(const uint32_t node_id);
155
162 virtual bool setup_publisher(const PublisherType& publisher_type,
163 const std::string& publisher_endpoint);
164
166 synapse::DeviceConfiguration device_configuration_;
167
173 bool load_device_configuration(const std::string& path);
174
177
180
182 std::atomic<bool> function_profiling_enabled_{false};
183
189 bool enable_function_profiling(const std::chrono::milliseconds& period);
190
195 void start_profile(const std::string& name);
196
201 void stop_profile(const std::string& name);
202
207 void print_profile(const std::string& name);
208
217 bool get_app_config(std::function<bool(const synapse::ApplicationNodeConfig&)> validator,
218 synapse::ApplicationNodeConfig& output_config) {
219 // First, make sure we have a node of the correct type
220 const auto app_node_configs =
221 get_nodes_for_type(device_configuration_, synapse::NodeType::kApplication);
222 if (app_node_configs.size() != 1) {
223 spdlog::error("Expected exactly one application node, got {}", app_node_configs.size());
224 spdlog::error("Did you forget to add the application node to the device configuration?");
225 return false;
226 }
227 const auto& node_config = app_node_configs[0];
228
229 if (!node_config.has_application()) {
230 spdlog::error("Configuration does not have application parameters");
231 return false;
232 }
233 const auto& app_node_config = node_config.application();
234 if (!validator(app_node_config)) {
235 spdlog::error("Application node configuration is not valid");
236 return false;
237 }
238
239 output_config.CopyFrom(app_node_config);
240 spdlog::info("Application node configuration: {}", output_config.DebugString());
241
242 return true;
243 }
244
245 void disable_log_streaming() { should_stream_logs_ = false; }
246
248 std::thread main_loop_thread_;
249
253 virtual void main() = 0;
254
255 private:
260 void setup_logging();
261
268 bool setup_log_streaming();
269
271 std::atomic<bool> should_stream_logs_{true};
272};
273
280 public:
285 static AppRunner& instance() {
286 static AppRunner instance;
287 return instance;
288 }
289
291 AppRunner(const AppRunner&) = delete;
292
294 AppRunner& operator=(const AppRunner&) = delete;
295
297 AppRunner(AppRunner&&) = delete;
298
301
305 void stop() noexcept {
306 if (app_) {
307 app_->stop();
308 }
309 running_ = false;
310 }
311
317 template <typename T>
318 int run() {
319 std::signal(SIGINT, AppRunner::signal_handler);
320
321 app_ = std::make_unique<T>();
322 if (!app_->setup()) {
323 spdlog::error("Failed to setup Synapse application");
324 return 1;
325 }
326
327 if (!app_->start()) {
328 spdlog::error("Failed to start Synapse application");
329 return 1;
330 }
331
332 running_ = true;
333 while (running_) {
334 if (signal_received_) {
335 stop();
336 }
337 std::this_thread::sleep_for(std::chrono::milliseconds(100));
338 }
339
340 return 0;
341 }
342
347 void handle_signal(int signal) { signal_received_ = true; }
348
349 private:
351 AppRunner() = default;
352
354 ~AppRunner() = default;
355
360 static void signal_handler(int signal) { AppRunner::instance().handle_signal(signal); }
361
363 std::atomic<bool> running_{false};
364
366 std::atomic<bool> signal_received_{false};
367
369 std::unique_ptr<App> app_ = nullptr;
370};
371
377template <typename T>
378int Entrypoint() {
379 return AppRunner::instance().run<T>();
380}
381
382} // namespace synapse
Singleton class for running a Synapse application.
Definition app.hpp:279
AppRunner & operator=(AppRunner &&)=delete
Delete move assignment operator.
int run()
Runs an application of type T.
Definition app.hpp:318
AppRunner & operator=(const AppRunner &)=delete
Delete copy assignment operator.
void handle_signal(int signal)
Handles a signal.
Definition app.hpp:347
static AppRunner & instance()
Gets the singleton instance.
Definition app.hpp:285
AppRunner(const AppRunner &)=delete
Delete copy constructor.
void stop() noexcept
Stops the application.
Definition app.hpp:305
AppRunner(AppRunner &&)=delete
Delete move constructor.
void stop_profile(const std::string &name)
Stops profiling a function.
Definition app.cpp:145
virtual void main()=0
Main loop to be implemented by derived classes.
void start_profile(const std::string &name)
Starts profiling a function.
Definition app.cpp:138
virtual bool setup()=0
Setup function to be implemented by derived classes.
std::shared_ptr< synapse::log::SynapseLogSink< std::mutex > > log_sink_
Custom log sink for capturing and streaming application logs.
Definition app.hpp:144
std::shared_ptr< synapse::ZMQDataReader > data_reader_
Data reader for receiving data.
Definition app.hpp:135
bool create_consumer_tap(const std::string &name, const std::function< void(const T &message)> &callback)
Creates a consumer tap for receiving messages. Tip: don't do a lot of work in the callback, ...
Definition app.hpp:93
virtual bool start()
Starts the application.
Definition app.cpp:47
std::thread main_loop_thread_
Thread for the main loop.
Definition app.hpp:248
synapse::FunctionProfilerManager function_profiler_manager_
Function profiler manager.
Definition app.hpp:176
bool enable_function_profiling(const std::chrono::milliseconds &period)
Enables function profiling.
Definition app.cpp:109
TapManager tap_manager_
Manager for handling taps.
Definition app.hpp:141
bool get_app_config(std::function< bool(const synapse::ApplicationNodeConfig &)> validator, synapse::ApplicationNodeConfig &output_config)
Gets the application parameters from the device configuration.
Definition app.hpp:217
bool load_device_configuration(const std::string &path)
Loads device configuration from file.
Definition app.cpp:85
bool publish_tap(const std::string &name, const T &message)
Publishes a message to a tap.
Definition app.hpp:77
bool create_tap(const std::string &name)
Creates a tap for publishing messages.
Definition app.hpp:50
std::atomic< bool > function_profiling_enabled_
Flag indicating if function profiling is enabled.
Definition app.hpp:182
virtual bool setup_reader(const uint32_t node_id)
Sets up the data reader.
Definition app.cpp:21
synapse::DeviceConfiguration device_configuration_
Device configuration.
Definition app.hpp:166
void print_profile(const std::string &name)
Prints profile information.
Definition app.cpp:152
std::shared_ptr< synapse::DataPublisher > data_publisher_
Data publisher for sending data.
Definition app.hpp:138
virtual ~App()=default
Virtual destructor.
std::thread function_profiling_thread_
Thread for function profiling.
Definition app.hpp:179
virtual bool setup_publisher(const PublisherType &publisher_type, const std::string &publisher_endpoint)
Sets up the data publisher.
Definition app.cpp:34
std::optional< uint64_t > log_subscription_id_
ID for the log streaming subscription.
Definition app.hpp:147
virtual void stop()
Stops the application.
Definition app.cpp:65
std::atomic< bool > node_running_
Flag indicating if the node is running.
Definition app.hpp:129
App()
Default constructor.
Definition app.cpp:6
zmq::context_t zmq_context_
ZMQ context for communication.
Definition app.hpp:132
Manager class for multiple function profilers.
Definition function_profiler.hpp:82
Manager class for handling data taps.
Definition taps.hpp:134