• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stdint.h>
25 
26 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/abstract.h"
29 #include "src/core/lib/gprpp/manual_constructor.h"
30 #include "src/core/lib/transport/bdp_estimator.h"
31 #include "src/core/lib/transport/pid_controller.h"
32 
33 struct grpc_chttp2_transport;
34 struct grpc_chttp2_stream;
35 
36 extern grpc_core::TraceFlag grpc_flowctl_trace;
37 
38 namespace grpc {
39 namespace testing {
40 class TrickledCHTTP2;  // to make this a friend
41 }  // namespace testing
42 }  // namespace grpc
43 
44 namespace grpc_core {
45 namespace chttp2 {
46 
// Initial value of the transport-level windows declared in this file
// (2^16 - 1).
static constexpr uint32_t kDefaultWindow = (1u << 16) - 1;
// Upper bound on a window (2^31 - 1), kept in a signed 64-bit type.
static constexpr int64_t kMaxWindow = (int64_t{1} << 31) - 1;
// TODO(ncteisen): Tune this
static constexpr uint32_t kFrameSize = 1u << 20;
51 
52 class TransportFlowControl;
53 class StreamFlowControl;
54 
// Describes the collection of flow-control steps the transport has to carry
// out. Every step is paired with an Urgency telling the transport how quickly
// it must happen; the two settings updates additionally carry the value to
// announce.
class FlowControlAction {
 public:
  enum class Urgency : uint8_t {
    // Nothing to be done.
    NO_ACTION_NEEDED = 0,
    // Initiate a write to update the initial window immediately.
    UPDATE_IMMEDIATELY = 1,
    // Push the flow control update into a send buffer, to be sent
    // out the next time a write is initiated.
    QUEUE_UPDATE = 2,
  };

  // Read accessors: one urgency per kind of update, plus the values that go
  // with the two settings updates.
  Urgency send_stream_update() const {
    return send_stream_update_;
  }
  Urgency send_transport_update() const {
    return send_transport_update_;
  }
  Urgency send_initial_window_update() const {
    return send_initial_window_update_;
  }
  Urgency send_max_frame_size_update() const {
    return send_max_frame_size_update_;
  }
  uint32_t initial_window_size() const {
    return initial_window_size_;
  }
  uint32_t max_frame_size() const {
    return max_frame_size_;
  }

  // Mutators. Each one returns *this so that calls can be chained.
  FlowControlAction& set_send_stream_update(Urgency u) {
    send_stream_update_ = u;
    return *this;
  }
  FlowControlAction& set_send_transport_update(Urgency u) {
    send_transport_update_ = u;
    return *this;
  }
  // Records both the urgency and the initial window size to announce.
  FlowControlAction& set_send_initial_window_update(Urgency u,
                                                    uint32_t update) {
    initial_window_size_ = update;
    send_initial_window_update_ = u;
    return *this;
  }
  // Records both the urgency and the max frame size to announce.
  FlowControlAction& set_send_max_frame_size_update(Urgency u,
                                                    uint32_t update) {
    max_frame_size_ = update;
    send_max_frame_size_update_ = u;
    return *this;
  }

  // Both are defined out of line.
  static const char* UrgencyString(Urgency u);
  // The elaborated type specifier names the transport struct that is
  // forward-declared at global scope.
  void Trace(struct grpc_chttp2_transport* t) const;

 private:
  // A default-constructed action requests nothing.
  Urgency send_stream_update_{Urgency::NO_ACTION_NEEDED};
  Urgency send_transport_update_{Urgency::NO_ACTION_NEEDED};
  Urgency send_initial_window_update_{Urgency::NO_ACTION_NEEDED};
  Urgency send_max_frame_size_update_{Urgency::NO_ACTION_NEEDED};
  uint32_t initial_window_size_{0};
  uint32_t max_frame_size_{0};
};
113 
114 class FlowControlTrace {
115  public:
FlowControlTrace(const char * reason,TransportFlowControl * tfc,StreamFlowControl * sfc)116   FlowControlTrace(const char* reason, TransportFlowControl* tfc,
117                    StreamFlowControl* sfc) {
118     if (enabled_) Init(reason, tfc, sfc);
119   }
120 
~FlowControlTrace()121   ~FlowControlTrace() {
122     if (enabled_) Finish();
123   }
124 
125  private:
126   void Init(const char* reason, TransportFlowControl* tfc,
127             StreamFlowControl* sfc);
128   void Finish();
129 
130   const bool enabled_ = grpc_flowctl_trace.enabled();
131 
132   TransportFlowControl* tfc_;
133   StreamFlowControl* sfc_;
134   const char* reason_;
135   int64_t remote_window_;
136   int64_t target_window_;
137   int64_t announced_window_;
138   int64_t remote_window_delta_;
139   int64_t local_window_delta_;
140   int64_t announced_window_delta_;
141 };
142 
143 // Fat interface with all methods a flow control implementation needs to
144 // support. gRPC C Core does not support pure virtual functions, so instead
145 // we abort in any methods which require implementation in the base class.
146 class TransportFlowControlBase {
147  public:
TransportFlowControlBase()148   TransportFlowControlBase() {}
~TransportFlowControlBase()149   virtual ~TransportFlowControlBase() {}
150 
151   // Is flow control enabled? This is needed in other codepaths like the checks
152   // in parsing and in writing.
flow_control_enabled()153   virtual bool flow_control_enabled() const { abort(); }
154 
155   // Called to check if the transport needs to send a WINDOW_UPDATE frame
MaybeSendUpdate(bool writing_anyway)156   virtual uint32_t MaybeSendUpdate(bool writing_anyway) { abort(); }
157 
158   // Using the protected members, returns and Action to be taken by the
159   // tranport.
MakeAction()160   virtual FlowControlAction MakeAction() { abort(); }
161 
162   // Using the protected members, returns and Action to be taken by the
163   // tranport. Also checks for updates to our BDP estimate and acts
164   // accordingly.
PeriodicUpdate()165   virtual FlowControlAction PeriodicUpdate() { abort(); }
166 
167   // Called to do bookkeeping when a stream owned by this transport sends
168   // data on the wire
StreamSentData(int64_t size)169   virtual void StreamSentData(int64_t size) { abort(); }
170 
171   // Called to do bookkeeping when a stream owned by this transport receives
172   // data from the wire. Also does error checking for frame size.
RecvData(int64_t incoming_frame_size)173   virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); }
174 
175   // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
RecvUpdate(uint32_t size)176   virtual void RecvUpdate(uint32_t size) { abort(); }
177 
178   // Returns the BdpEstimator held by this object. Caller is responsible for
179   // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
180   // bdp estimator actions inside TransportFlowControl
bdp_estimator()181   virtual BdpEstimator* bdp_estimator() { return nullptr; }
182 
183   // Getters
remote_window()184   int64_t remote_window() const { return remote_window_; }
target_window()185   virtual int64_t target_window() const { return target_initial_window_size_; }
announced_window()186   int64_t announced_window() const { return announced_window_; }
187 
188   // Used in certain benchmarks in which we don't want FlowControl to be a
189   // factor
TestOnlyForceHugeWindow()190   virtual void TestOnlyForceHugeWindow() {}
191 
192   GRPC_ABSTRACT_BASE_CLASS
193 
194  protected:
195   friend class ::grpc::testing::TrickledCHTTP2;
196   int64_t remote_window_ = kDefaultWindow;
197   int64_t target_initial_window_size_ = kDefaultWindow;
198   int64_t announced_window_ = kDefaultWindow;
199 };
200 
201 // Implementation of flow control that does NOTHING. Always returns maximum
202 // values, never initiates writes, and assumes that the remote peer is doing
203 // the same. To be used to narrow down on flow control as the cause of negative
204 // performance.
205 class TransportFlowControlDisabled final : public TransportFlowControlBase {
206  public:
207   // Maxes out all values
208   TransportFlowControlDisabled(grpc_chttp2_transport* t);
209 
flow_control_enabled()210   bool flow_control_enabled() const override { return false; }
211 
212   // Never do anything.
MaybeSendUpdate(bool writing_anyway)213   uint32_t MaybeSendUpdate(bool writing_anyway) override { return 0; }
MakeAction()214   FlowControlAction MakeAction() override { return FlowControlAction(); }
PeriodicUpdate()215   FlowControlAction PeriodicUpdate() override { return FlowControlAction(); }
StreamSentData(int64_t size)216   void StreamSentData(int64_t size) override {}
RecvData(int64_t incoming_frame_size)217   grpc_error* RecvData(int64_t incoming_frame_size) override {
218     return GRPC_ERROR_NONE;
219   }
RecvUpdate(uint32_t size)220   void RecvUpdate(uint32_t size) override {}
221 };
222 
223 // Implementation of flow control that abides to HTTP/2 spec and attempts
224 // to be as performant as possible.
225 class TransportFlowControl final : public TransportFlowControlBase {
226  public:
227   TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
~TransportFlowControl()228   ~TransportFlowControl() {}
229 
flow_control_enabled()230   bool flow_control_enabled() const override { return true; }
231 
bdp_probe()232   bool bdp_probe() const { return enable_bdp_probe_; }
233 
234   // returns an announce if we should send a transport update to our peer,
235   // else returns zero; writing_anyway indicates if a write would happen
236   // regardless of the send - if it is false and this function returns non-zero,
237   // this announce will cause a write to occur
238   uint32_t MaybeSendUpdate(bool writing_anyway) override;
239 
240   // Reads the flow control data and returns and actionable struct that will
241   // tell chttp2 exactly what it needs to do
MakeAction()242   FlowControlAction MakeAction() override {
243     return UpdateAction(FlowControlAction());
244   }
245 
246   // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
247   // to perform more complex flow control calculations and return an action
248   // to let chttp2 change its parameters
249   FlowControlAction PeriodicUpdate() override;
250 
StreamSentData(int64_t size)251   void StreamSentData(int64_t size) override { remote_window_ -= size; }
252 
253   grpc_error* ValidateRecvData(int64_t incoming_frame_size);
CommitRecvData(int64_t incoming_frame_size)254   void CommitRecvData(int64_t incoming_frame_size) {
255     announced_window_ -= incoming_frame_size;
256   }
257 
RecvData(int64_t incoming_frame_size)258   grpc_error* RecvData(int64_t incoming_frame_size) override {
259     FlowControlTrace trace("  data recv", this, nullptr);
260     grpc_error* error = ValidateRecvData(incoming_frame_size);
261     if (error != GRPC_ERROR_NONE) return error;
262     CommitRecvData(incoming_frame_size);
263     return GRPC_ERROR_NONE;
264   }
265 
266   // we have received a WINDOW_UPDATE frame for a transport
RecvUpdate(uint32_t size)267   void RecvUpdate(uint32_t size) override {
268     FlowControlTrace trace("t updt recv", this, nullptr);
269     remote_window_ += size;
270   }
271 
272   // See comment above announced_stream_total_over_incoming_window_ for the
273   // logic behind this decision.
target_window()274   int64_t target_window() const override {
275     return static_cast<uint32_t> GPR_MIN(
276         (int64_t)((1u << 31) - 1),
277         announced_stream_total_over_incoming_window_ +
278             target_initial_window_size_);
279   }
280 
transport()281   const grpc_chttp2_transport* transport() const { return t_; }
282 
PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)283   void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
284     if (delta > 0) {
285       announced_stream_total_over_incoming_window_ -= delta;
286     } else {
287       announced_stream_total_under_incoming_window_ += -delta;
288     }
289   }
290 
PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)291   void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
292     if (delta > 0) {
293       announced_stream_total_over_incoming_window_ += delta;
294     } else {
295       announced_stream_total_under_incoming_window_ -= -delta;
296     }
297   }
298 
bdp_estimator()299   BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
300 
TestOnlyForceHugeWindow()301   void TestOnlyForceHugeWindow() override {
302     announced_window_ = 1024 * 1024 * 1024;
303     remote_window_ = 1024 * 1024 * 1024;
304   }
305 
306  private:
307   double TargetLogBdp();
308   double SmoothLogBdp(double value);
309   FlowControlAction::Urgency DeltaUrgency(int64_t value,
310                                           grpc_chttp2_setting_id setting_id);
311 
UpdateAction(FlowControlAction action)312   FlowControlAction UpdateAction(FlowControlAction action) {
313     if (announced_window_ < target_window() / 2) {
314       action.set_send_transport_update(
315           FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
316     }
317     return action;
318   }
319 
320   const grpc_chttp2_transport* const t_;
321 
322   /** calculating what we should give for local window:
323       we track the total amount of flow control over initial window size
324       across all streams: this is data that we want to receive right now (it
325       has an outstanding read)
326       and the total amount of flow control under initial window size across all
327       streams: this is data we've read early
328       we want to adjust incoming_window such that:
329       incoming_window = total_over - max(bdp - total_under, 0) */
330   int64_t announced_stream_total_over_incoming_window_ = 0;
331   int64_t announced_stream_total_under_incoming_window_ = 0;
332 
333   /** should we probe bdp? */
334   const bool enable_bdp_probe_;
335 
336   /* bdp estimation */
337   grpc_core::BdpEstimator bdp_estimator_;
338 
339   /* pid controller */
340   grpc_core::PidController pid_controller_;
341   grpc_millis last_pid_update_ = 0;
342 };
343 
344 // Fat interface with all methods a stream flow control implementation needs
345 // to support. gRPC C Core does not support pure virtual functions, so instead
346 // we abort in any methods which require implementation in the base class.
347 class StreamFlowControlBase {
348  public:
StreamFlowControlBase()349   StreamFlowControlBase() {}
~StreamFlowControlBase()350   virtual ~StreamFlowControlBase() {}
351 
352   // Updates an action using the protected members.
UpdateAction(FlowControlAction action)353   virtual FlowControlAction UpdateAction(FlowControlAction action) { abort(); }
354 
355   // Using the protected members, returns an Action for this stream to be
356   // taken by the tranport.
MakeAction()357   virtual FlowControlAction MakeAction() { abort(); }
358 
359   // Bookkeeping for when data is sent on this stream.
SentData(int64_t outgoing_frame_size)360   virtual void SentData(int64_t outgoing_frame_size) { abort(); }
361 
362   // Bookkeeping and error checking for when data is received by this stream.
RecvData(int64_t incoming_frame_size)363   virtual grpc_error* RecvData(int64_t incoming_frame_size) { abort(); }
364 
365   // Called to check if this stream needs to send a WINDOW_UPDATE frame.
MaybeSendUpdate()366   virtual uint32_t MaybeSendUpdate() { abort(); }
367 
368   // Bookkeeping for receiving a WINDOW_UPDATE from for this stream.
RecvUpdate(uint32_t size)369   virtual void RecvUpdate(uint32_t size) { abort(); }
370 
371   // Bookkeeping for when a call pulls bytes out of the transport. At this
372   // point we consider the data 'used' and can thus let out peer know we are
373   // ready for more data.
IncomingByteStreamUpdate(size_t max_size_hint,size_t have_already)374   virtual void IncomingByteStreamUpdate(size_t max_size_hint,
375                                         size_t have_already) {
376     abort();
377   }
378 
379   // Used in certain benchmarks in which we don't want FlowControl to be a
380   // factor
TestOnlyForceHugeWindow()381   virtual void TestOnlyForceHugeWindow() {}
382 
383   // Getters
remote_window_delta()384   int64_t remote_window_delta() { return remote_window_delta_; }
local_window_delta()385   int64_t local_window_delta() { return local_window_delta_; }
announced_window_delta()386   int64_t announced_window_delta() { return announced_window_delta_; }
387 
388   GRPC_ABSTRACT_BASE_CLASS
389 
390  protected:
391   friend class ::grpc::testing::TrickledCHTTP2;
392   int64_t remote_window_delta_ = 0;
393   int64_t local_window_delta_ = 0;
394   int64_t announced_window_delta_ = 0;
395 };
396 
397 // Implementation of flow control that does NOTHING. Always returns maximum
398 // values, never initiates writes, and assumes that the remote peer is doing
399 // the same. To be used to narrow down on flow control as the cause of negative
400 // performance.
401 class StreamFlowControlDisabled : public StreamFlowControlBase {
402  public:
UpdateAction(FlowControlAction action)403   FlowControlAction UpdateAction(FlowControlAction action) override {
404     return action;
405   }
MakeAction()406   FlowControlAction MakeAction() override { return FlowControlAction(); }
SentData(int64_t outgoing_frame_size)407   void SentData(int64_t outgoing_frame_size) override {}
RecvData(int64_t incoming_frame_size)408   grpc_error* RecvData(int64_t incoming_frame_size) override {
409     return GRPC_ERROR_NONE;
410   }
MaybeSendUpdate()411   uint32_t MaybeSendUpdate() override { return 0; }
RecvUpdate(uint32_t size)412   void RecvUpdate(uint32_t size) override {}
IncomingByteStreamUpdate(size_t max_size_hint,size_t have_already)413   void IncomingByteStreamUpdate(size_t max_size_hint,
414                                 size_t have_already) override {}
415 };
416 
417 // Implementation of flow control that abides to HTTP/2 spec and attempts
418 // to be as performant as possible.
419 class StreamFlowControl final : public StreamFlowControlBase {
420  public:
421   StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
~StreamFlowControl()422   ~StreamFlowControl() {
423     tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
424   }
425 
426   FlowControlAction UpdateAction(FlowControlAction action) override;
MakeAction()427   FlowControlAction MakeAction() override {
428     return UpdateAction(tfc_->MakeAction());
429   }
430 
431   // we have sent data on the wire, we must track this in our bookkeeping for
432   // the remote peer's flow control.
SentData(int64_t outgoing_frame_size)433   void SentData(int64_t outgoing_frame_size) override {
434     FlowControlTrace tracer("  data sent", tfc_, this);
435     tfc_->StreamSentData(outgoing_frame_size);
436     remote_window_delta_ -= outgoing_frame_size;
437   }
438 
439   // we have received data from the wire
440   grpc_error* RecvData(int64_t incoming_frame_size) override;
441 
442   // returns an announce if we should send a stream update to our peer, else
443   // returns zero
444   uint32_t MaybeSendUpdate() override;
445 
446   // we have received a WINDOW_UPDATE frame for a stream
RecvUpdate(uint32_t size)447   void RecvUpdate(uint32_t size) override {
448     FlowControlTrace trace("s updt recv", tfc_, this);
449     remote_window_delta_ += size;
450   }
451 
452   // the application is asking for a certain amount of bytes
453   void IncomingByteStreamUpdate(size_t max_size_hint,
454                                 size_t have_already) override;
455 
remote_window_delta()456   int64_t remote_window_delta() const { return remote_window_delta_; }
local_window_delta()457   int64_t local_window_delta() const { return local_window_delta_; }
announced_window_delta()458   int64_t announced_window_delta() const { return announced_window_delta_; }
459 
stream()460   const grpc_chttp2_stream* stream() const { return s_; }
461 
TestOnlyForceHugeWindow()462   void TestOnlyForceHugeWindow() override {
463     announced_window_delta_ = 1024 * 1024 * 1024;
464     local_window_delta_ = 1024 * 1024 * 1024;
465     remote_window_delta_ = 1024 * 1024 * 1024;
466   }
467 
468  private:
469   TransportFlowControl* const tfc_;
470   const grpc_chttp2_stream* const s_;
471 
UpdateAnnouncedWindowDelta(TransportFlowControl * tfc,int64_t change)472   void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
473     tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
474     announced_window_delta_ += change;
475     tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
476   }
477 };
478 
479 }  // namespace chttp2
480 }  // namespace grpc_core
481 
482 #endif
483