1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #pragma once 16 17 #include <cstddef> 18 #include <optional> 19 #include <span> 20 21 #include "pw_chrono/system_clock.h" 22 #include "pw_log_rpc/log_service.h" 23 #include "pw_log_rpc/rpc_log_drain_map.h" 24 #include "pw_multisink/multisink.h" 25 #include "pw_result/result.h" 26 #include "pw_rpc/raw/server_reader_writer.h" 27 #include "pw_status/status.h" 28 #include "pw_status/try.h" 29 #include "pw_sync/timed_thread_notification.h" 30 #include "pw_thread/thread_core.h" 31 32 namespace pw::log_rpc { 33 34 // RpcLogDrainThread is a single thread and single MultiSink::Listener that 35 // manages multiple log streams. It is a suitable option when a minimal 36 // thread count is desired but comes with the cost of individual log streams 37 // blocking each other's flushing. 38 class RpcLogDrainThread : public thread::ThreadCore, 39 public multisink::MultiSink::Listener { 40 public: RpcLogDrainThread(multisink::MultiSink & multisink,RpcLogDrainMap & drain_map,std::span<std::byte> encoding_buffer)41 RpcLogDrainThread(multisink::MultiSink& multisink, 42 RpcLogDrainMap& drain_map, 43 std::span<std::byte> encoding_buffer) 44 : drain_map_(drain_map), 45 multisink_(multisink), 46 encoding_buffer_(encoding_buffer) {} 47 OnNewEntryAvailable()48 void OnNewEntryAvailable() override { 49 ready_to_flush_notification_.release(); 50 } 51 52 // Sequentially flushes each log stream. Run()53 void Run() override { 54 for (auto& drain : drain_map_.drains()) { 55 multisink_.AttachDrain(drain); 56 drain.set_on_open_callback( 57 [this]() { this->ready_to_flush_notification_.release(); }); 58 } 59 multisink_.AttachListener(*this); 60 61 bool drains_pending = true; 62 std::optional<chrono::SystemClock::duration> min_delay = 63 chrono::SystemClock::duration::zero(); 64 while (true) { 65 if (drains_pending && min_delay.has_value()) { 66 ready_to_flush_notification_.try_acquire_for(min_delay.value()); 67 } else { 68 ready_to_flush_notification_.acquire(); 69 } 70 drains_pending = false; 71 min_delay = std::nullopt; 72 for (auto& drain : drain_map_.drains()) { 73 std::optional<chrono::SystemClock::duration> drain_ready_in = 74 drain.Trickle(encoding_buffer_); 75 if (drain_ready_in.has_value()) { 76 min_delay = std::min(drain_ready_in.value(), 77 min_delay.value_or(drain_ready_in.value())); 78 drains_pending = true; 79 } 80 } 81 } 82 } 83 84 // Opens a server writer to set up an unrequested log stream. OpenUnrequestedLogStream(uint32_t channel_id,rpc::Server & rpc_server,LogService & log_service)85 Status OpenUnrequestedLogStream(uint32_t channel_id, 86 rpc::Server& rpc_server, 87 LogService& log_service) { 88 rpc::RawServerWriter writer = 89 rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>( 90 rpc_server, channel_id, log_service); 91 const Result<RpcLogDrain*> drain = 92 drain_map_.GetDrainFromChannelId(channel_id); 93 PW_TRY(drain.status()); 94 return drain.value()->Open(writer); 95 } 96 97 private: 98 sync::TimedThreadNotification ready_to_flush_notification_; 99 RpcLogDrainMap& drain_map_; 100 multisink::MultiSink& multisink_; 101 std::span<std::byte> encoding_buffer_; 102 }; 103 104 template <size_t kEncodingBufferSizeBytes> 105 class RpcLogDrainThreadWithBuffer final : public RpcLogDrainThread { 106 public: RpcLogDrainThreadWithBuffer(multisink::MultiSink & multisink,RpcLogDrainMap & drain_map)107 RpcLogDrainThreadWithBuffer(multisink::MultiSink& multisink, 108 RpcLogDrainMap& drain_map) 109 : RpcLogDrainThread(multisink, drain_map, encoding_buffer_array_) {} 110 111 private: 112 static_assert(kEncodingBufferSizeBytes >= 113 RpcLogDrain::kLogEntriesEncodeFrameSize + 114 RpcLogDrain::kMinEntryBufferSize, 115 "RpcLogDrainThread's encoding buffer must be large enough for " 116 "at least one entry"); 117 118 std::byte encoding_buffer_array_[kEncodingBufferSizeBytes]; 119 }; 120 121 } // namespace pw::log_rpc 122