1 /* 2 * 3 * Copyright 2017 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <stdint.h> 25 26 #include "src/core/ext/transport/chttp2/transport/http2_settings.h" 27 #include "src/core/lib/gpr/useful.h" 28 #include "src/core/lib/gprpp/abstract.h" 29 #include "src/core/lib/gprpp/manual_constructor.h" 30 #include "src/core/lib/transport/bdp_estimator.h" 31 #include "src/core/lib/transport/pid_controller.h" 32 33 struct grpc_chttp2_transport; 34 struct grpc_chttp2_stream; 35 36 extern grpc_core::TraceFlag grpc_flowctl_trace; 37 38 namespace grpc { 39 namespace testing { 40 class TrickledCHTTP2; // to make this a friend 41 } // namespace testing 42 } // namespace grpc 43 44 namespace grpc_core { 45 namespace chttp2 { 46 47 static constexpr uint32_t kDefaultWindow = 65535; 48 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1); 49 // TODO(ncteisen): Tune this 50 static constexpr uint32_t kFrameSize = 1024 * 1024; 51 52 class TransportFlowControl; 53 class StreamFlowControl; 54 55 // Encapsulates a collections of actions the transport needs to take with 56 // regard to flow control. Each action comes with urgencies that tell the 57 // transport how quickly the action must take place. 58 class FlowControlAction { 59 public: 60 enum class Urgency : uint8_t { 61 // Nothing to be done. 62 NO_ACTION_NEEDED = 0, 63 // Initiate a write to update the initial window immediately. 64 UPDATE_IMMEDIATELY, 65 // Push the flow control update into a send buffer, to be sent 66 // out the next time a write is initiated. 67 QUEUE_UPDATE, 68 }; 69 send_stream_update()70 Urgency send_stream_update() const { return send_stream_update_; } send_transport_update()71 Urgency send_transport_update() const { return send_transport_update_; } send_initial_window_update()72 Urgency send_initial_window_update() const { 73 return send_initial_window_update_; 74 } send_max_frame_size_update()75 Urgency send_max_frame_size_update() const { 76 return send_max_frame_size_update_; 77 } initial_window_size()78 uint32_t initial_window_size() const { return initial_window_size_; } max_frame_size()79 uint32_t max_frame_size() const { return max_frame_size_; } 80 set_send_stream_update(Urgency u)81 FlowControlAction& set_send_stream_update(Urgency u) { 82 send_stream_update_ = u; 83 return *this; 84 } set_send_transport_update(Urgency u)85 FlowControlAction& set_send_transport_update(Urgency u) { 86 send_transport_update_ = u; 87 return *this; 88 } set_send_initial_window_update(Urgency u,uint32_t update)89 FlowControlAction& set_send_initial_window_update(Urgency u, 90 uint32_t update) { 91 send_initial_window_update_ = u; 92 initial_window_size_ = update; 93 return *this; 94 } set_send_max_frame_size_update(Urgency u,uint32_t update)95 FlowControlAction& set_send_max_frame_size_update(Urgency u, 96 uint32_t update) { 97 send_max_frame_size_update_ = u; 98 max_frame_size_ = update; 99 return *this; 100 } 101 102 static const char* UrgencyString(Urgency u); 103 void Trace(grpc_chttp2_transport* t) const; 104 105 private: 106 Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED; 107 Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED; 108 Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED; 109 Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED; 110 uint32_t initial_window_size_ = 0; 111 uint32_t max_frame_size_ = 0; 112 }; 113 114 class FlowControlTrace { 115 public: FlowControlTrace(const char * reason,TransportFlowControl * tfc,StreamFlowControl * sfc)116 FlowControlTrace(const char* reason, TransportFlowControl* tfc, 117 StreamFlowControl* sfc) { 118 if (enabled_) Init(reason, tfc, sfc); 119 } 120 ~FlowControlTrace()121 ~FlowControlTrace() { 122 if (enabled_) Finish(); 123 } 124 125 private: 126 void Init(const char* reason, TransportFlowControl* tfc, 127 StreamFlowControl* sfc); 128 void Finish(); 129 130 const bool enabled_ = grpc_flowctl_trace.enabled(); 131 132 TransportFlowControl* tfc_; 133 StreamFlowControl* sfc_; 134 const char* reason_; 135 int64_t remote_window_; 136 int64_t target_window_; 137 int64_t announced_window_; 138 int64_t remote_window_delta_; 139 int64_t local_window_delta_; 140 int64_t announced_window_delta_; 141 }; 142 143 // Fat interface with all methods a flow control implementation needs to 144 // support. gRPC C Core does not support pure virtual functions, so instead 145 // we abort in any methods which require implementation in the base class. 146 class TransportFlowControlBase { 147 public: TransportFlowControlBase()148 TransportFlowControlBase() {} ~TransportFlowControlBase()149 virtual ~TransportFlowControlBase() {} 150 151 // Is flow control enabled? This is needed in other codepaths like the checks 152 // in parsing and in writing. flow_control_enabled()153 virtual bool flow_control_enabled() const { abort(); } 154 155 // Called to check if the transport needs to send a WINDOW_UPDATE frame MaybeSendUpdate(bool writing_anyway)156 virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); } 157 158 // Using the protected members, returns and Action to be taken by the 159 // tranport. MakeAction()160 virtual FlowControlAction MakeAction() { abort(); } 161 162 // Using the protected members, returns and Action to be taken by the 163 // tranport. Also checks for updates to our BDP estimate and acts 164 // accordingly. PeriodicUpdate()165 virtual FlowControlAction PeriodicUpdate() { abort(); } 166 167 // Called to do bookkeeping when a stream owned by this transport sends 168 // data on the wire StreamSentData(int64_t size)169 virtual void StreamSentData(int64_t size) { abort(); } 170 171 // Called to do bookkeeping when a stream owned by this transport receives 172 // data from the wire. Also does error checking for frame size. RecvData(int64_t incoming_frame_size)173 virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } 174 175 // Called to do bookkeeping when we receive a WINDOW_UPDATE frame. RecvUpdate(uint32_t size)176 virtual void RecvUpdate(uint32_t size) { abort(); } 177 178 // Returns the BdpEstimator held by this object. Caller is responsible for 179 // checking for nullptr. TODO(ncteisen): consider fully encapsulating all 180 // bdp estimator actions inside TransportFlowControl bdp_estimator()181 virtual BdpEstimator* bdp_estimator() { return nullptr; } 182 183 // Getters remote_window()184 int64_t remote_window() const { return remote_window_; } target_window()185 virtual int64_t target_window() const { return target_initial_window_size_; } announced_window()186 int64_t announced_window() const { return announced_window_; } 187 188 // Used in certain benchmarks in which we don't want FlowControl to be a 189 // factor TestOnlyForceHugeWindow()190 virtual void TestOnlyForceHugeWindow() {} 191 192 GRPC_ABSTRACT_BASE_CLASS 193 194 protected: 195 friend class ::grpc::testing::TrickledCHTTP2; 196 int64_t remote_window_ = kDefaultWindow; 197 int64_t target_initial_window_size_ = kDefaultWindow; 198 int64_t announced_window_ = kDefaultWindow; 199 }; 200 201 // Implementation of flow control that does NOTHING. Always returns maximum 202 // values, never initiates writes, and assumes that the remote peer is doing 203 // the same. To be used to narrow down on flow control as the cause of negative 204 // performance. 205 class TransportFlowControlDisabled final : public TransportFlowControlBase { 206 public: 207 // Maxes out all values 208 TransportFlowControlDisabled(grpc_chttp2_transport* t); 209 flow_control_enabled()210 bool flow_control_enabled() const override { return false; } 211 212 // Never do anything. MaybeSendUpdate(bool writing_anyway)213 uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; } MakeAction()214 FlowControlAction MakeAction() override { return FlowControlAction(); } PeriodicUpdate()215 FlowControlAction PeriodicUpdate() override { return FlowControlAction(); } StreamSentData(int64_t size)216 void StreamSentData(int64_t size) override {} RecvData(int64_t incoming_frame_size)217 grpc_error* RecvData(int64_t incoming_frame_size) override { 218 return GRPC_ERROR_NONE; 219 } RecvUpdate(uint32_t size)220 void RecvUpdate(uint32_t size) override {} 221 }; 222 223 // Implementation of flow control that abides to HTTP/2 spec and attempts 224 // to be as performant as possible. 225 class TransportFlowControl final : public TransportFlowControlBase { 226 public: 227 TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe); ~TransportFlowControl()228 ~TransportFlowControl() {} 229 flow_control_enabled()230 bool flow_control_enabled() const override { return true; } 231 bdp_probe()232 bool bdp_probe() const { return enable_bdp_probe_; } 233 234 // returns an announce if we should send a transport update to our peer, 235 // else returns zero; writing_anyway indicates if a write would happen 236 // regardless of the send - if it is false and this function returns non-zero, 237 // this announce will cause a write to occur 238 uint32_t MaybeSendUpdate(bool writing_anyway) override; 239 240 // Reads the flow control data and returns and actionable struct that will 241 // tell chttp2 exactly what it needs to do MakeAction()242 FlowControlAction MakeAction() override { 243 return UpdateAction(FlowControlAction()); 244 } 245 246 // Call periodically (at a low-ish rate, 100ms - 10s makes sense) 247 // to perform more complex flow control calculations and return an action 248 // to let chttp2 change its parameters 249 FlowControlAction PeriodicUpdate() override; 250 StreamSentData(int64_t size)251 void StreamSentData(int64_t size) override { remote_window_ -= size; } 252 253 grpc_error* ValidateRecvData(int64_t incoming_frame_size); CommitRecvData(int64_t incoming_frame_size)254 void CommitRecvData(int64_t incoming_frame_size) { 255 announced_window_ -= incoming_frame_size; 256 } 257 RecvData(int64_t incoming_frame_size)258 grpc_error* RecvData(int64_t incoming_frame_size) override { 259 FlowControlTrace trace(" data recv", this, nullptr); 260 grpc_error* error = ValidateRecvData(incoming_frame_size); 261 if (error != GRPC_ERROR_NONE) return error; 262 CommitRecvData(incoming_frame_size); 263 return GRPC_ERROR_NONE; 264 } 265 266 // we have received a WINDOW_UPDATE frame for a transport RecvUpdate(uint32_t size)267 void RecvUpdate(uint32_t size) override { 268 FlowControlTrace trace("t updt recv", this, nullptr); 269 remote_window_ += size; 270 } 271 272 // See comment above announced_stream_total_over_incoming_window_ for the 273 // logic behind this decision. target_window()274 int64_t target_window() const override { 275 return static_cast<uint32_t> GPR_MIN( 276 (int64_t)((1u << 31) - 1), 277 announced_stream_total_over_incoming_window_ + 278 target_initial_window_size_); 279 } 280 transport()281 const grpc_chttp2_transport* transport() const { return t_; } 282 PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)283 void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { 284 if (delta > 0) { 285 announced_stream_total_over_incoming_window_ -= delta; 286 } else { 287 announced_stream_total_under_incoming_window_ += -delta; 288 } 289 } 290 PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)291 void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) { 292 if (delta > 0) { 293 announced_stream_total_over_incoming_window_ += delta; 294 } else { 295 announced_stream_total_under_incoming_window_ -= -delta; 296 } 297 } 298 bdp_estimator()299 BdpEstimator* bdp_estimator() override { return &bdp_estimator_; } 300 TestOnlyForceHugeWindow()301 void TestOnlyForceHugeWindow() override { 302 announced_window_ = 1024 * 1024 * 1024; 303 remote_window_ = 1024 * 1024 * 1024; 304 } 305 306 private: 307 double TargetLogBdp(); 308 double SmoothLogBdp(double value); 309 FlowControlAction::Urgency DeltaUrgency(int64_t value, 310 grpc_chttp2_setting_id setting_id); 311 UpdateAction(FlowControlAction action)312 FlowControlAction UpdateAction(FlowControlAction action) { 313 if (announced_window_ < target_window() / 2) { 314 action.set_send_transport_update( 315 FlowControlAction::Urgency::UPDATE_IMMEDIATELY); 316 } 317 return action; 318 } 319 320 const grpc_chttp2_transport* const t_; 321 322 /** calculating what we should give for local window: 323 we track the total amount of flow control over initial window size 324 across all streams: this is data that we want to receive right now (it 325 has an outstanding read) 326 and the total amount of flow control under initial window size across all 327 streams: this is data we've read early 328 we want to adjust incoming_window such that: 329 incoming_window = total_over - max(bdp - total_under, 0) */ 330 int64_t announced_stream_total_over_incoming_window_ = 0; 331 int64_t announced_stream_total_under_incoming_window_ = 0; 332 333 /** should we probe bdp? */ 334 const bool enable_bdp_probe_; 335 336 /* bdp estimation */ 337 grpc_core::BdpEstimator bdp_estimator_; 338 339 /* pid controller */ 340 grpc_core::PidController pid_controller_; 341 grpc_millis last_pid_update_ = 0; 342 }; 343 344 // Fat interface with all methods a stream flow control implementation needs 345 // to support. gRPC C Core does not support pure virtual functions, so instead 346 // we abort in any methods which require implementation in the base class. 347 class StreamFlowControlBase { 348 public: StreamFlowControlBase()349 StreamFlowControlBase() {} ~StreamFlowControlBase()350 virtual ~StreamFlowControlBase() {} 351 352 // Updates an action using the protected members. UpdateAction(FlowControlAction action)353 virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); } 354 355 // Using the protected members, returns an Action for this stream to be 356 // taken by the tranport. MakeAction()357 virtual FlowControlAction MakeAction() { abort(); } 358 359 // Bookkeeping for when data is sent on this stream. SentData(int64_t outgoing_frame_size)360 virtual void SentData(int64_t outgoing_frame_size) { abort(); } 361 362 // Bookkeeping and error checking for when data is received by this stream. RecvData(int64_t incoming_frame_size)363 virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); } 364 365 // Called to check if this stream needs to send a WINDOW_UPDATE frame. MaybeSendUpdate()366 virtual uint32_t MaybeSendUpdate() { abort(); } 367 368 // Bookkeeping for receiving a WINDOW_UPDATE from for this stream. RecvUpdate(uint32_t size)369 virtual void RecvUpdate(uint32_t size) { abort(); } 370 371 // Bookkeeping for when a call pulls bytes out of the transport. At this 372 // point we consider the data 'used' and can thus let out peer know we are 373 // ready for more data. IncomingByteStreamUpdate(size_t max_size_hint,size_t have_already)374 virtual void IncomingByteStreamUpdate(size_t max_size_hint, 375 size_t have_already) { 376 abort(); 377 } 378 379 // Used in certain benchmarks in which we don't want FlowControl to be a 380 // factor TestOnlyForceHugeWindow()381 virtual void TestOnlyForceHugeWindow() {} 382 383 // Getters remote_window_delta()384 int64_t remote_window_delta() { return remote_window_delta_; } local_window_delta()385 int64_t local_window_delta() { return local_window_delta_; } announced_window_delta()386 int64_t announced_window_delta() { return announced_window_delta_; } 387 388 GRPC_ABSTRACT_BASE_CLASS 389 390 protected: 391 friend class ::grpc::testing::TrickledCHTTP2; 392 int64_t remote_window_delta_ = 0; 393 int64_t local_window_delta_ = 0; 394 int64_t announced_window_delta_ = 0; 395 }; 396 397 // Implementation of flow control that does NOTHING. Always returns maximum 398 // values, never initiates writes, and assumes that the remote peer is doing 399 // the same. To be used to narrow down on flow control as the cause of negative 400 // performance. 401 class StreamFlowControlDisabled : public StreamFlowControlBase { 402 public: UpdateAction(FlowControlAction action)403 FlowControlAction UpdateAction(FlowControlAction action) override { 404 return action; 405 } MakeAction()406 FlowControlAction MakeAction() override { return FlowControlAction(); } SentData(int64_t outgoing_frame_size)407 void SentData(int64_t outgoing_frame_size) override {} RecvData(int64_t incoming_frame_size)408 grpc_error* RecvData(int64_t incoming_frame_size) override { 409 return GRPC_ERROR_NONE; 410 } MaybeSendUpdate()411 uint32_t MaybeSendUpdate() override { return 0; } RecvUpdate(uint32_t size)412 void RecvUpdate(uint32_t size) override {} IncomingByteStreamUpdate(size_t max_size_hint,size_t have_already)413 void IncomingByteStreamUpdate(size_t max_size_hint, 414 size_t have_already) override {} 415 }; 416 417 // Implementation of flow control that abides to HTTP/2 spec and attempts 418 // to be as performant as possible. 419 class StreamFlowControl final : public StreamFlowControlBase { 420 public: 421 StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s); ~StreamFlowControl()422 ~StreamFlowControl() { 423 tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 424 } 425 426 FlowControlAction UpdateAction(FlowControlAction action) override; MakeAction()427 FlowControlAction MakeAction() override { 428 return UpdateAction(tfc_->MakeAction()); 429 } 430 431 // we have sent data on the wire, we must track this in our bookkeeping for 432 // the remote peer's flow control. SentData(int64_t outgoing_frame_size)433 void SentData(int64_t outgoing_frame_size) override { 434 FlowControlTrace tracer(" data sent", tfc_, this); 435 tfc_->StreamSentData(outgoing_frame_size); 436 remote_window_delta_ -= outgoing_frame_size; 437 } 438 439 // we have received data from the wire 440 grpc_error* RecvData(int64_t incoming_frame_size) override; 441 442 // returns an announce if we should send a stream update to our peer, else 443 // returns zero 444 uint32_t MaybeSendUpdate() override; 445 446 // we have received a WINDOW_UPDATE frame for a stream RecvUpdate(uint32_t size)447 void RecvUpdate(uint32_t size) override { 448 FlowControlTrace trace("s updt recv", tfc_, this); 449 remote_window_delta_ += size; 450 } 451 452 // the application is asking for a certain amount of bytes 453 void IncomingByteStreamUpdate(size_t max_size_hint, 454 size_t have_already) override; 455 remote_window_delta()456 int64_t remote_window_delta() const { return remote_window_delta_; } local_window_delta()457 int64_t local_window_delta() const { return local_window_delta_; } announced_window_delta()458 int64_t announced_window_delta() const { return announced_window_delta_; } 459 stream()460 const grpc_chttp2_stream* stream() const { return s_; } 461 TestOnlyForceHugeWindow()462 void TestOnlyForceHugeWindow() override { 463 announced_window_delta_ = 1024 * 1024 * 1024; 464 local_window_delta_ = 1024 * 1024 * 1024; 465 remote_window_delta_ = 1024 * 1024 * 1024; 466 } 467 468 private: 469 TransportFlowControl* const tfc_; 470 const grpc_chttp2_stream* const s_; 471 UpdateAnnouncedWindowDelta(TransportFlowControl * tfc,int64_t change)472 void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) { 473 tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 474 announced_window_delta_ += change; 475 tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_); 476 } 477 }; 478 479 } // namespace chttp2 480 } // namespace grpc_core 481 482 #endif 483