// Log sink deriving from spdlog::sinks::base_sink<Mutex>. The members visible
// in this class convert incoming spdlog messages to synapse::LogEntry records,
// queue them, and have a worker thread walk the registered subscriber callbacks.
// NOTE(review): `Mutex` is presumably a template parameter declared on a line
// not visible here — confirm.
15class SynapseLogSink :
public spdlog::sinks::base_sink<Mutex> {
// Type of a subscriber callback: a std::function that receives one
// synapse::LogEntry (declared as a const by-value parameter) and returns nothing.
18 using log_callback_t = std::function<void(
const synapse::LogEntry)>;
// Start the worker thread. The lambda captures `this` and simply calls
// process_message_queue() on this object.
// NOTE(review): the enclosing function header is not visible here — presumably
// the constructor; confirm, and confirm where running_ is set to true.
22 worker_thread_ = std::thread([
this]() { process_message_queue(); });
// Shutdown sequence.
// NOTE(review): the enclosing function header is not visible here — presumably
// the destructor; confirm.
// With queue_mutex_ locked, clear running_ and wake every thread waiting on
// queue_cv_; the worker's wait predicate checks !running_, so it is re-evaluated.
27 std::lock_guard<std::mutex> lock(queue_mutex_);
28 running_.store(
false);
29 queue_cv_.notify_all();
// Join the worker thread, but only if it is joinable.
31 if (worker_thread_.joinable()) {
32 worker_thread_.join();
// Registers `callback` under a freshly allocated id, with callback_mutex_ locked.
// Returns std::optional<uint64_t>.
// NOTE(review): both return statements are on lines not visible here —
// presumably std::nullopt when the subscriber limit is hit and the new id
// otherwise; confirm.
38 std::optional<uint64_t> subscribe(log_callback_t callback) {
39 std::lock_guard<std::mutex> lock(callback_mutex_);
// Limit check: taken when MAX_SUBSCRIBERS callbacks are already registered.
40 if (callbacks_.size() >= MAX_SUBSCRIBERS) {
// The id is the current value of callback_id_, which is then incremented.
43 uint64_t
id = callback_id_++;
// Store the callback under that id (moved, not copied).
44 callbacks_[id] = std::move(callback);
// Removes the callback registered under `id`, with callback_mutex_ locked.
// Returns true if an entry was erased, false if no callback had that id.
49 bool unsubscribe(
const uint64_t
id) {
50 std::lock_guard<std::mutex> lock(callback_mutex_);
// erase() returns the number of elements removed (0 or 1 for this map).
51 return callbacks_.erase(
id) > 0;
// Override (marked `override`) that receives one spdlog message. With
// queue_mutex_ locked, it converts `msg` to a synapse::LogEntry, pushes the
// entry onto queue_, and wakes one waiter on queue_cv_.
55 void sink_it_(
const spdlog::details::log_msg& msg)
override {
57 std::lock_guard<std::mutex> lock(queue_mutex_);
// Queue-full check against MAX_QUEUE_SIZE.
// NOTE(review): what happens on this branch (drop oldest vs. drop new entry)
// is on lines not visible here — confirm.
58 if (queue_.size() >= MAX_QUEUE_SIZE) {
63 auto log_entry = convert_to_log_entry(msg);
64 queue_.push(std::move(log_entry));
65 queue_cv_.notify_one();
// Flush override with an empty body: this sink performs no action on flush.
69 void flush_()
override {}
// Subscriber limit; subscribe() compares callbacks_.size() against it.
73 static constexpr uint64_t MAX_SUBSCRIBERS = 5;
// Queue-size threshold; sink_it_() compares queue_.size() against it.
76 static constexpr uint64_t MAX_QUEUE_SIZE = 1000;
// Atomic run flag, initialised to false. It is cleared in the shutdown sequence
// and read by the worker's wait predicate and exit check.
// NOTE(review): the place where it is set to true is not visible here.
78 std::atomic<bool> running_ =
false;
// Locked by subscribe(), unsubscribe(), and process_message_queue() before
// they touch callbacks_.
81 std::mutex callback_mutex_;
// Registered callbacks, keyed by the id handed out in subscribe().
82 std::unordered_map<uint64_t, log_callback_t> callbacks_;
// Thread that runs process_message_queue().
87 std::thread worker_thread_;
// Locked around accesses to queue_ and used with queue_cv_.
88 std::mutex queue_mutex_;
// Notified when an entry is pushed (notify_one) and at shutdown (notify_all).
89 std::condition_variable queue_cv_;
// Entries produced by sink_it_() and read by process_message_queue().
90 std::queue<synapse::LogEntry> queue_;
// Source of subscription ids; starts at 0 and is post-incremented in subscribe().
92 std::atomic<uint64_t> callback_id_ = 0;
// Body of the worker thread (worker_thread_ is constructed with a lambda that
// calls this function). It waits for queued entries, takes the front entry,
// and then iterates over the registered callbacks.
// NOTE(review): the loop header, the queue pop, and the statement that invokes
// each callback are on lines not visible here — confirm against the full file.
94 void process_message_queue() {
96 synapse::LogEntry log_entry;
// Block on queue_cv_ (with queue_mutex_ locked) until the queue is non-empty
// or running_ has been cleared.
98 std::unique_lock<std::mutex> lock(queue_mutex_);
99 queue_cv_.wait(lock, [
this]() {
return !queue_.empty() || !running_; });
// Stopped and nothing left to process.
// NOTE(review): the branch body is not visible — presumably leaves the loop.
101 if (!running_ && queue_.empty()) {
// Copy the entry at the front of the queue into the local.
105 if (!queue_.empty()) {
106 log_entry = queue_.front();
// callback_mutex_ is locked for the iteration over callbacks_.
113 std::lock_guard<std::mutex> lock(callback_mutex_);
114 for (
const auto& [
id, callback] : callbacks_) {
117 }
// An exception derived from std::exception is caught here and reported on
// std::cerr together with e.what().
catch (
const std::exception& e) {
120 std::cerr <<
"Error processing callback in SynapseLogSink: " << e.what() << std::endl;
// Builds a synapse::LogEntry from an spdlog message: timestamp, level, source
// and message text are set on the entry.
// NOTE(review): the switch header, the `break`/`else`/`try` lines and the final
// return are on lines not visible here — confirm against the full file.
127 synapse::LogEntry convert_to_log_entry(
const spdlog::details::log_msg& msg) {
129 synapse::LogEntry entry;
// Timestamp in nanoseconds since the epoch, taken from
// std::chrono::system_clock::now() at conversion time.
130 const uint64_t timestamp_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
131 std::chrono::system_clock::now().time_since_epoch())
133 entry.set_timestamp_ns(timestamp_ns);
// Level mapping: trace and debug share the LOG_LEVEL_DEBUG assignment.
136 case spdlog::level::trace:
137 case spdlog::level::debug:
138 entry.set_level(synapse::LogLevel::LOG_LEVEL_DEBUG);
140 case spdlog::level::info:
141 entry.set_level(synapse::LogLevel::LOG_LEVEL_INFO);
143 case spdlog::level::warn:
144 entry.set_level(synapse::LogLevel::LOG_LEVEL_WARNING);
146 case spdlog::level::err:
147 entry.set_level(synapse::LogLevel::LOG_LEVEL_ERROR);
149 case spdlog::level::critical:
150 entry.set_level(synapse::LogLevel::LOG_LEVEL_CRITICAL);
// NOTE(review): presumably the default branch (its label is not visible);
// it assigns LOG_LEVEL_DEBUG.
153 entry.set_level(synapse::LogLevel::LOG_LEVEL_DEBUG);
// Source: the logger name copied into a std::string; "synapse-app" is used
// when that name is empty.
156 const std::string source = std::string(msg.logger_name.begin(), msg.logger_name.end());
157 if (source.empty()) {
158 entry.set_source(
"synapse-app");
160 entry.set_source(source);
// Message: the payload bytes copied into a std::string.
165 const std::string payload(msg.payload.begin(), msg.payload.end());
166 entry.set_message(payload);
167 }
// If a std::exception is caught, the message is set to a fixed placeholder.
catch (
const std::exception& e) {
169 entry.set_message(
"[BINARY DATA]");