1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <stdint.h> 25 26 #include "src/core/ext/transport/chttp2/transport/http2_settings.h" 27 #include "src/core/lib/gpr/useful.h" 28 #include "src/core/lib/gprpp/manual_constructor.h" 29 #include "src/core/lib/transport/bdp_estimator.h" 30 #include "src/core/lib/transport/pid_controller.h" 31 32 struct grpc_chttp2_transport; 33 struct grpc_chttp2_stream; 34 35 extern grpc_core::TraceFlag grpc_flowctl_trace; 36 37 namespace grpc { 38 namespace testing { 39 class TrickledCHTTP2; // to make this a friend 40 } // namespace testing 41 } // namespace grpc 42 43 namespace grpc_core { 44 namespace chttp2 { 45 46 static constexpr uint32_t kDefaultWindow = 65535; 47 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1); 48 // TODO(ncteisen): Tune this 49 static constexpr uint32_t kFrameSize = 1024 * 1024; 50 51 class TransportFlowControl; 52 class StreamFlowControl; 53 54 // Encapsulates a collections of actions the transport needs to take with 55 // regard to flow control. Each action comes with urgencies that tell the 56 // transport how quickly the action must take place. 57 class FlowControlAction { 58 public: 59 enum class Urgency : uint8_t { 60 // Nothing to be done. 61 NO_ACTION_NEEDED = 0, 62 // Initiate a write to update the initial window immediately. 63 UPDATE_IMMEDIATELY, 64 // Push the flow control update into a send buffer, to be sent 65 // out the next time a write is initiated. 66 QUEUE_UPDATE, 67 }; 68 send_stream_update()69 Urgency send_stream_update() const { return send_stream_update_; } send_transport_update()70 Urgency send_transport_update() const { return send_transport_update_; } send_initial_window_update()71 Urgency send_initial_window_update() const { 72 return send_initial_window_update_; 73 } send_max_frame_size_update()74 Urgency send_max_frame_size_update() const { 75 return send_max_frame_size_update_; 76 } initial_window_size()77 uint32_t initial_window_size() const { return initial_window_size_; } max_frame_size()78 uint32_t max_frame_size() const { return max_frame_size_; } 79 set_send_stream_update(Urgency u)80 FlowControlAction& set_send_stream_update(Urgency u) { 81 send_stream_update_ = u; 82 return *this; 83 } set_send_transport_update(Urgency u)84 FlowControlAction& set_send_transport_update(Urgency u) { 85 send_transport_update_ = u; 86 return *this; 87 } set_send_initial_window_update(Urgency u,uint32_t update)88 FlowControlAction& set_send_initial_window_update(Urgency u, 89 uint32_t update) { 90 send_initial_window_update_ = u; 91 initial_window_size_ = update; 92 return *this; 93 } set_send_max_frame_size_update(Urgency u,uint32_t update)94 FlowControlAction& set_send_max_frame_size_update(Urgency u, 95 uint32_t update) { 96 send_max_frame_size_update_ = u; 97 max_frame_size_ = update; 98 return *this; 99 } 100 101 static const char* UrgencyString(Urgency u); 102 void Trace(grpc_chttp2_transport* t) const; 103 104 private: 105 Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED; 106 Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED; 107 Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED; 108 Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED; 109 uint32_t initial_window_size_ = 0; 110 uint32_t max_frame_size_ = 0; 111 }; 112 113 class FlowControlTrace { 114 public: FlowControlTrace(const char * reason,TransportFlowControl * tfc,StreamFlowControl * sfc)115 FlowControlTrace(const char* reason, TransportFlowControl* tfc, 116 StreamFlowControl* sfc) { 117 if (enabled_) Init(reason, tfc, sfc); 118 } 119 ~FlowControlTrace()120 ~FlowControlTrace() { 121 if (enabled_) Finish(); 122 } 123 124 private: 125 void Init(const char* reason, TransportFlowControl* tfc, 126 StreamFlowControl* sfc); 127 void Finish(); 128 129 const bool enabled_ = GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace); 130 131 TransportFlowControl* tfc_; 132 StreamFlowControl* sfc_; 133 const char* reason_; 134 int64_t remote_window_; 135 int64_t target_window_; 136 int64_t announced_window_; 137 int64_t remote_window_delta_; 138 int64_t local_window_delta_; 139 int64_t announced_window_delta_; 140 }; 141 142 // Fat interface with all methods a flow control implementation needs to 143 // support. 144 class TransportFlowControlBase { 145 public: TransportFlowControlBase()146 TransportFlowControlBase() {} ~TransportFlowControlBase()147 virtual ~TransportFlowControlBase() {} 148 149 // Is flow control enabled? This is needed in other codepaths like the checks 150 // in parsing and in writing. 151 virtual bool flow_control_enabled() const = 0; 152 153 // Called to check if the transport needs to send a WINDOW_UPDATE frame 154 virtual uint32_t MaybeSendUpdate(bool /* writing_anyway */) = 0; 155 156 // Using the protected members, returns and Action to be taken by the 157 // tranport. 158 virtual FlowControlAction MakeAction() = 0; 159 160 // Using the protected members, returns and Action to be taken by the 161 // tranport. Also checks for updates to our BDP estimate and acts 162 // accordingly. 163 virtual FlowControlAction PeriodicUpdate() = 0; 164 165 // Called to do bookkeeping when a stream owned by this transport sends 166 // data on the wire 167 virtual void StreamSentData(int64_t /* size */) = 0; 168 169 // Called to do bookkeeping when a stream owned by this transport receives 170 // data from the wire. Also does error checking for frame size. 171 virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) = 0; 172 173 // Called to do bookkeeping when we receive a WINDOW_UPDATE frame. 174 virtual void RecvUpdate(uint32_t /* size */) = 0; 175 176 // Returns the BdpEstimator held by this object. Caller is responsible for 177 // checking for nullptr. TODO(ncteisen): consider fully encapsulating all 178 // bdp estimator actions inside TransportFlowControl bdp_estimator()179 virtual BdpEstimator* bdp_estimator() { return nullptr; } 180 181 // Getters remote_window()182 int64_t remote_window() const { return remote_window_; } target_window()183 virtual int64_t target_window() const { return target_initial_window_size_; } announced_window()184 int64_t announced_window() const { return announced_window_; } 185 186 // Used in certain benchmarks in which we don't want FlowControl to be a 187 // factor TestOnlyForceHugeWindow()188 virtual void TestOnlyForceHugeWindow() {} 189 190 protected: 191 friend class ::grpc::testing::TrickledCHTTP2; 192 int64_t remote_window_ = kDefaultWindow; 193 int64_t target_initial_window_size_ = kDefaultWindow; 194 int64_t announced_window_ = kDefaultWindow; 195 }; 196 197 // Implementation of flow control that does NOTHING. Always returns maximum 198 // values, never initiates writes, and assumes that the remote peer is doing 199 // the same. To be used to narrow down on flow control as the cause of negative 200 // performance. 201 class TransportFlowControlDisabled final : public TransportFlowControlBase { 202 public: 203 // Maxes out all values 204 explicit TransportFlowControlDisabled(grpc_chttp2_transport* t); 205 flow_control_enabled()206 bool flow_control_enabled() const override { return false; } 207 208 // Never do anything. MaybeSendUpdate(bool)209 uint32_t MaybeSendUpdate(bool /* writing_anyway */) override { return 0; } MakeAction()210 FlowControlAction MakeAction() override { return FlowControlAction(); } PeriodicUpdate()211 FlowControlAction PeriodicUpdate() override { return FlowControlAction(); } StreamSentData(int64_t)212 void StreamSentData(int64_t /* size */) override {} RecvData(int64_t)213 grpc_error* RecvData(int64_t /* incoming_frame_size */) override { 214 return GRPC_ERROR_NONE; 215 } RecvUpdate(uint32_t)216 void RecvUpdate(uint32_t /* size */) override {} 217 }; 218 219 // Implementation of flow control that abides to HTTP/2 spec and attempts 220 // to be as performant as possible. 221 class TransportFlowControl final : public TransportFlowControlBase { 222 public: 223 TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe); ~TransportFlowControl()224 ~TransportFlowControl() override {} 225 flow_control_enabled()226 bool flow_control_enabled() const override { return true; } 227 bdp_probe()228 bool bdp_probe() const { return enable_bdp_probe_; } 229 230 // returns an announce if we should send a transport update to our peer, 231 // else returns zero; writing_anyway indicates if a write would happen 232 // regardless of the send - if it is false and this function returns non-zero, 233 // this announce will cause a write to occur 234 uint32_t MaybeSendUpdate(bool writing_anyway) override; 235 236 // Reads the flow control data and returns and actionable struct that will 237 // tell chttp2 exactly what it needs to do MakeAction()238 FlowControlAction MakeAction() override { 239 return UpdateAction(FlowControlAction()); 240 } 241 242 // Call periodically (at a low-ish rate, 100ms - 10s makes sense) 243 // to perform more complex flow control calculations and return an action 244 // to let chttp2 change its parameters 245 FlowControlAction PeriodicUpdate() override; 246 StreamSentData(int64_t size)247 void StreamSentData(int64_t size) override { remote_window_ -= size; } 248 249 grpc_error* ValidateRecvData(int64_t incoming_frame_size); CommitRecvData(int64_t incoming_frame_size)250 void CommitRecvData(int64_t incoming_frame_size) { 251 announced_window_ -= incoming_frame_size; 252 } 253 RecvData(int64_t incoming_frame_size)254 grpc_error* RecvData(int64_t incoming_frame_size) override { 255 FlowControlTrace trace(" data recv", this, nullptr); 256 grpc_error* error = ValidateRecvData(incoming_frame_size); 257 if (error != GRPC_ERROR_NONE) return error; 258 CommitRecvData(incoming_frame_size); 259 return GRPC_ERROR_NONE; 260 } 261 262 // we have received a WINDOW_UPDATE frame for a transport RecvUpdate(uint32_t size)263 void RecvUpdate(uint32_t size) override { 264 FlowControlTrace trace("t updt recv", this, nullptr); 265 remote_window_ += size; 266 } 267 268 // See comment above announced_stream_total_over_incoming_window_ for the 269 // logic behind this decision. target_window()270 int64_t target_window() const override { 271 return static_cast<uint32_t> GPR_MIN( 272 (int64_t)((1u << 31) - 1), 273 announced_stream_total_over_incoming_window_ + 274 target_initial_window_size_); 275 } 276 transport()277 const grpc_chttp2_transport* transport() const { return t_; } 278 PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)279 void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { 280 if (delta > 0) { 281 announced_stream_total_over_incoming_window_ -= delta; 282 } 283 } 284 PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)285 void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { 286 if (delta > 0) { 287 announced_stream_total_over_incoming_window_ += delta; 288 } 289 } 290 bdp_estimator()291 BdpEstimator* bdp_estimator() override { return &bdp_estimator_; } 292 TestOnlyForceHugeWindow()293 void TestOnlyForceHugeWindow() override { 294 announced_window_ = 1024 * 1024 * 1024; 295 remote_window_ = 1024 * 1024 * 1024; 296 } 297 298 private: 299 double TargetLogBdp(); 300 double SmoothLogBdp(double value); 301 FlowControlAction::Urgency DeltaUrgency(int64_t value, 302 grpc_chttp2_setting_id setting_id); 303 UpdateAction(FlowControlAction action)304 FlowControlAction UpdateAction(FlowControlAction action) { 305 if (announced_window_ < target_window() / 2) { 306 action.set_send_transport_update( 307 FlowControlAction::Urgency::UPDATE_IMMEDIATELY); 308 } 309 return action; 310 } 311 312 const grpc_chttp2_transport* const t_; 313 314 /** calculating what we should give for local window: 315 we track the total amount of flow control over initial window size 316 across all streams: this is data that we want to receive right now (it 317 has an outstanding read) 318 and the total amount of flow control under initial window size across all 319 streams: this is data we've read early 320 we want to adjust incoming_window such that: 321 incoming_window = total_over - max(bdp - total_under, 0) */ 322 int64_t announced_stream_total_over_incoming_window_ = 0; 323 324 /** should we probe bdp? */ 325 const bool enable_bdp_probe_; 326 327 /* bdp estimation */ 328 grpc_core::BdpEstimator bdp_estimator_; 329 330 /* pid controller */ 331 grpc_core::PidController pid_controller_; 332 grpc_millis last_pid_update_ = 0; 333 }; 334 335 // Fat interface with all methods a stream flow control implementation needs 336 // to support. 337 class StreamFlowControlBase { 338 public: StreamFlowControlBase()339 StreamFlowControlBase() {} ~StreamFlowControlBase()340 virtual ~StreamFlowControlBase() {} 341 342 // Updates an action using the protected members. UpdateAction(FlowControlAction)343 virtual FlowControlAction UpdateAction(FlowControlAction /* action */) { 344 abort(); 345 } 346 347 // Using the protected members, returns an Action for this stream to be 348 // taken by the tranport. 349 virtual FlowControlAction MakeAction() = 0; 350 351 // Bookkeeping for when data is sent on this stream. 352 virtual void SentData(int64_t /* outgoing_frame_size */) = 0; 353 354 // Bookkeeping and error checking for when data is received by this stream. 355 virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) = 0; 356 357 // Called to check if this stream needs to send a WINDOW_UPDATE frame. 358 virtual uint32_t MaybeSendUpdate() = 0; 359 360 // Bookkeeping for receiving a WINDOW_UPDATE from for this stream. 361 virtual void RecvUpdate(uint32_t /* size */) = 0; 362 363 // Bookkeeping for when a call pulls bytes out of the transport. At this 364 // point we consider the data 'used' and can thus let out peer know we are 365 // ready for more data. IncomingByteStreamUpdate(size_t,size_t)366 virtual void IncomingByteStreamUpdate(size_t /* max_size_hint */, 367 size_t /* have_already */) { 368 abort(); 369 } 370 371 // Used in certain benchmarks in which we don't want FlowControl to be a 372 // factor TestOnlyForceHugeWindow()373 virtual void TestOnlyForceHugeWindow() {} 374 375 // Getters remote_window_delta()376 int64_t remote_window_delta() { return remote_window_delta_; } local_window_delta()377 int64_t local_window_delta() { return local_window_delta_; } announced_window_delta()378 int64_t announced_window_delta() { return announced_window_delta_; } 379 380 protected: 381 friend class ::grpc::testing::TrickledCHTTP2; 382 int64_t remote_window_delta_ = 0; 383 int64_t local_window_delta_ = 0; 384 int64_t announced_window_delta_ = 0; 385 }; 386 387 // Implementation of flow control that does NOTHING. Always returns maximum 388 // values, never initiates writes, and assumes that the remote peer is doing 389 // the same. To be used to narrow down on flow control as the cause of negative 390 // performance. 391 class StreamFlowControlDisabled : public StreamFlowControlBase { 392 public: UpdateAction(FlowControlAction action)393 FlowControlAction UpdateAction(FlowControlAction action) override { 394 return action; 395 } MakeAction()396 FlowControlAction MakeAction() override { return FlowControlAction(); } SentData(int64_t)397 void SentData(int64_t /* outgoing_frame_size */) override {} RecvData(int64_t)398 grpc_error* RecvData(int64_t /* incoming_frame_size */) override { 399 return GRPC_ERROR_NONE; 400 } MaybeSendUpdate()401 uint32_t MaybeSendUpdate() override { return 0; } RecvUpdate(uint32_t)402 void RecvUpdate(uint32_t /* size */) override {} IncomingByteStreamUpdate(size_t,size_t)403 void IncomingByteStreamUpdate(size_t /* max_size_hint */, 404 size_t /* have_already */) override {} 405 }; 406 407 // Implementation of flow control that abides to HTTP/2 spec and attempts 408 // to be as performant as possible. 409 class StreamFlowControl final : public StreamFlowControlBase { 410 public: 411 StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); ~StreamFlowControl()412 ~StreamFlowControl() override { 413 tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 414 } 415 416 FlowControlAction UpdateAction(FlowControlAction action) override; MakeAction()417 FlowControlAction MakeAction() override { 418 return UpdateAction(tfc_->MakeAction()); 419 } 420 421 // we have sent data on the wire, we must track this in our bookkeeping for 422 // the remote peer's flow control. SentData(int64_t outgoing_frame_size)423 void SentData(int64_t outgoing_frame_size) override { 424 FlowControlTrace tracer(" data sent", tfc_, this); 425 tfc_->StreamSentData(outgoing_frame_size); 426 remote_window_delta_ -= outgoing_frame_size; 427 } 428 429 // we have received data from the wire 430 grpc_error* RecvData(int64_t incoming_frame_size) override; 431 432 // returns an announce if we should send a stream update to our peer, else 433 // returns zero 434 uint32_t MaybeSendUpdate() override; 435 436 // we have received a WINDOW_UPDATE frame for a stream RecvUpdate(uint32_t size)437 void RecvUpdate(uint32_t size) override { 438 FlowControlTrace trace("s updt recv", tfc_, this); 439 remote_window_delta_ += size; 440 } 441 442 // the application is asking for a certain amount of bytes 443 void IncomingByteStreamUpdate(size_t max_size_hint, 444 size_t have_already) override; 445 remote_window_delta()446 int64_t remote_window_delta() const { return remote_window_delta_; } local_window_delta()447 int64_t local_window_delta() const { return local_window_delta_; } announced_window_delta()448 int64_t announced_window_delta() const { return announced_window_delta_; } 449 stream()450 const grpc_chttp2_stream* stream() const { return s_; } 451 TestOnlyForceHugeWindow()452 void TestOnlyForceHugeWindow() override { 453 announced_window_delta_ = 1024 * 1024 * 1024; 454 local_window_delta_ = 1024 * 1024 * 1024; 455 remote_window_delta_ = 1024 * 1024 * 1024; 456 } 457 458 private: 459 TransportFlowControl* const tfc_; 460 const grpc_chttp2_stream* const s_; 461 UpdateAnnouncedWindowDelta(TransportFlowControl * tfc,int64_t change)462 void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) { 463 tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 464 announced_window_delta_ += change; 465 tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 466 } 467 }; 468 469 class TestOnlyTransportTargetWindowEstimatesMocker { 470 public: ~TestOnlyTransportTargetWindowEstimatesMocker()471 virtual ~TestOnlyTransportTargetWindowEstimatesMocker() {} 472 virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate( 473 double current_target) = 0; 474 }; 475 476 extern TestOnlyTransportTargetWindowEstimatesMocker* 477 g_test_only_transport_target_window_estimates_mocker; 478 479 } // namespace chttp2 480 } // namespace grpc_core 481 482 #endif 483