• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #pragma once
16 #include <lib/fit/function.h>
17 #include <lib/stdcompat/type_traits.h>
18 #include <pw_async/dispatcher.h>
19 
20 #include <functional>
21 #include <limits>
22 #include <queue>
23 #include <tuple>
24 #include <unordered_map>
25 #include <utility>
26 #include <variant>
27 
28 #include "pw_bluetooth_sapphire/internal/host/common/assert.h"
29 #include "pw_bluetooth_sapphire/internal/host/common/retire_log.h"
30 #include "pw_bluetooth_sapphire/internal/host/common/weak_self.h"
31 
32 namespace bt {
33 
34 // In-process data pipeline monitoring service. This issues tokens that
35 // accompany data packets through various buffers. When tokens are destroyed,
36 // their lifetimes are recorded.
37 //
38 // Clients may subscribe to alerts for when various values exceed specified
39 // thresholds.
40 //
41 // TODO(fxbug.dev/42150683): Produce pollable statistics about retired tokens
42 // TODO(fxbug.dev/42150684): Timestamp stages of each token
43 // TODO(fxbug.dev/42150684): Provide mechanism to split/merge tokens through
44 // chunking/de-chunking
45 class PipelineMonitor final {
46  private:
47   using TokenId = uint64_t;
48 
49  public:
50   // Each Token is created when the monitor "issues" it, at which point it is
51   // "in-flight" until the token is "retired" (either explicitly with |Retire()|
52   // or by destruction). Tokens model a unique moveable resource. Moved-from
53   // Token objects are valid and can take on newly issued tokens from the same
54   // PipelineMonitor instance.
55   //
56   // Tokens can also be created by splitting an existing token in order to model
57   // data chunking through e.g. segmentation or fragmentation. Splitting tokens
58   // will result in more retirements (and hence logged retirements) than issues.
59   //
60   // Tokens that outlive their issuing PipelineMonitor have no effect when
61   // retired or destroyed.
62   class Token {
63    public:
Token(Token && other)64     Token(Token&& other) noexcept : parent_(other.parent_) {
65       *this = std::move(other);
66     }
67 
68     Token& operator=(Token&& other) noexcept {
69       BT_ASSERT(&parent_.get() == &other.parent_.get());
70       id_ = std::exchange(other.id_, kInvalidTokenId);
71       return *this;
72     }
73 
74     Token() = delete;
75     Token(const Token&) = delete;
76     Token& operator=(const Token&) = delete;
77 
~Token()78     ~Token() { Retire(); }
79 
80     // Explicitly retire to its monitor. This has no effect if the token has
81     // already been retired.
Retire()82     void Retire() {
83       if (!parent_.is_alive()) {
84         return;
85       }
86       if (id_ != kInvalidTokenId) {
87         parent_->Retire(this);
88       }
89       id_ = kInvalidTokenId;
90     }
91 
92     // If this token is valid, subtract |bytes_to_take| from its bookkeeping and
93     // track it under a new token. This does count towards token issue threshold
94     // alerts and will result in one more retirement logged. |bytes_to_take|
95     // must be no greater than the bytes issued for this token. If
96     // |bytes_to_take| is exactly equal, this is effectively a token move.
Split(size_t bytes_to_take)97     Token Split(size_t bytes_to_take) {
98       if (!parent_.is_alive()) {
99         return Token(parent_, kInvalidTokenId);
100       }
101       BT_ASSERT(id_ != kInvalidTokenId);
102       return parent_->Split(this, bytes_to_take);
103     }
104 
105    private:
106     friend class PipelineMonitor;
Token(WeakSelf<PipelineMonitor>::WeakPtr parent,TokenId id)107     Token(WeakSelf<PipelineMonitor>::WeakPtr parent, TokenId id)
108         : parent_(std::move(parent)), id_(id) {}
109 
110     const WeakSelf<PipelineMonitor>::WeakPtr parent_;
111     TokenId id_ = kInvalidTokenId;
112   };
113 
114   // Alert types used for |SetAlert|. These are used as dimensioned value
115   // wrappers whose types identify what kind of value they hold.
116 
117   // Alert for max_bytes_in_flight. Fires upon issuing the first token that
118   // exceeds the threshold.
119   struct MaxBytesInFlightAlert {
120     size_t value;
121   };
122 
123   // Alert for max_bytes_in_flight. Fires upon issuing the first token that
124   // exceeds the threshold.
125   struct MaxTokensInFlightAlert {
126     int64_t value;
127   };
128 
129   // Alert for token age (duration from issue to retirement). Fires upon
130   // retiring the first token that exceeds the threshold.
131   struct MaxAgeRetiredAlert {
132     pw::chrono::SystemClock::duration value;
133   };
134 
135   // Create a data chunk-tracking service that uses |pw_dispatcher| for timing.
136   // |retire_log| is copied into the class in order to store statistics about
137   // retired tokens. Note that the "live" internal log is readable with the
138   // |retire_log()| method, not the original instance passed to the ctor (which
139   // will not be logged into).
PipelineMonitor(pw::async::Dispatcher & pw_dispatcher,const internal::RetireLog & retire_log)140   explicit PipelineMonitor(pw::async::Dispatcher& pw_dispatcher,
141                            const internal::RetireLog& retire_log)
142       : dispatcher_(pw_dispatcher), retire_log_(retire_log) {}
143 
bytes_issued()144   [[nodiscard]] size_t bytes_issued() const { return bytes_issued_; }
tokens_issued()145   [[nodiscard]] int64_t tokens_issued() const { return tokens_issued_; }
bytes_in_flight()146   [[nodiscard]] size_t bytes_in_flight() const { return bytes_in_flight_; }
tokens_in_flight()147   [[nodiscard]] int64_t tokens_in_flight() const {
148     BT_ASSERT(issued_tokens_.size() <= std::numeric_limits<int64_t>::max());
149     return issued_tokens_.size();
150   }
bytes_retired()151   [[nodiscard]] size_t bytes_retired() const {
152     BT_ASSERT(bytes_issued() >= bytes_in_flight());
153     return bytes_issued() - bytes_in_flight();
154   }
tokens_retired()155   [[nodiscard]] int64_t tokens_retired() const {
156     return tokens_issued() - tokens_in_flight();
157   }
158 
retire_log()159   const internal::RetireLog& retire_log() const { return retire_log_; }
160 
161   // Start tracking |byte_count| bytes of data. This issues a token that is now
162   // considered "in-flight" until it is retired.
Issue(size_t byte_count)163   [[nodiscard]] Token Issue(size_t byte_count) {
164     // For consistency, complete all token map and counter modifications before
165     // processing alerts.
166     const auto id = MakeTokenId();
167     issued_tokens_.insert_or_assign(id,
168                                     TokenInfo{dispatcher_.now(), byte_count});
169     bytes_issued_ += byte_count;
170     tokens_issued_++;
171     bytes_in_flight_ += byte_count;
172 
173     // Process alerts.
174     SignalAlertValue<MaxBytesInFlightAlert>(bytes_in_flight());
175     SignalAlertValue<MaxTokensInFlightAlert>(tokens_in_flight());
176     return Token(weak_self_.GetWeakPtr(), id);
177   }
178 
179   // Moves bytes tracked from one issued token to a new token, up to all of the
180   // bytes in |token|.
Split(Token * token,size_t bytes_to_take)181   [[nodiscard]] Token Split(Token* token, size_t bytes_to_take) {
182     // For consistency, complete all token map and counter modifications before
183     // processing alerts.
184     BT_ASSERT(this == &token->parent_.get());
185     auto iter = issued_tokens_.find(token->id_);
186     BT_ASSERT(iter != issued_tokens_.end());
187     TokenInfo& token_info = iter->second;
188     BT_ASSERT(bytes_to_take <= token_info.byte_count);
189     if (token_info.byte_count == bytes_to_take) {
190       return std::move(*token);
191     }
192 
193     token_info.byte_count -= bytes_to_take;
194 
195     const TokenId id = MakeTokenId();
196     issued_tokens_.insert_or_assign(
197         id, TokenInfo{token_info.issue_time, bytes_to_take});
198     tokens_issued_++;
199 
200     // Process alerts.
201     SignalAlertValue<MaxTokensInFlightAlert>(tokens_in_flight());
202     return Token(weak_self_.GetWeakPtr(), id);
203   }
204 
205   // Subscribes to an alert that fires when the watched value strictly exceeds
206   // |threshold|. When that happens, |listener| is called with the alert type
207   // containing the actual value and the alert trigger is removed.
208   //
209   // New alerts will not be signaled until the next event that can change the
210   // value (token issued, retired, etc), so |listener| can re-subscribe (but
211   // likely at a different threshold to avoid a tight loop of re-subscriptions).
212   //
213   // For example,
214   //   monitor.SetAlert(MaxBytesInFlightAlert{kMaxBytes},
215   //                    [](MaxBytesInFlightAlert value) { /* value.value =
216   //                    bytes_in_flight() */ });
217   template <typename AlertType>
SetAlert(AlertType threshold,fit::callback<void (decltype (threshold))> listener)218   void SetAlert(AlertType threshold,
219                 fit::callback<void(decltype(threshold))> listener) {
220     GetAlertList<AlertType>().push(
221         AlertInfo<AlertType>{threshold, std::move(listener)});
222   }
223 
224   // Convenience function to set a single listener for all of |alerts|, called
225   // if any of the alerts defined by |thresholds| are triggered.
226   template <typename... AlertTypes>
SetAlerts(fit::function<void (std::variant<cpp20::type_identity_t<AlertTypes>...>)> listener,AlertTypes...thresholds)227   void SetAlerts(
228       fit::function<void(std::variant<cpp20::type_identity_t<AlertTypes>...>)>
229           listener,
230       AlertTypes... thresholds) {
231     // This is a fold expression over the comma (,) operator with SetAlert.
232     (SetAlert<AlertTypes>(thresholds, listener.share()), ...);
233   }
234 
235  private:
236   // Tracks information for each Token issued out-of-line so that Tokens can be
237   // kept small.
238   struct TokenInfo {
239     pw::chrono::SystemClock::time_point issue_time =
240         pw::chrono::SystemClock::time_point::max();
241     size_t byte_count = 0;
242   };
243 
244   // Records alerts and their subscribers. Removed when fired.
245   template <typename AlertType>
246   struct AlertInfo {
247     AlertType threshold;
248     fit::callback<void(AlertType)> listener;
249 
250     // Used by std::priority_queue through std::less. The logic is intentionally
251     // inverted so that the AlertInfo with the smallest threshold appears first.
252     bool operator<(const AlertInfo<AlertType>& other) const {
253       return this->threshold.value > other.threshold.value;
254     }
255   };
256 
257   // Store subscribers so that the earliest and most likely to fire threshold is
258   // highest priority to make testing values against thresholds constant time
259   // and fast.
260   template <typename T>
261   using AlertList = std::priority_queue<AlertInfo<T>>;
262 
263   template <typename... AlertTypes>
264   using AlertRegistry = std::tuple<AlertList<AlertTypes>...>;
265 
266   // Used in Token to represent an inactive Token object (one that does not
267   // represent an in-flight token in the monitor).
268   static constexpr TokenId kInvalidTokenId =
269       std::numeric_limits<TokenId>::max();
270 
271   template <typename AlertType>
GetAlertList()272   AlertList<AlertType>& GetAlertList() {
273     return std::get<AlertList<AlertType>>(alert_registry_);
274   }
275 
276   // Signal a change in the value watched by |AlertType| and test it against the
277   // subscribed alert thresholds. Any thresholds strict exceeded (with
278   // std::greater) cause its subscribed listener to be removed and invoked.
279   template <typename AlertType>
SignalAlertValue(decltype (AlertType::value)value)280   void SignalAlertValue(decltype(AlertType::value) value) {
281     auto& alert_list = GetAlertList<AlertType>();
282     std::vector<decltype(AlertInfo<AlertType>::listener)> listeners;
283     while (!alert_list.empty()) {
284       auto& top = alert_list.top();
285       if (!std::greater()(value, top.threshold.value)) {
286         break;
287       }
288 
289       // std::priority_queue intentionally has const access to top() in order to
290       // avoid breaking heap constraints. This cast to remove const and modify
291       // top respects that design intent because (1) it doesn't modify element
292       // order (2) the intent is to pop the top anyways. It is important to call
293       // |listener| after pop in case that call re-subscribes to this call
294       // (which could modify the heap top).
295       listeners.push_back(
296           std::move(const_cast<AlertInfo<AlertType>&>(top).listener));
297       alert_list.pop();
298     }
299 
300     // Deferring the call to after filtering helps prevent infinite alert loops.
301     for (auto& listener : listeners) {
302       listener(AlertType{value});
303     }
304   }
305 
MakeTokenId()306   TokenId MakeTokenId() {
307     return std::exchange(next_token_id_,
308                          (next_token_id_ + 1) % kInvalidTokenId);
309   }
310 
Retire(Token * token)311   void Retire(Token* token) {
312     // For consistency, complete all token map and counter modifications before
313     // processing alerts.
314     BT_ASSERT(this == &token->parent_.get());
315     auto node = issued_tokens_.extract(token->id_);
316     BT_ASSERT(bool{node});
317     const TokenInfo& token_info = node.mapped();
318     bytes_in_flight_ -= token_info.byte_count;
319     const pw::chrono::SystemClock::duration age =
320         dispatcher_.now() - token_info.issue_time;
321     retire_log_.Retire(token_info.byte_count, age);
322 
323     // Process alerts.
324     SignalAlertValue<MaxAgeRetiredAlert>(age);
325   }
326 
327   pw::async::Dispatcher& dispatcher_;
328 
329   internal::RetireLog retire_log_;
330 
331   // This is likely not the best choice for memory efficiency and insertion
332   // latency (allocation and rehashing are both concerning). A slotmap is likely
333   // a good choice here with some tweaks to Token invalidation and an eye for
334   // implementation (SG14 slot_map may have O(N) insertion).
335   std::unordered_map<TokenId, TokenInfo> issued_tokens_;
336 
337   TokenId next_token_id_ = 0;
338   size_t bytes_issued_ = 0;
339   int64_t tokens_issued_ = 0;
340   size_t bytes_in_flight_ = 0;
341 
342   // Use a single variable to store all of the alert subscribers. This can be
343   // split by type using std::get (see GetAlertList).
344   AlertRegistry<MaxBytesInFlightAlert,
345                 MaxTokensInFlightAlert,
346                 MaxAgeRetiredAlert>
347       alert_registry_;
348 
349   WeakSelf<PipelineMonitor> weak_self_{this};
350 };
351 
352 }  // namespace bt
353