1 // Copyright 2023 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #pragma once 16 #include <lib/fit/function.h> 17 #include <lib/stdcompat/type_traits.h> 18 #include <pw_async/dispatcher.h> 19 20 #include <functional> 21 #include <limits> 22 #include <queue> 23 #include <tuple> 24 #include <unordered_map> 25 #include <utility> 26 #include <variant> 27 28 #include "pw_bluetooth_sapphire/internal/host/common/assert.h" 29 #include "pw_bluetooth_sapphire/internal/host/common/retire_log.h" 30 #include "pw_bluetooth_sapphire/internal/host/common/weak_self.h" 31 32 namespace bt { 33 34 // In-process data pipeline monitoring service. This issues tokens that 35 // accompany data packets through various buffers. When tokens are destroyed, 36 // their lifetimes are recorded. 37 // 38 // Clients may subscribe to alerts for when various values exceed specified 39 // thresholds. 40 // 41 // TODO(fxbug.dev/42150683): Produce pollable statistics about retired tokens 42 // TODO(fxbug.dev/42150684): Timestamp stages of each token 43 // TODO(fxbug.dev/42150684): Provide mechanism to split/merge tokens through 44 // chunking/de-chunking 45 class PipelineMonitor final { 46 private: 47 using TokenId = uint64_t; 48 49 public: 50 // Each Token is created when the monitor "issues" it, at which point it is 51 // "in-flight" until the token is "retired" (either explicitly with |Retire()| 52 // or by destruction). Tokens model a unique moveable resource. Moved-from 53 // Token objects are valid and can take on newly issued tokens from the same 54 // PipelineMonitor instance. 55 // 56 // Tokens can also be created by splitting an existing token in order to model 57 // data chunking through e.g. segmentation or fragmentation. Splitting tokens 58 // will result in more retirements (and hence logged retirements) than issues. 59 // 60 // Tokens that outlive their issuing PipelineMonitor have no effect when 61 // retired or destroyed. 62 class Token { 63 public: Token(Token && other)64 Token(Token&& other) noexcept : parent_(other.parent_) { 65 *this = std::move(other); 66 } 67 68 Token& operator=(Token&& other) noexcept { 69 BT_ASSERT(&parent_.get() == &other.parent_.get()); 70 id_ = std::exchange(other.id_, kInvalidTokenId); 71 return *this; 72 } 73 74 Token() = delete; 75 Token(const Token&) = delete; 76 Token& operator=(const Token&) = delete; 77 ~Token()78 ~Token() { Retire(); } 79 80 // Explicitly retire to its monitor. This has no effect if the token has 81 // already been retired. Retire()82 void Retire() { 83 if (!parent_.is_alive()) { 84 return; 85 } 86 if (id_ != kInvalidTokenId) { 87 parent_->Retire(this); 88 } 89 id_ = kInvalidTokenId; 90 } 91 92 // If this token is valid, subtract |bytes_to_take| from its bookkeeping and 93 // track it under a new token. This does count towards token issue threshold 94 // alerts and will result in one more retirement logged. |bytes_to_take| 95 // must be no greater than the bytes issued for this token. If 96 // |bytes_to_take| is exactly equal, this is effectively a token move. Split(size_t bytes_to_take)97 Token Split(size_t bytes_to_take) { 98 if (!parent_.is_alive()) { 99 return Token(parent_, kInvalidTokenId); 100 } 101 BT_ASSERT(id_ != kInvalidTokenId); 102 return parent_->Split(this, bytes_to_take); 103 } 104 105 private: 106 friend class PipelineMonitor; Token(WeakSelf<PipelineMonitor>::WeakPtr parent,TokenId id)107 Token(WeakSelf<PipelineMonitor>::WeakPtr parent, TokenId id) 108 : parent_(std::move(parent)), id_(id) {} 109 110 const WeakSelf<PipelineMonitor>::WeakPtr parent_; 111 TokenId id_ = kInvalidTokenId; 112 }; 113 114 // Alert types used for |SetAlert|. These are used as dimensioned value 115 // wrappers whose types identify what kind of value they hold. 116 117 // Alert for max_bytes_in_flight. Fires upon issuing the first token that 118 // exceeds the threshold. 119 struct MaxBytesInFlightAlert { 120 size_t value; 121 }; 122 123 // Alert for max_bytes_in_flight. Fires upon issuing the first token that 124 // exceeds the threshold. 125 struct MaxTokensInFlightAlert { 126 int64_t value; 127 }; 128 129 // Alert for token age (duration from issue to retirement). Fires upon 130 // retiring the first token that exceeds the threshold. 131 struct MaxAgeRetiredAlert { 132 pw::chrono::SystemClock::duration value; 133 }; 134 135 // Create a data chunk-tracking service that uses |pw_dispatcher| for timing. 136 // |retire_log| is copied into the class in order to store statistics about 137 // retired tokens. Note that the "live" internal log is readable with the 138 // |retire_log()| method, not the original instance passed to the ctor (which 139 // will not be logged into). PipelineMonitor(pw::async::Dispatcher & pw_dispatcher,const internal::RetireLog & retire_log)140 explicit PipelineMonitor(pw::async::Dispatcher& pw_dispatcher, 141 const internal::RetireLog& retire_log) 142 : dispatcher_(pw_dispatcher), retire_log_(retire_log) {} 143 bytes_issued()144 [[nodiscard]] size_t bytes_issued() const { return bytes_issued_; } tokens_issued()145 [[nodiscard]] int64_t tokens_issued() const { return tokens_issued_; } bytes_in_flight()146 [[nodiscard]] size_t bytes_in_flight() const { return bytes_in_flight_; } tokens_in_flight()147 [[nodiscard]] int64_t tokens_in_flight() const { 148 BT_ASSERT(issued_tokens_.size() <= std::numeric_limits<int64_t>::max()); 149 return issued_tokens_.size(); 150 } bytes_retired()151 [[nodiscard]] size_t bytes_retired() const { 152 BT_ASSERT(bytes_issued() >= bytes_in_flight()); 153 return bytes_issued() - bytes_in_flight(); 154 } tokens_retired()155 [[nodiscard]] int64_t tokens_retired() const { 156 return tokens_issued() - tokens_in_flight(); 157 } 158 retire_log()159 const internal::RetireLog& retire_log() const { return retire_log_; } 160 161 // Start tracking |byte_count| bytes of data. This issues a token that is now 162 // considered "in-flight" until it is retired. Issue(size_t byte_count)163 [[nodiscard]] Token Issue(size_t byte_count) { 164 // For consistency, complete all token map and counter modifications before 165 // processing alerts. 166 const auto id = MakeTokenId(); 167 issued_tokens_.insert_or_assign(id, 168 TokenInfo{dispatcher_.now(), byte_count}); 169 bytes_issued_ += byte_count; 170 tokens_issued_++; 171 bytes_in_flight_ += byte_count; 172 173 // Process alerts. 174 SignalAlertValue<MaxBytesInFlightAlert>(bytes_in_flight()); 175 SignalAlertValue<MaxTokensInFlightAlert>(tokens_in_flight()); 176 return Token(weak_self_.GetWeakPtr(), id); 177 } 178 179 // Moves bytes tracked from one issued token to a new token, up to all of the 180 // bytes in |token|. Split(Token * token,size_t bytes_to_take)181 [[nodiscard]] Token Split(Token* token, size_t bytes_to_take) { 182 // For consistency, complete all token map and counter modifications before 183 // processing alerts. 184 BT_ASSERT(this == &token->parent_.get()); 185 auto iter = issued_tokens_.find(token->id_); 186 BT_ASSERT(iter != issued_tokens_.end()); 187 TokenInfo& token_info = iter->second; 188 BT_ASSERT(bytes_to_take <= token_info.byte_count); 189 if (token_info.byte_count == bytes_to_take) { 190 return std::move(*token); 191 } 192 193 token_info.byte_count -= bytes_to_take; 194 195 const TokenId id = MakeTokenId(); 196 issued_tokens_.insert_or_assign( 197 id, TokenInfo{token_info.issue_time, bytes_to_take}); 198 tokens_issued_++; 199 200 // Process alerts. 201 SignalAlertValue<MaxTokensInFlightAlert>(tokens_in_flight()); 202 return Token(weak_self_.GetWeakPtr(), id); 203 } 204 205 // Subscribes to an alert that fires when the watched value strictly exceeds 206 // |threshold|. When that happens, |listener| is called with the alert type 207 // containing the actual value and the alert trigger is removed. 208 // 209 // New alerts will not be signaled until the next event that can change the 210 // value (token issued, retired, etc), so |listener| can re-subscribe (but 211 // likely at a different threshold to avoid a tight loop of re-subscriptions). 212 // 213 // For example, 214 // monitor.SetAlert(MaxBytesInFlightAlert{kMaxBytes}, 215 // [](MaxBytesInFlightAlert value) { /* value.value = 216 // bytes_in_flight() */ }); 217 template <typename AlertType> SetAlert(AlertType threshold,fit::callback<void (decltype (threshold))> listener)218 void SetAlert(AlertType threshold, 219 fit::callback<void(decltype(threshold))> listener) { 220 GetAlertList<AlertType>().push( 221 AlertInfo<AlertType>{threshold, std::move(listener)}); 222 } 223 224 // Convenience function to set a single listener for all of |alerts|, called 225 // if any of the alerts defined by |thresholds| are triggered. 226 template <typename... AlertTypes> SetAlerts(fit::function<void (std::variant<cpp20::type_identity_t<AlertTypes>...>)> listener,AlertTypes...thresholds)227 void SetAlerts( 228 fit::function<void(std::variant<cpp20::type_identity_t<AlertTypes>...>)> 229 listener, 230 AlertTypes... thresholds) { 231 // This is a fold expression over the comma (,) operator with SetAlert. 232 (SetAlert<AlertTypes>(thresholds, listener.share()), ...); 233 } 234 235 private: 236 // Tracks information for each Token issued out-of-line so that Tokens can be 237 // kept small. 238 struct TokenInfo { 239 pw::chrono::SystemClock::time_point issue_time = 240 pw::chrono::SystemClock::time_point::max(); 241 size_t byte_count = 0; 242 }; 243 244 // Records alerts and their subscribers. Removed when fired. 245 template <typename AlertType> 246 struct AlertInfo { 247 AlertType threshold; 248 fit::callback<void(AlertType)> listener; 249 250 // Used by std::priority_queue through std::less. The logic is intentionally 251 // inverted so that the AlertInfo with the smallest threshold appears first. 252 bool operator<(const AlertInfo<AlertType>& other) const { 253 return this->threshold.value > other.threshold.value; 254 } 255 }; 256 257 // Store subscribers so that the earliest and most likely to fire threshold is 258 // highest priority to make testing values against thresholds constant time 259 // and fast. 260 template <typename T> 261 using AlertList = std::priority_queue<AlertInfo<T>>; 262 263 template <typename... AlertTypes> 264 using AlertRegistry = std::tuple<AlertList<AlertTypes>...>; 265 266 // Used in Token to represent an inactive Token object (one that does not 267 // represent an in-flight token in the monitor). 268 static constexpr TokenId kInvalidTokenId = 269 std::numeric_limits<TokenId>::max(); 270 271 template <typename AlertType> GetAlertList()272 AlertList<AlertType>& GetAlertList() { 273 return std::get<AlertList<AlertType>>(alert_registry_); 274 } 275 276 // Signal a change in the value watched by |AlertType| and test it against the 277 // subscribed alert thresholds. Any thresholds strict exceeded (with 278 // std::greater) cause its subscribed listener to be removed and invoked. 279 template <typename AlertType> SignalAlertValue(decltype (AlertType::value)value)280 void SignalAlertValue(decltype(AlertType::value) value) { 281 auto& alert_list = GetAlertList<AlertType>(); 282 std::vector<decltype(AlertInfo<AlertType>::listener)> listeners; 283 while (!alert_list.empty()) { 284 auto& top = alert_list.top(); 285 if (!std::greater()(value, top.threshold.value)) { 286 break; 287 } 288 289 // std::priority_queue intentionally has const access to top() in order to 290 // avoid breaking heap constraints. This cast to remove const and modify 291 // top respects that design intent because (1) it doesn't modify element 292 // order (2) the intent is to pop the top anyways. It is important to call 293 // |listener| after pop in case that call re-subscribes to this call 294 // (which could modify the heap top). 295 listeners.push_back( 296 std::move(const_cast<AlertInfo<AlertType>&>(top).listener)); 297 alert_list.pop(); 298 } 299 300 // Deferring the call to after filtering helps prevent infinite alert loops. 301 for (auto& listener : listeners) { 302 listener(AlertType{value}); 303 } 304 } 305 MakeTokenId()306 TokenId MakeTokenId() { 307 return std::exchange(next_token_id_, 308 (next_token_id_ + 1) % kInvalidTokenId); 309 } 310 Retire(Token * token)311 void Retire(Token* token) { 312 // For consistency, complete all token map and counter modifications before 313 // processing alerts. 314 BT_ASSERT(this == &token->parent_.get()); 315 auto node = issued_tokens_.extract(token->id_); 316 BT_ASSERT(bool{node}); 317 const TokenInfo& token_info = node.mapped(); 318 bytes_in_flight_ -= token_info.byte_count; 319 const pw::chrono::SystemClock::duration age = 320 dispatcher_.now() - token_info.issue_time; 321 retire_log_.Retire(token_info.byte_count, age); 322 323 // Process alerts. 324 SignalAlertValue<MaxAgeRetiredAlert>(age); 325 } 326 327 pw::async::Dispatcher& dispatcher_; 328 329 internal::RetireLog retire_log_; 330 331 // This is likely not the best choice for memory efficiency and insertion 332 // latency (allocation and rehashing are both concerning). A slotmap is likely 333 // a good choice here with some tweaks to Token invalidation and an eye for 334 // implementation (SG14 slot_map may have O(N) insertion). 335 std::unordered_map<TokenId, TokenInfo> issued_tokens_; 336 337 TokenId next_token_id_ = 0; 338 size_t bytes_issued_ = 0; 339 int64_t tokens_issued_ = 0; 340 size_t bytes_in_flight_ = 0; 341 342 // Use a single variable to store all of the alert subscribers. This can be 343 // split by type using std::get (see GetAlertList). 344 AlertRegistry<MaxBytesInFlightAlert, 345 MaxTokensInFlightAlert, 346 MaxAgeRetiredAlert> 347 alert_registry_; 348 349 WeakSelf<PipelineMonitor> weak_self_{this}; 350 }; 351 352 } // namespace bt 353