• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <stdint.h>
25 
26 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/gprpp/manual_constructor.h"
29 #include "src/core/lib/transport/bdp_estimator.h"
30 #include "src/core/lib/transport/pid_controller.h"
31 
32 struct grpc_chttp2_transport;
33 struct grpc_chttp2_stream;
34 
35 extern grpc_core::TraceFlag grpc_flowctl_trace;
36 
37 namespace grpc {
38 namespace testing {
39 class TrickledCHTTP2;  // to make this a friend
40 }  // namespace testing
41 }  // namespace grpc
42 
43 namespace grpc_core {
44 namespace chttp2 {
45 
46 static constexpr uint32_t kDefaultWindow = 65535;
47 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
48 // TODO(ncteisen): Tune this
49 static constexpr uint32_t kFrameSize = 1024 * 1024;
50 
51 class TransportFlowControl;
52 class StreamFlowControl;
53 
54 // Encapsulates a collections of actions the transport needs to take with
55 // regard to flow control. Each action comes with urgencies that tell the
56 // transport how quickly the action must take place.
57 class FlowControlAction {
58  public:
59   enum class Urgency : uint8_t {
60     // Nothing to be done.
61     NO_ACTION_NEEDED = 0,
62     // Initiate a write to update the initial window immediately.
63     UPDATE_IMMEDIATELY,
64     // Push the flow control update into a send buffer, to be sent
65     // out the next time a write is initiated.
66     QUEUE_UPDATE,
67   };
68 
send_stream_update()69   Urgency send_stream_update() const { return send_stream_update_; }
send_transport_update()70   Urgency send_transport_update() const { return send_transport_update_; }
send_initial_window_update()71   Urgency send_initial_window_update() const {
72     return send_initial_window_update_;
73   }
send_max_frame_size_update()74   Urgency send_max_frame_size_update() const {
75     return send_max_frame_size_update_;
76   }
initial_window_size()77   uint32_t initial_window_size() const { return initial_window_size_; }
max_frame_size()78   uint32_t max_frame_size() const { return max_frame_size_; }
79 
set_send_stream_update(Urgency u)80   FlowControlAction& set_send_stream_update(Urgency u) {
81     send_stream_update_ = u;
82     return *this;
83   }
set_send_transport_update(Urgency u)84   FlowControlAction& set_send_transport_update(Urgency u) {
85     send_transport_update_ = u;
86     return *this;
87   }
set_send_initial_window_update(Urgency u,uint32_t update)88   FlowControlAction& set_send_initial_window_update(Urgency u,
89                                                     uint32_t update) {
90     send_initial_window_update_ = u;
91     initial_window_size_ = update;
92     return *this;
93   }
set_send_max_frame_size_update(Urgency u,uint32_t update)94   FlowControlAction& set_send_max_frame_size_update(Urgency u,
95                                                     uint32_t update) {
96     send_max_frame_size_update_ = u;
97     max_frame_size_ = update;
98     return *this;
99   }
100 
101   static const char* UrgencyString(Urgency u);
102   void Trace(grpc_chttp2_transport* t) const;
103 
104  private:
105   Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
106   Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
107   Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
108   Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
109   uint32_t initial_window_size_ = 0;
110   uint32_t max_frame_size_ = 0;
111 };
112 
113 class FlowControlTrace {
114  public:
FlowControlTrace(const char * reason,TransportFlowControl * tfc,StreamFlowControl * sfc)115   FlowControlTrace(const char* reason, TransportFlowControl* tfc,
116                    StreamFlowControl* sfc) {
117     if (enabled_) Init(reason, tfc, sfc);
118   }
119 
~FlowControlTrace()120   ~FlowControlTrace() {
121     if (enabled_) Finish();
122   }
123 
124  private:
125   void Init(const char* reason, TransportFlowControl* tfc,
126             StreamFlowControl* sfc);
127   void Finish();
128 
129   const bool enabled_ = GRPC_TRACE_FLAG_ENABLED(grpc_flowctl_trace);
130 
131   TransportFlowControl* tfc_;
132   StreamFlowControl* sfc_;
133   const char* reason_;
134   int64_t remote_window_;
135   int64_t target_window_;
136   int64_t announced_window_;
137   int64_t remote_window_delta_;
138   int64_t local_window_delta_;
139   int64_t announced_window_delta_;
140 };
141 
142 // Fat interface with all methods a flow control implementation needs to
143 // support.
144 class TransportFlowControlBase {
145  public:
TransportFlowControlBase()146   TransportFlowControlBase() {}
~TransportFlowControlBase()147   virtual ~TransportFlowControlBase() {}
148 
149   // Is flow control enabled? This is needed in other codepaths like the checks
150   // in parsing and in writing.
151   virtual bool flow_control_enabled() const = 0;
152 
153   // Called to check if the transport needs to send a WINDOW_UPDATE frame
154   virtual uint32_t MaybeSendUpdate(bool /* writing_anyway */) = 0;
155 
156   // Using the protected members, returns and Action to be taken by the
157   // tranport.
158   virtual FlowControlAction MakeAction() = 0;
159 
160   // Using the protected members, returns and Action to be taken by the
161   // tranport. Also checks for updates to our BDP estimate and acts
162   // accordingly.
163   virtual FlowControlAction PeriodicUpdate() = 0;
164 
165   // Called to do bookkeeping when a stream owned by this transport sends
166   // data on the wire
167   virtual void StreamSentData(int64_t /* size */) = 0;
168 
169   // Called to do bookkeeping when a stream owned by this transport receives
170   // data from the wire. Also does error checking for frame size.
171   virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) = 0;
172 
173   // Called to do bookkeeping when we receive a WINDOW_UPDATE frame.
174   virtual void RecvUpdate(uint32_t /* size */) = 0;
175 
176   // Returns the BdpEstimator held by this object. Caller is responsible for
177   // checking for nullptr. TODO(ncteisen): consider fully encapsulating all
178   // bdp estimator actions inside TransportFlowControl
bdp_estimator()179   virtual BdpEstimator* bdp_estimator() { return nullptr; }
180 
181   // Getters
remote_window()182   int64_t remote_window() const { return remote_window_; }
target_window()183   virtual int64_t target_window() const { return target_initial_window_size_; }
announced_window()184   int64_t announced_window() const { return announced_window_; }
185 
186   // Used in certain benchmarks in which we don't want FlowControl to be a
187   // factor
TestOnlyForceHugeWindow()188   virtual void TestOnlyForceHugeWindow() {}
189 
190  protected:
191   friend class ::grpc::testing::TrickledCHTTP2;
192   int64_t remote_window_ = kDefaultWindow;
193   int64_t target_initial_window_size_ = kDefaultWindow;
194   int64_t announced_window_ = kDefaultWindow;
195 };
196 
197 // Implementation of flow control that does NOTHING. Always returns maximum
198 // values, never initiates writes, and assumes that the remote peer is doing
199 // the same. To be used to narrow down on flow control as the cause of negative
200 // performance.
201 class TransportFlowControlDisabled final : public TransportFlowControlBase {
202  public:
203   // Maxes out all values
204   explicit TransportFlowControlDisabled(grpc_chttp2_transport* t);
205 
flow_control_enabled()206   bool flow_control_enabled() const override { return false; }
207 
208   // Never do anything.
MaybeSendUpdate(bool)209   uint32_t MaybeSendUpdate(bool /* writing_anyway */) override { return 0; }
MakeAction()210   FlowControlAction MakeAction() override { return FlowControlAction(); }
PeriodicUpdate()211   FlowControlAction PeriodicUpdate() override { return FlowControlAction(); }
StreamSentData(int64_t)212   void StreamSentData(int64_t /* size */) override {}
RecvData(int64_t)213   grpc_error* RecvData(int64_t /* incoming_frame_size */) override {
214     return GRPC_ERROR_NONE;
215   }
RecvUpdate(uint32_t)216   void RecvUpdate(uint32_t /* size */) override {}
217 };
218 
219 // Implementation of flow control that abides to HTTP/2 spec and attempts
220 // to be as performant as possible.
221 class TransportFlowControl final : public TransportFlowControlBase {
222  public:
223   TransportFlowControl(const grpc_chttp2_transport* t, bool enable_bdp_probe);
~TransportFlowControl()224   ~TransportFlowControl() override {}
225 
flow_control_enabled()226   bool flow_control_enabled() const override { return true; }
227 
bdp_probe()228   bool bdp_probe() const { return enable_bdp_probe_; }
229 
230   // returns an announce if we should send a transport update to our peer,
231   // else returns zero; writing_anyway indicates if a write would happen
232   // regardless of the send - if it is false and this function returns non-zero,
233   // this announce will cause a write to occur
234   uint32_t MaybeSendUpdate(bool writing_anyway) override;
235 
236   // Reads the flow control data and returns and actionable struct that will
237   // tell chttp2 exactly what it needs to do
MakeAction()238   FlowControlAction MakeAction() override {
239     return UpdateAction(FlowControlAction());
240   }
241 
242   // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
243   // to perform more complex flow control calculations and return an action
244   // to let chttp2 change its parameters
245   FlowControlAction PeriodicUpdate() override;
246 
StreamSentData(int64_t size)247   void StreamSentData(int64_t size) override { remote_window_ -= size; }
248 
249   grpc_error* ValidateRecvData(int64_t incoming_frame_size);
CommitRecvData(int64_t incoming_frame_size)250   void CommitRecvData(int64_t incoming_frame_size) {
251     announced_window_ -= incoming_frame_size;
252   }
253 
RecvData(int64_t incoming_frame_size)254   grpc_error* RecvData(int64_t incoming_frame_size) override {
255     FlowControlTrace trace("  data recv", this, nullptr);
256     grpc_error* error = ValidateRecvData(incoming_frame_size);
257     if (error != GRPC_ERROR_NONE) return error;
258     CommitRecvData(incoming_frame_size);
259     return GRPC_ERROR_NONE;
260   }
261 
262   // we have received a WINDOW_UPDATE frame for a transport
RecvUpdate(uint32_t size)263   void RecvUpdate(uint32_t size) override {
264     FlowControlTrace trace("t updt recv", this, nullptr);
265     remote_window_ += size;
266   }
267 
268   // See comment above announced_stream_total_over_incoming_window_ for the
269   // logic behind this decision.
target_window()270   int64_t target_window() const override {
271     return static_cast<uint32_t> GPR_MIN(
272         (int64_t)((1u << 31) - 1),
273         announced_stream_total_over_incoming_window_ +
274             target_initial_window_size_);
275   }
276 
transport()277   const grpc_chttp2_transport* transport() const { return t_; }
278 
PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)279   void PreUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
280     if (delta > 0) {
281       announced_stream_total_over_incoming_window_ -= delta;
282     }
283   }
284 
PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta)285   void PostUpdateAnnouncedWindowOverIncomingWindow(int64_t delta) {
286     if (delta > 0) {
287       announced_stream_total_over_incoming_window_ += delta;
288     }
289   }
290 
bdp_estimator()291   BdpEstimator* bdp_estimator() override { return &bdp_estimator_; }
292 
TestOnlyForceHugeWindow()293   void TestOnlyForceHugeWindow() override {
294     announced_window_ = 1024 * 1024 * 1024;
295     remote_window_ = 1024 * 1024 * 1024;
296   }
297 
298  private:
299   double TargetLogBdp();
300   double SmoothLogBdp(double value);
301   FlowControlAction::Urgency DeltaUrgency(int64_t value,
302                                           grpc_chttp2_setting_id setting_id);
303 
UpdateAction(FlowControlAction action)304   FlowControlAction UpdateAction(FlowControlAction action) {
305     if (announced_window_ < target_window() / 2) {
306       action.set_send_transport_update(
307           FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
308     }
309     return action;
310   }
311 
312   const grpc_chttp2_transport* const t_;
313 
314   /** calculating what we should give for local window:
315       we track the total amount of flow control over initial window size
316       across all streams: this is data that we want to receive right now (it
317       has an outstanding read)
318       and the total amount of flow control under initial window size across all
319       streams: this is data we've read early
320       we want to adjust incoming_window such that:
321       incoming_window = total_over - max(bdp - total_under, 0) */
322   int64_t announced_stream_total_over_incoming_window_ = 0;
323 
324   /** should we probe bdp? */
325   const bool enable_bdp_probe_;
326 
327   /* bdp estimation */
328   grpc_core::BdpEstimator bdp_estimator_;
329 
330   /* pid controller */
331   grpc_core::PidController pid_controller_;
332   grpc_millis last_pid_update_ = 0;
333 };
334 
335 // Fat interface with all methods a stream flow control implementation needs
336 // to support.
337 class StreamFlowControlBase {
338  public:
StreamFlowControlBase()339   StreamFlowControlBase() {}
~StreamFlowControlBase()340   virtual ~StreamFlowControlBase() {}
341 
342   // Updates an action using the protected members.
UpdateAction(FlowControlAction)343   virtual FlowControlAction UpdateAction(FlowControlAction /* action */) {
344     abort();
345   }
346 
347   // Using the protected members, returns an Action for this stream to be
348   // taken by the tranport.
349   virtual FlowControlAction MakeAction() = 0;
350 
351   // Bookkeeping for when data is sent on this stream.
352   virtual void SentData(int64_t /* outgoing_frame_size */) = 0;
353 
354   // Bookkeeping and error checking for when data is received by this stream.
355   virtual grpc_error* RecvData(int64_t /* incoming_frame_size */) = 0;
356 
357   // Called to check if this stream needs to send a WINDOW_UPDATE frame.
358   virtual uint32_t MaybeSendUpdate() = 0;
359 
360   // Bookkeeping for receiving a WINDOW_UPDATE from for this stream.
361   virtual void RecvUpdate(uint32_t /* size */) = 0;
362 
363   // Bookkeeping for when a call pulls bytes out of the transport. At this
364   // point we consider the data 'used' and can thus let out peer know we are
365   // ready for more data.
IncomingByteStreamUpdate(size_t,size_t)366   virtual void IncomingByteStreamUpdate(size_t /* max_size_hint */,
367                                         size_t /* have_already */) {
368     abort();
369   }
370 
371   // Used in certain benchmarks in which we don't want FlowControl to be a
372   // factor
TestOnlyForceHugeWindow()373   virtual void TestOnlyForceHugeWindow() {}
374 
375   // Getters
remote_window_delta()376   int64_t remote_window_delta() { return remote_window_delta_; }
local_window_delta()377   int64_t local_window_delta() { return local_window_delta_; }
announced_window_delta()378   int64_t announced_window_delta() { return announced_window_delta_; }
379 
380  protected:
381   friend class ::grpc::testing::TrickledCHTTP2;
382   int64_t remote_window_delta_ = 0;
383   int64_t local_window_delta_ = 0;
384   int64_t announced_window_delta_ = 0;
385 };
386 
387 // Implementation of flow control that does NOTHING. Always returns maximum
388 // values, never initiates writes, and assumes that the remote peer is doing
389 // the same. To be used to narrow down on flow control as the cause of negative
390 // performance.
391 class StreamFlowControlDisabled : public StreamFlowControlBase {
392  public:
UpdateAction(FlowControlAction action)393   FlowControlAction UpdateAction(FlowControlAction action) override {
394     return action;
395   }
MakeAction()396   FlowControlAction MakeAction() override { return FlowControlAction(); }
SentData(int64_t)397   void SentData(int64_t /* outgoing_frame_size */) override {}
RecvData(int64_t)398   grpc_error* RecvData(int64_t /* incoming_frame_size */) override {
399     return GRPC_ERROR_NONE;
400   }
MaybeSendUpdate()401   uint32_t MaybeSendUpdate() override { return 0; }
RecvUpdate(uint32_t)402   void RecvUpdate(uint32_t /* size */) override {}
IncomingByteStreamUpdate(size_t,size_t)403   void IncomingByteStreamUpdate(size_t /* max_size_hint */,
404                                 size_t /* have_already */) override {}
405 };
406 
407 // Implementation of flow control that abides to HTTP/2 spec and attempts
408 // to be as performant as possible.
409 class StreamFlowControl final : public StreamFlowControlBase {
410  public:
411   StreamFlowControl(TransportFlowControl* tfc, const grpc_chttp2_stream* s);
~StreamFlowControl()412   ~StreamFlowControl() override {
413     tfc_->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
414   }
415 
416   FlowControlAction UpdateAction(FlowControlAction action) override;
MakeAction()417   FlowControlAction MakeAction() override {
418     return UpdateAction(tfc_->MakeAction());
419   }
420 
421   // we have sent data on the wire, we must track this in our bookkeeping for
422   // the remote peer's flow control.
SentData(int64_t outgoing_frame_size)423   void SentData(int64_t outgoing_frame_size) override {
424     FlowControlTrace tracer("  data sent", tfc_, this);
425     tfc_->StreamSentData(outgoing_frame_size);
426     remote_window_delta_ -= outgoing_frame_size;
427   }
428 
429   // we have received data from the wire
430   grpc_error* RecvData(int64_t incoming_frame_size) override;
431 
432   // returns an announce if we should send a stream update to our peer, else
433   // returns zero
434   uint32_t MaybeSendUpdate() override;
435 
436   // we have received a WINDOW_UPDATE frame for a stream
RecvUpdate(uint32_t size)437   void RecvUpdate(uint32_t size) override {
438     FlowControlTrace trace("s updt recv", tfc_, this);
439     remote_window_delta_ += size;
440   }
441 
442   // the application is asking for a certain amount of bytes
443   void IncomingByteStreamUpdate(size_t max_size_hint,
444                                 size_t have_already) override;
445 
remote_window_delta()446   int64_t remote_window_delta() const { return remote_window_delta_; }
local_window_delta()447   int64_t local_window_delta() const { return local_window_delta_; }
announced_window_delta()448   int64_t announced_window_delta() const { return announced_window_delta_; }
449 
stream()450   const grpc_chttp2_stream* stream() const { return s_; }
451 
TestOnlyForceHugeWindow()452   void TestOnlyForceHugeWindow() override {
453     announced_window_delta_ = 1024 * 1024 * 1024;
454     local_window_delta_ = 1024 * 1024 * 1024;
455     remote_window_delta_ = 1024 * 1024 * 1024;
456   }
457 
458  private:
459   TransportFlowControl* const tfc_;
460   const grpc_chttp2_stream* const s_;
461 
UpdateAnnouncedWindowDelta(TransportFlowControl * tfc,int64_t change)462   void UpdateAnnouncedWindowDelta(TransportFlowControl* tfc, int64_t change) {
463     tfc->PreUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
464     announced_window_delta_ += change;
465     tfc->PostUpdateAnnouncedWindowOverIncomingWindow(announced_window_delta_);
466   }
467 };
468 
469 class TestOnlyTransportTargetWindowEstimatesMocker {
470  public:
~TestOnlyTransportTargetWindowEstimatesMocker()471   virtual ~TestOnlyTransportTargetWindowEstimatesMocker() {}
472   virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
473       double current_target) = 0;
474 };
475 
476 extern TestOnlyTransportTargetWindowEstimatesMocker*
477     g_test_only_transport_target_window_estimates_mocker;
478 
479 }  // namespace chttp2
480 }  // namespace grpc_core
481 
482 #endif
483