3#include "synapse-app-sdk/middleware/data_publisher.hpp"
4#include "synapse-app-sdk/middleware/data_reader.hpp"
5#include "synapse-app-sdk/middleware/taps.hpp"
6#include "synapse-app-sdk/utils/performance/function_profiler.hpp"
7#include "synapse-app-sdk/utils/time/time.hpp"
8#include "synapse-app-sdk/utils/config/config.hpp"
9#include "synapse-app-sdk/utils/logging/synapse_log_sink.hpp"
17#include "api/synapse.pb.h"
21const std::string kDefaultDeviceConfigPath =
"/opt/scifi/config/device.json";
38 explicit App(
const std::string& configuration_path);
51 static_assert(std::is_base_of<google::protobuf::Message, T>::value,
52 "T must be a protobuf message type");
55 const google::protobuf::Descriptor* descriptor = T::descriptor();
56 if (descriptor ==
nullptr) {
57 spdlog::warn(
"Nullptr while creating tap descriptor: {}", name);
60 const std::string type_name = descriptor->full_name();
61 spdlog::info(
"Creating tap: {}, type: {}", name, type_name);
62 return tap_manager_.add_tap(name, type_name).has_value();
63 }
catch (
const std::exception& e) {
64 spdlog::warn(
"Failed to create tap with name: {}, why: {}", name, e.what());
78 static_assert(std::is_base_of<google::protobuf::Message, T>::value,
79 "T must be a protobuf message type");
94 const std::function<
void(
const T& message)>& callback) {
95 static_assert(std::is_base_of<google::protobuf::Message, T>::value,
96 "T must be a protobuf message type");
98 auto callback_wrapper = [callback, name](zmq::message_t message) {
99 auto message_proto = parse_protobuf_message<T>(std::move(message));
100 if (message_proto.has_value() && callback) {
101 callback(message_proto.value());
103 spdlog::warn(
"Failed to parse protobuf message for consumer tap: {}", name);
106 return tap_manager_.add_consumer_tap(name, T::descriptor()->full_name(), callback_wrapper)
120 virtual bool start();
144 std::shared_ptr<synapse::log::SynapseLogSink<std::mutex>>
log_sink_{
nullptr};
163 const std::string& publisher_endpoint);
217 bool get_app_config(std::function<
bool(
const synapse::ApplicationNodeConfig&)> validator,
218 synapse::ApplicationNodeConfig& output_config) {
220 const auto app_node_configs =
222 if (app_node_configs.size() != 1) {
223 spdlog::error(
"Expected exactly one application node, got {}", app_node_configs.size());
224 spdlog::error(
"Did you forget to add the application node to the device configuration?");
227 const auto& node_config = app_node_configs[0];
229 if (!node_config.has_application()) {
230 spdlog::error(
"Configuration does not have application parameters");
233 const auto& app_node_config = node_config.application();
234 if (!validator(app_node_config)) {
235 spdlog::error(
"Application node configuration is not valid");
239 output_config.CopyFrom(app_node_config);
240 spdlog::info(
"Application node configuration: {}", output_config.DebugString());
245 void disable_log_streaming() { should_stream_logs_ =
false; }
260 void setup_logging();
268 bool setup_log_streaming();
271 std::atomic<bool> should_stream_logs_{
true};
317 template <
typename T>
319 std::signal(SIGINT, AppRunner::signal_handler);
321 app_ = std::make_unique<T>();
322 if (!app_->setup()) {
323 spdlog::error(
"Failed to setup Synapse application");
327 if (!app_->start()) {
328 spdlog::error(
"Failed to start Synapse application");
334 if (signal_received_) {
337 std::this_thread::sleep_for(std::chrono::milliseconds(100));
363 std::atomic<bool> running_{
false};
366 std::atomic<bool> signal_received_{
false};
369 std::unique_ptr<App> app_ =
nullptr;
Singleton class for running a Synapse application.
Definition app.hpp:279
AppRunner & operator=(AppRunner &&)=delete
Delete move assignment operator.
int run()
Runs an application of type T.
Definition app.hpp:318
AppRunner & operator=(const AppRunner &)=delete
Delete copy assignment operator.
void handle_signal(int signal)
Handles a signal.
Definition app.hpp:347
static AppRunner & instance()
Gets the singleton instance.
Definition app.hpp:285
AppRunner(const AppRunner &)=delete
Delete copy constructor.
void stop() noexcept
Stops the application.
Definition app.hpp:305
AppRunner(AppRunner &&)=delete
Delete move constructor.
void stop_profile(const std::string &name)
Stops profiling a function.
Definition app.cpp:145
virtual void main()=0
Main loop to be implemented by derived classes.
void start_profile(const std::string &name)
Starts profiling a function.
Definition app.cpp:138
virtual bool setup()=0
Setup function to be implemented by derived classes.
std::shared_ptr< synapse::log::SynapseLogSink< std::mutex > > log_sink_
Custom log sink for capturing and streaming application logs.
Definition app.hpp:144
std::shared_ptr< synapse::ZMQDataReader > data_reader_
Data reader for receiving data.
Definition app.hpp:135
bool create_consumer_tap(const std::string &name, const std::function< void(const T &message)> &callback)
Creates a consumer tap for receiving messages Tip: Don't do a lot of work in the callback,...
Definition app.hpp:93
virtual bool start()
Starts the application.
Definition app.cpp:47
std::thread main_loop_thread_
Thread for the main loop.
Definition app.hpp:248
synapse::FunctionProfilerManager function_profiler_manager_
Function profiler manager.
Definition app.hpp:176
bool enable_function_profiling(const std::chrono::milliseconds &period)
Enables function profiling.
Definition app.cpp:109
TapManager tap_manager_
Manager for handling taps.
Definition app.hpp:141
bool get_app_config(std::function< bool(const synapse::ApplicationNodeConfig &)> validator, synapse::ApplicationNodeConfig &output_config)
Gets the application parameters from the device configuration.
Definition app.hpp:217
bool load_device_configuration(const std::string &path)
Loads device configuration from file.
Definition app.cpp:85
bool publish_tap(const std::string &name, const T &message)
Publishes a message to a tap.
Definition app.hpp:77
bool create_tap(const std::string &name)
Creates a tap for publishing messages.
Definition app.hpp:50
std::atomic< bool > function_profiling_enabled_
Flag indicating if function profiling is enabled.
Definition app.hpp:182
virtual bool setup_reader(const uint32_t node_id)
Sets up the data reader.
Definition app.cpp:21
synapse::DeviceConfiguration device_configuration_
Device configuration.
Definition app.hpp:166
void print_profile(const std::string &name)
Prints profile information.
Definition app.cpp:152
std::shared_ptr< synapse::DataPublisher > data_publisher_
Data publisher for sending data.
Definition app.hpp:138
virtual ~App()=default
Virtual destructor.
std::thread function_profiling_thread_
Thread for function profiling.
Definition app.hpp:179
virtual bool setup_publisher(const PublisherType &publisher_type, const std::string &publisher_endpoint)
Sets up the data publisher.
Definition app.cpp:34
std::optional< uint64_t > log_subscription_id_
ID for the log streaming subscription.
Definition app.hpp:147
virtual void stop()
Stops the application.
Definition app.cpp:65
std::atomic< bool > node_running_
Flag indicating if the node is running.
Definition app.hpp:129
App()
Default constructor.
Definition app.cpp:6
zmq::context_t zmq_context_
ZMQ context for communication.
Definition app.hpp:132
Manager class for multiple function profilers.
Definition function_profiler.hpp:82
Manager class for handling data taps.
Definition taps.hpp:134