• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #pragma once
16 
17 #include <cstddef>
18 #include <optional>
19 #include <span>
20 
21 #include "pw_chrono/system_clock.h"
22 #include "pw_log_rpc/log_service.h"
23 #include "pw_log_rpc/rpc_log_drain_map.h"
24 #include "pw_multisink/multisink.h"
25 #include "pw_result/result.h"
26 #include "pw_rpc/raw/server_reader_writer.h"
27 #include "pw_status/status.h"
28 #include "pw_status/try.h"
29 #include "pw_sync/timed_thread_notification.h"
30 #include "pw_thread/thread_core.h"
31 
32 namespace pw::log_rpc {
33 
34 // RpcLogDrainThread is a single thread and single MultiSink::Listener that
35 // manages multiple log streams. It is a suitable option when a minimal
36 // thread count is desired but comes with the cost of individual log streams
37 // blocking each other's flushing.
38 class RpcLogDrainThread : public thread::ThreadCore,
39                           public multisink::MultiSink::Listener {
40  public:
RpcLogDrainThread(multisink::MultiSink & multisink,RpcLogDrainMap & drain_map,std::span<std::byte> encoding_buffer)41   RpcLogDrainThread(multisink::MultiSink& multisink,
42                     RpcLogDrainMap& drain_map,
43                     std::span<std::byte> encoding_buffer)
44       : drain_map_(drain_map),
45         multisink_(multisink),
46         encoding_buffer_(encoding_buffer) {}
47 
OnNewEntryAvailable()48   void OnNewEntryAvailable() override {
49     ready_to_flush_notification_.release();
50   }
51 
52   // Sequentially flushes each log stream.
Run()53   void Run() override {
54     for (auto& drain : drain_map_.drains()) {
55       multisink_.AttachDrain(drain);
56       drain.set_on_open_callback(
57           [this]() { this->ready_to_flush_notification_.release(); });
58     }
59     multisink_.AttachListener(*this);
60 
61     bool drains_pending = true;
62     std::optional<chrono::SystemClock::duration> min_delay =
63         chrono::SystemClock::duration::zero();
64     while (true) {
65       if (drains_pending && min_delay.has_value()) {
66         ready_to_flush_notification_.try_acquire_for(min_delay.value());
67       } else {
68         ready_to_flush_notification_.acquire();
69       }
70       drains_pending = false;
71       min_delay = std::nullopt;
72       for (auto& drain : drain_map_.drains()) {
73         std::optional<chrono::SystemClock::duration> drain_ready_in =
74             drain.Trickle(encoding_buffer_);
75         if (drain_ready_in.has_value()) {
76           min_delay = std::min(drain_ready_in.value(),
77                                min_delay.value_or(drain_ready_in.value()));
78           drains_pending = true;
79         }
80       }
81     }
82   }
83 
84   // Opens a server writer to set up an unrequested log stream.
OpenUnrequestedLogStream(uint32_t channel_id,rpc::Server & rpc_server,LogService & log_service)85   Status OpenUnrequestedLogStream(uint32_t channel_id,
86                                   rpc::Server& rpc_server,
87                                   LogService& log_service) {
88     rpc::RawServerWriter writer =
89         rpc::RawServerWriter::Open<log::pw_rpc::raw::Logs::Listen>(
90             rpc_server, channel_id, log_service);
91     const Result<RpcLogDrain*> drain =
92         drain_map_.GetDrainFromChannelId(channel_id);
93     PW_TRY(drain.status());
94     return drain.value()->Open(writer);
95   }
96 
97  private:
98   sync::TimedThreadNotification ready_to_flush_notification_;
99   RpcLogDrainMap& drain_map_;
100   multisink::MultiSink& multisink_;
101   std::span<std::byte> encoding_buffer_;
102 };
103 
104 template <size_t kEncodingBufferSizeBytes>
105 class RpcLogDrainThreadWithBuffer final : public RpcLogDrainThread {
106  public:
RpcLogDrainThreadWithBuffer(multisink::MultiSink & multisink,RpcLogDrainMap & drain_map)107   RpcLogDrainThreadWithBuffer(multisink::MultiSink& multisink,
108                               RpcLogDrainMap& drain_map)
109       : RpcLogDrainThread(multisink, drain_map, encoding_buffer_array_) {}
110 
111  private:
112   static_assert(kEncodingBufferSizeBytes >=
113                     RpcLogDrain::kLogEntriesEncodeFrameSize +
114                         RpcLogDrain::kMinEntryBufferSize,
115                 "RpcLogDrainThread's encoding buffer must be large enough for "
116                 "at least one entry");
117 
118   std::byte encoding_buffer_array_[kEncodingBufferSizeBytes];
119 };
120 
121 }  // namespace pw::log_rpc
122