synapse-app-sdk
C++ SDK for Synapse Apps
Loading...
Searching...
No Matches
synapse_log_sink.hpp
1// Copyright Science Corporation 2025
2#pragma once
3#include <functional>
4#include <queue>
5#include <thread>
6#include <mutex>
7#include <condition_variable>
8
9#include <spdlog/spdlog.h>
10#include <spdlog/sinks/base_sink.h>
11
12namespace synapse::log {
13
14template <typename Mutex>
15class SynapseLogSink : public spdlog::sinks::base_sink<Mutex> {
16 public:
17 // Notify for new log messages
18 using log_callback_t = std::function<void(const synapse::LogEntry)>;
19
20 SynapseLogSink() {
21 running_.store(true);
22 worker_thread_ = std::thread([this]() { process_message_queue(); });
23 }
24
25 ~SynapseLogSink() {
26 {
27 std::lock_guard<std::mutex> lock(queue_mutex_);
28 running_.store(false);
29 queue_cv_.notify_all();
30 }
31 if (worker_thread_.joinable()) {
32 worker_thread_.join();
33 }
34 }
35
36 // Register a callback (subscriber) to new log messages
37 // Returns the id of the subscriber if successful, otherwise returns nullopt
38 std::optional<uint64_t> subscribe(log_callback_t callback) {
39 std::lock_guard<std::mutex> lock(callback_mutex_);
40 if (callbacks_.size() >= MAX_SUBSCRIBERS) {
41 return std::nullopt;
42 }
43 uint64_t id = callback_id_++;
44 callbacks_[id] = std::move(callback);
45 return id;
46 }
47
48 // Unregister the subscriber
49 bool unsubscribe(const uint64_t id) {
50 std::lock_guard<std::mutex> lock(callback_mutex_);
51 return callbacks_.erase(id) > 0;
52 }
53
54 protected:
55 void sink_it_(const spdlog::details::log_msg& msg) override {
56 // Check if the queue is full before pushing
57 std::lock_guard<std::mutex> lock(queue_mutex_);
58 if (queue_.size() >= MAX_QUEUE_SIZE) {
59 return;
60 }
61
62 // Convert the message to a LogEntry and add it to the queue
63 auto log_entry = convert_to_log_entry(msg);
64 queue_.push(std::move(log_entry));
65 queue_cv_.notify_one();
66 }
67
68 // Don't need to flush here
69 void flush_() override {}
70
71 private:
72 // Just limit the number of subscribers to 5, I don't expect there to be more than that
73 static constexpr uint64_t MAX_SUBSCRIBERS = 5;
74
75 // Max size of the queue, if we are this far behind we should be dropping logs
76 static constexpr uint64_t MAX_QUEUE_SIZE = 1000;
77
78 std::atomic<bool> running_ = false;
79
80 // Keep track of our subscribers
81 std::mutex callback_mutex_;
82 std::unordered_map<uint64_t, log_callback_t> callbacks_;
83
84 // We don't actually fire any of the callbacks in the hot loop, we add them to a queue and process
85 // them asynchronously This is to avoid blocking the hot loop and to avoid calling any spdlog
86 // functions in the callback (which would cause a circular dependency and result in a deadlock)
87 std::thread worker_thread_;
88 std::mutex queue_mutex_;
89 std::condition_variable queue_cv_;
90 std::queue<synapse::LogEntry> queue_;
91
92 std::atomic<uint64_t> callback_id_ = 0;
93
94 void process_message_queue() {
95 while (running_) {
96 synapse::LogEntry log_entry;
97 {
98 std::unique_lock<std::mutex> lock(queue_mutex_);
99 queue_cv_.wait(lock, [this]() { return !queue_.empty() || !running_; });
100
101 if (!running_ && queue_.empty()) {
102 break;
103 }
104
105 if (!queue_.empty()) {
106 log_entry = queue_.front();
107 queue_.pop();
108 }
109 }
110
111 // Call our callbacks while trying to minimize the time that the lock is held
112 {
113 std::lock_guard<std::mutex> lock(callback_mutex_);
114 for (const auto& [id, callback] : callbacks_) {
115 try {
116 callback(log_entry);
117 } catch (const std::exception& e) {
118 // Error here instead of log to avoid a circular dependency
119 // (we shouldn't be calling spdlog in the sink)
120 std::cerr << "Error processing callback in SynapseLogSink: " << e.what() << std::endl;
121 }
122 }
123 }
124 }
125 }
126
127 synapse::LogEntry convert_to_log_entry(const spdlog::details::log_msg& msg) {
128 // Create it into a log message for grpc
129 synapse::LogEntry entry;
130 const uint64_t timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
131 std::chrono::system_clock::now().time_since_epoch())
132 .count();
133 entry.set_timestamp_ns(timestamp_ns);
134
135 switch (msg.level) {
136 case spdlog::level::trace:
137 case spdlog::level::debug:
138 entry.set_level(synapse::LogLevel::LOG_LEVEL_DEBUG);
139 break;
140 case spdlog::level::info:
141 entry.set_level(synapse::LogLevel::LOG_LEVEL_INFO);
142 break;
143 case spdlog::level::warn:
144 entry.set_level(synapse::LogLevel::LOG_LEVEL_WARNING);
145 break;
146 case spdlog::level::err:
147 entry.set_level(synapse::LogLevel::LOG_LEVEL_ERROR);
148 break;
149 case spdlog::level::critical:
150 entry.set_level(synapse::LogLevel::LOG_LEVEL_CRITICAL);
151 break;
152 default:
153 entry.set_level(synapse::LogLevel::LOG_LEVEL_DEBUG);
154 break;
155 }
156 const std::string source = std::string(msg.logger_name.begin(), msg.logger_name.end());
157 if (source.empty()) {
158 entry.set_source("synapse-app");
159 } else {
160 entry.set_source(source);
161 }
162
163 // Try to convert the payload to a string, if it is binary data, we don't serialize it
164 try {
165 const std::string payload(msg.payload.begin(), msg.payload.end());
166 entry.set_message(payload);
167 } catch (const std::exception& e) {
168 // If it is binary data, we don't serialize it
169 entry.set_message("[BINARY DATA]");
170 }
171 return entry;
172 }
173};
174} // namespace synapse::log