1 // 2 // 3 // Copyright 2017 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 // 17 // 18 19 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 20 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 21 22 #include <grpc/support/port_platform.h> 23 #include <limits.h> 24 #include <stdint.h> 25 26 #include <iosfwd> 27 #include <string> 28 #include <utility> 29 30 #include "absl/functional/function_ref.h" 31 #include "absl/log/check.h" 32 #include "absl/status/status.h" 33 #include "absl/strings/string_view.h" 34 #include "absl/types/optional.h" 35 #include "src/core/ext/transport/chttp2/transport/http2_settings.h" 36 #include "src/core/lib/debug/trace.h" 37 #include "src/core/lib/resource_quota/memory_quota.h" 38 #include "src/core/lib/transport/bdp_estimator.h" 39 #include "src/core/util/time.h" 40 41 namespace grpc { 42 namespace testing { 43 class TrickledCHTTP2; // to make this a friend 44 } // namespace testing 45 } // namespace grpc 46 47 namespace grpc_core { 48 namespace chttp2 { 49 50 static constexpr uint32_t kDefaultWindow = 65535; 51 static constexpr uint32_t kDefaultFrameSize = 16384; 52 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1); 53 // If smaller than this, advertise zero window. 54 static constexpr uint32_t kMinPositiveInitialWindowSize = 1024; 55 static constexpr const uint32_t kMaxInitialWindowSize = (1u << 30); 56 // The maximum per-stream flow control window delta to advertise. 57 static constexpr const int64_t kMaxWindowDelta = (1u << 20); 58 static constexpr const int kDefaultPreferredRxCryptoFrameSize = INT_MAX; 59 60 // TODO(ctiller): clean up when flow_control_fixes is enabled by default 61 static constexpr uint32_t kFrameSize = 1024 * 1024; 62 static constexpr const uint32_t kMinInitialWindowSize = 128; 63 64 class TransportFlowControl; 65 class StreamFlowControl; 66 67 enum class StallEdge { kNoChange, kStalled, kUnstalled }; 68 69 // Encapsulates a collections of actions the transport needs to take with 70 // regard to flow control. Each action comes with urgencies that tell the 71 // transport how quickly the action must take place. 72 class GRPC_MUST_USE_RESULT FlowControlAction { 73 public: 74 enum class Urgency : uint8_t { 75 // Nothing to be done. 76 NO_ACTION_NEEDED = 0, 77 // Initiate a write to update the initial window immediately. 78 UPDATE_IMMEDIATELY, 79 // Push the flow control update into a send buffer, to be sent 80 // out the next time a write is initiated. 81 QUEUE_UPDATE, 82 }; 83 send_stream_update()84 Urgency send_stream_update() const { return send_stream_update_; } send_transport_update()85 Urgency send_transport_update() const { return send_transport_update_; } send_initial_window_update()86 Urgency send_initial_window_update() const { 87 return send_initial_window_update_; 88 } send_max_frame_size_update()89 Urgency send_max_frame_size_update() const { 90 return send_max_frame_size_update_; 91 } preferred_rx_crypto_frame_size_update()92 Urgency preferred_rx_crypto_frame_size_update() const { 93 return preferred_rx_crypto_frame_size_update_; 94 } initial_window_size()95 uint32_t initial_window_size() const { return initial_window_size_; } max_frame_size()96 uint32_t max_frame_size() const { return max_frame_size_; } preferred_rx_crypto_frame_size()97 uint32_t preferred_rx_crypto_frame_size() const { 98 return preferred_rx_crypto_frame_size_; 99 } 100 set_send_stream_update(Urgency u)101 FlowControlAction& set_send_stream_update(Urgency u) { 102 send_stream_update_ = u; 103 return *this; 104 } set_send_transport_update(Urgency u)105 FlowControlAction& set_send_transport_update(Urgency u) { 106 send_transport_update_ = u; 107 return *this; 108 } set_send_initial_window_update(Urgency u,uint32_t update)109 FlowControlAction& set_send_initial_window_update(Urgency u, 110 uint32_t update) { 111 send_initial_window_update_ = u; 112 initial_window_size_ = update; 113 return *this; 114 } set_send_max_frame_size_update(Urgency u,uint32_t update)115 FlowControlAction& set_send_max_frame_size_update(Urgency u, 116 uint32_t update) { 117 send_max_frame_size_update_ = u; 118 max_frame_size_ = update; 119 return *this; 120 } set_preferred_rx_crypto_frame_size_update(Urgency u,uint32_t update)121 FlowControlAction& set_preferred_rx_crypto_frame_size_update( 122 Urgency u, uint32_t update) { 123 preferred_rx_crypto_frame_size_update_ = u; 124 preferred_rx_crypto_frame_size_ = update; 125 return *this; 126 } 127 128 static const char* UrgencyString(Urgency u); 129 std::string DebugString() const; 130 AssertEmpty()131 void AssertEmpty() { CHECK(*this == FlowControlAction()); } 132 133 bool operator==(const FlowControlAction& other) const { 134 return send_stream_update_ == other.send_stream_update_ && 135 send_transport_update_ == other.send_transport_update_ && 136 send_initial_window_update_ == other.send_initial_window_update_ && 137 send_max_frame_size_update_ == other.send_max_frame_size_update_ && 138 (send_initial_window_update_ == Urgency::NO_ACTION_NEEDED || 139 initial_window_size_ == other.initial_window_size_) && 140 (send_max_frame_size_update_ == Urgency::NO_ACTION_NEEDED || 141 max_frame_size_ == other.max_frame_size_) && 142 (preferred_rx_crypto_frame_size_update_ == 143 Urgency::NO_ACTION_NEEDED || 144 preferred_rx_crypto_frame_size_ == 145 other.preferred_rx_crypto_frame_size_); 146 } 147 148 private: 149 Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED; 150 Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED; 151 Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED; 152 Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED; 153 Urgency preferred_rx_crypto_frame_size_update_ = Urgency::NO_ACTION_NEEDED; 154 uint32_t initial_window_size_ = 0; 155 uint32_t max_frame_size_ = 0; 156 uint32_t preferred_rx_crypto_frame_size_ = 0; 157 }; 158 159 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency urgency); 160 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action); 161 162 // Implementation of flow control that abides to HTTP/2 spec and attempts 163 // to be as performant as possible. 164 class TransportFlowControl final { 165 public: 166 explicit TransportFlowControl(absl::string_view name, bool enable_bdp_probe, 167 MemoryOwner* memory_owner); ~TransportFlowControl()168 ~TransportFlowControl() {} 169 bdp_probe()170 bool bdp_probe() const { return enable_bdp_probe_; } 171 172 // returns an announce if we should send a transport update to our peer, 173 // else returns zero; writing_anyway indicates if a write would happen 174 // regardless of the send - if it is false and this function returns non-zero, 175 // this announce will cause a write to occur 176 uint32_t DesiredAnnounceSize(bool writing_anyway) const; 177 // notify that we've actually sent a stream window update 178 // (should be DesiredAnnounceSize()) 179 void SentUpdate(uint32_t announce); 180 181 // Older API: combines getting the DesiredAnnounceSize() with SentUpdate() MaybeSendUpdate(bool writing_anyway)182 uint32_t MaybeSendUpdate(bool writing_anyway) { 183 uint32_t n = DesiredAnnounceSize(writing_anyway); 184 SentUpdate(n); 185 return n; 186 } 187 188 // Track an update to the incoming flow control counters - that is how many 189 // tokens we report to our peer that we're willing to accept. 190 // Instantiators *must* call MakeAction before destruction of this value. 191 class IncomingUpdateContext { 192 public: IncomingUpdateContext(TransportFlowControl * tfc)193 explicit IncomingUpdateContext(TransportFlowControl* tfc) : tfc_(tfc) {} ~IncomingUpdateContext()194 ~IncomingUpdateContext() { CHECK_EQ(tfc_, nullptr); } 195 196 IncomingUpdateContext(const IncomingUpdateContext&) = delete; 197 IncomingUpdateContext& operator=(const IncomingUpdateContext&) = delete; 198 199 // Reads the flow control data and returns an actionable struct that will 200 // tell chttp2 exactly what it needs to do MakeAction()201 FlowControlAction MakeAction() { 202 return std::exchange(tfc_, nullptr)->UpdateAction(FlowControlAction()); 203 } 204 205 // Notify of data receipt. Returns OkStatus if the data was accepted, 206 // else an error status if the connection should be closed. 207 absl::Status RecvData( 208 int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream = 209 []() { return absl::OkStatus(); }); 210 211 // Update a stream announce window delta, keeping track of how much total 212 // positive delta is present on the transport. UpdateAnnouncedWindowDelta(int64_t * delta,int64_t change)213 void UpdateAnnouncedWindowDelta(int64_t* delta, int64_t change) { 214 if (change == 0) return; 215 if (*delta > 0) { 216 tfc_->announced_stream_total_over_incoming_window_ -= *delta; 217 } 218 *delta += change; 219 if (*delta > 0) { 220 tfc_->announced_stream_total_over_incoming_window_ += *delta; 221 } 222 } 223 224 private: 225 TransportFlowControl* tfc_; 226 }; 227 228 // Track an update to the outgoing flow control counters - that is how many 229 // tokens our peer has said we can send. 230 class OutgoingUpdateContext { 231 public: OutgoingUpdateContext(TransportFlowControl * tfc)232 explicit OutgoingUpdateContext(TransportFlowControl* tfc) : tfc_(tfc) {} StreamSentData(int64_t size)233 void StreamSentData(int64_t size) { tfc_->remote_window_ -= size; } 234 235 // we have received a WINDOW_UPDATE frame for a transport RecvUpdate(uint32_t size)236 void RecvUpdate(uint32_t size) { tfc_->remote_window_ += size; } 237 238 // Finish the update and check whether we became stalled or unstalled. Finish()239 StallEdge Finish() { 240 bool is_stalled = tfc_->remote_window_ <= 0; 241 if (is_stalled != was_stalled_) { 242 return is_stalled ? StallEdge::kStalled : StallEdge::kUnstalled; 243 } else { 244 return StallEdge::kNoChange; 245 } 246 } 247 248 private: 249 TransportFlowControl* tfc_; 250 const bool was_stalled_ = tfc_->remote_window_ <= 0; 251 }; 252 253 // Call periodically (at a low-ish rate, 100ms - 10s makes sense) 254 // to perform more complex flow control calculations and return an action 255 // to let chttp2 change its parameters 256 FlowControlAction PeriodicUpdate(); 257 258 int64_t target_window() const; target_frame_size()259 int64_t target_frame_size() const { return target_frame_size_; } target_preferred_rx_crypto_frame_size()260 int64_t target_preferred_rx_crypto_frame_size() const { 261 return target_preferred_rx_crypto_frame_size_; 262 } 263 bdp_estimator()264 BdpEstimator* bdp_estimator() { return &bdp_estimator_; } 265 acked_init_window()266 uint32_t acked_init_window() const { return acked_init_window_; } queued_init_window()267 uint32_t queued_init_window() const { return target_initial_window_size_; } sent_init_window()268 uint32_t sent_init_window() const { return sent_init_window_; } 269 FlushedSettings()270 void FlushedSettings() { sent_init_window_ = queued_init_window(); } 271 272 FlowControlAction SetAckedInitialWindow(uint32_t value); 273 274 // Getters remote_window()275 int64_t remote_window() const { return remote_window_; } announced_window()276 int64_t announced_window() const { return announced_window_; } 277 announced_stream_total_over_incoming_window()278 int64_t announced_stream_total_over_incoming_window() const { 279 return announced_stream_total_over_incoming_window_; 280 } 281 RemoveAnnouncedWindowDelta(int64_t delta)282 void RemoveAnnouncedWindowDelta(int64_t delta) { 283 if (delta > 0) { 284 announced_stream_total_over_incoming_window_ -= delta; 285 } 286 } 287 288 // A snapshot of the flow control stats to export. 289 struct Stats { 290 int64_t target_window; 291 int64_t target_frame_size; 292 int64_t target_preferred_rx_crypto_frame_size; 293 uint32_t acked_init_window; 294 uint32_t queued_init_window; 295 uint32_t sent_init_window; 296 int64_t remote_window; 297 int64_t announced_window; 298 int64_t announced_stream_total_over_incoming_window; 299 // BDP estimator stats. 300 int64_t bdp_accumulator; 301 int64_t bdp_estimate; 302 double bdp_bw_est; 303 304 std::string ToString() const; 305 }; 306 stats()307 Stats stats() const { 308 Stats stats; 309 stats.target_window = target_window(); 310 stats.target_frame_size = target_frame_size(); 311 stats.target_preferred_rx_crypto_frame_size = 312 target_preferred_rx_crypto_frame_size(); 313 stats.acked_init_window = acked_init_window(); 314 stats.queued_init_window = queued_init_window(); 315 stats.sent_init_window = sent_init_window(); 316 stats.remote_window = remote_window(); 317 stats.announced_window = announced_window(); 318 stats.announced_stream_total_over_incoming_window = 319 announced_stream_total_over_incoming_window(); 320 stats.bdp_accumulator = bdp_estimator_.accumulator(); 321 stats.bdp_estimate = bdp_estimator_.EstimateBdp(); 322 stats.bdp_bw_est = bdp_estimator_.EstimateBandwidth(); 323 return stats; 324 } 325 326 private: 327 double TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const; 328 static void UpdateSetting(absl::string_view name, int64_t* desired_value, 329 uint32_t new_desired_value, 330 FlowControlAction* action, 331 FlowControlAction& (FlowControlAction::*set)( 332 FlowControlAction::Urgency, uint32_t)); 333 334 FlowControlAction UpdateAction(FlowControlAction action); 335 336 MemoryOwner* const memory_owner_; 337 338 /// calculating what we should give for local window: 339 /// we track the total amount of flow control over initial window size 340 /// across all streams: this is data that we want to receive right now (it 341 /// has an outstanding read) 342 /// and the total amount of flow control under initial window size across all 343 /// streams: this is data we've read early 344 /// we want to adjust incoming_window such that: 345 /// incoming_window = total_over - max(bdp - total_under, 0) 346 int64_t announced_stream_total_over_incoming_window_ = 0; 347 348 /// should we probe bdp? 349 const bool enable_bdp_probe_; 350 351 // bdp estimation 352 BdpEstimator bdp_estimator_; 353 354 int64_t remote_window_ = kDefaultWindow; 355 int64_t target_initial_window_size_ = kDefaultWindow; 356 int64_t target_frame_size_ = kDefaultFrameSize; 357 int64_t target_preferred_rx_crypto_frame_size_ = 358 kDefaultPreferredRxCryptoFrameSize; 359 int64_t announced_window_ = kDefaultWindow; 360 uint32_t acked_init_window_ = kDefaultWindow; 361 uint32_t sent_init_window_ = kDefaultWindow; 362 }; 363 364 // Implementation of flow control that abides to HTTP/2 spec and attempts 365 // to be as performant as possible. 366 class StreamFlowControl final { 367 public: 368 explicit StreamFlowControl(TransportFlowControl* tfc); ~StreamFlowControl()369 ~StreamFlowControl() { 370 tfc_->RemoveAnnouncedWindowDelta(announced_window_delta_); 371 } 372 373 // Track an update to the incoming flow control counters - that is how many 374 // tokens we report to our peer that we're willing to accept. 375 // Instantiators *must* call MakeAction before destruction of this value. 376 class IncomingUpdateContext { 377 public: IncomingUpdateContext(StreamFlowControl * sfc)378 explicit IncomingUpdateContext(StreamFlowControl* sfc) 379 : tfc_upd_(sfc->tfc_), sfc_(sfc) {} 380 MakeAction()381 FlowControlAction MakeAction() { 382 return sfc_->UpdateAction(tfc_upd_.MakeAction()); 383 } 384 385 // we have received data from the wire 386 absl::Status RecvData(int64_t incoming_frame_size); 387 388 // the application is asking for a certain amount of bytes SetMinProgressSize(int64_t min_progress_size)389 void SetMinProgressSize(int64_t min_progress_size) { 390 sfc_->min_progress_size_ = min_progress_size; 391 } 392 393 void SetPendingSize(int64_t pending_size); 394 395 private: 396 TransportFlowControl::IncomingUpdateContext tfc_upd_; 397 StreamFlowControl* const sfc_; 398 }; 399 400 // Track an update to the outgoing flow control counters - that is how many 401 // tokens our peer has said we can send. 402 class OutgoingUpdateContext { 403 public: OutgoingUpdateContext(StreamFlowControl * sfc)404 explicit OutgoingUpdateContext(StreamFlowControl* sfc) 405 : tfc_upd_(sfc->tfc_), sfc_(sfc) {} 406 // we have received a WINDOW_UPDATE frame for a stream RecvUpdate(uint32_t size)407 void RecvUpdate(uint32_t size) { sfc_->remote_window_delta_ += size; } 408 // we have sent data on the wire, we must track this in our bookkeeping for 409 // the remote peer's flow control. SentData(int64_t outgoing_frame_size)410 void SentData(int64_t outgoing_frame_size) { 411 tfc_upd_.StreamSentData(outgoing_frame_size); 412 sfc_->remote_window_delta_ -= outgoing_frame_size; 413 } 414 415 private: 416 TransportFlowControl::OutgoingUpdateContext tfc_upd_; 417 StreamFlowControl* const sfc_; 418 }; 419 420 // returns an announce if we should send a stream update to our peer, else 421 // returns zero 422 uint32_t DesiredAnnounceSize() const; 423 // notify that we've actually sent a stream window update 424 // (should be DesiredAnnounceSize()) 425 void SentUpdate(uint32_t announce); 426 427 // Older API: combines getting the DesiredAnnounceSize() with SentUpdate() MaybeSendUpdate()428 uint32_t MaybeSendUpdate() { 429 uint32_t n = DesiredAnnounceSize(); 430 SentUpdate(n); 431 return n; 432 } 433 remote_window_delta()434 int64_t remote_window_delta() const { return remote_window_delta_; } announced_window_delta()435 int64_t announced_window_delta() const { return announced_window_delta_; } min_progress_size()436 int64_t min_progress_size() const { return min_progress_size_; } 437 438 // A snapshot of the flow control stats to export. 439 struct Stats { 440 int64_t min_progress_size; 441 int64_t remote_window_delta; 442 int64_t announced_window_delta; 443 absl::optional<int64_t> pending_size; 444 445 std::string ToString() const; 446 }; 447 stats()448 Stats stats() const { 449 Stats stats; 450 stats.min_progress_size = min_progress_size(); 451 stats.remote_window_delta = remote_window_delta(); 452 stats.announced_window_delta = announced_window_delta(); 453 stats.pending_size = pending_size_; 454 return stats; 455 } 456 457 private: 458 TransportFlowControl* const tfc_; 459 int64_t min_progress_size_ = 0; 460 int64_t remote_window_delta_ = 0; 461 int64_t announced_window_delta_ = 0; 462 absl::optional<int64_t> pending_size_; 463 464 FlowControlAction UpdateAction(FlowControlAction action); 465 }; 466 467 class TestOnlyTransportTargetWindowEstimatesMocker { 468 public: ~TestOnlyTransportTargetWindowEstimatesMocker()469 virtual ~TestOnlyTransportTargetWindowEstimatesMocker() {} 470 virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( 471 double current_target) = 0; 472 }; 473 474 extern TestOnlyTransportTargetWindowEstimatesMocker* 475 g_test_only_transport_target_window_estimates_mocker; 476 477 } // namespace chttp2 478 } // namespace grpc_core 479 480 #endif // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 481