• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
20 #define GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
21 
22 #include <grpc/support/port_platform.h>
23 #include <limits.h>
24 #include <stdint.h>
25 
26 #include <iosfwd>
27 #include <string>
28 #include <utility>
29 
30 #include "absl/functional/function_ref.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/string_view.h"
34 #include "absl/types/optional.h"
35 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
36 #include "src/core/lib/debug/trace.h"
37 #include "src/core/lib/resource_quota/memory_quota.h"
38 #include "src/core/lib/transport/bdp_estimator.h"
39 #include "src/core/util/time.h"
40 
41 namespace grpc {
42 namespace testing {
43 class TrickledCHTTP2;  // to make this a friend
44 }  // namespace testing
45 }  // namespace grpc
46 
47 namespace grpc_core {
48 namespace chttp2 {
49 
50 static constexpr uint32_t kDefaultWindow = 65535;
51 static constexpr uint32_t kDefaultFrameSize = 16384;
52 static constexpr int64_t kMaxWindow = static_cast<int64_t>((1u << 31) - 1);
53 // If smaller than this, advertise zero window.
54 static constexpr uint32_t kMinPositiveInitialWindowSize = 1024;
55 static constexpr const uint32_t kMaxInitialWindowSize = (1u << 30);
56 // The maximum per-stream flow control window delta to advertise.
57 static constexpr const int64_t kMaxWindowDelta = (1u << 20);
58 static constexpr const int kDefaultPreferredRxCryptoFrameSize = INT_MAX;
59 
60 // TODO(ctiller): clean up when flow_control_fixes is enabled by default
61 static constexpr uint32_t kFrameSize = 1024 * 1024;
62 static constexpr const uint32_t kMinInitialWindowSize = 128;
63 
64 class TransportFlowControl;
65 class StreamFlowControl;
66 
67 enum class StallEdge { kNoChange, kStalled, kUnstalled };
68 
69 // Encapsulates a collections of actions the transport needs to take with
70 // regard to flow control. Each action comes with urgencies that tell the
71 // transport how quickly the action must take place.
72 class GRPC_MUST_USE_RESULT FlowControlAction {
73  public:
74   enum class Urgency : uint8_t {
75     // Nothing to be done.
76     NO_ACTION_NEEDED = 0,
77     // Initiate a write to update the initial window immediately.
78     UPDATE_IMMEDIATELY,
79     // Push the flow control update into a send buffer, to be sent
80     // out the next time a write is initiated.
81     QUEUE_UPDATE,
82   };
83 
send_stream_update()84   Urgency send_stream_update() const { return send_stream_update_; }
send_transport_update()85   Urgency send_transport_update() const { return send_transport_update_; }
send_initial_window_update()86   Urgency send_initial_window_update() const {
87     return send_initial_window_update_;
88   }
send_max_frame_size_update()89   Urgency send_max_frame_size_update() const {
90     return send_max_frame_size_update_;
91   }
preferred_rx_crypto_frame_size_update()92   Urgency preferred_rx_crypto_frame_size_update() const {
93     return preferred_rx_crypto_frame_size_update_;
94   }
initial_window_size()95   uint32_t initial_window_size() const { return initial_window_size_; }
max_frame_size()96   uint32_t max_frame_size() const { return max_frame_size_; }
preferred_rx_crypto_frame_size()97   uint32_t preferred_rx_crypto_frame_size() const {
98     return preferred_rx_crypto_frame_size_;
99   }
100 
set_send_stream_update(Urgency u)101   FlowControlAction& set_send_stream_update(Urgency u) {
102     send_stream_update_ = u;
103     return *this;
104   }
set_send_transport_update(Urgency u)105   FlowControlAction& set_send_transport_update(Urgency u) {
106     send_transport_update_ = u;
107     return *this;
108   }
set_send_initial_window_update(Urgency u,uint32_t update)109   FlowControlAction& set_send_initial_window_update(Urgency u,
110                                                     uint32_t update) {
111     send_initial_window_update_ = u;
112     initial_window_size_ = update;
113     return *this;
114   }
set_send_max_frame_size_update(Urgency u,uint32_t update)115   FlowControlAction& set_send_max_frame_size_update(Urgency u,
116                                                     uint32_t update) {
117     send_max_frame_size_update_ = u;
118     max_frame_size_ = update;
119     return *this;
120   }
set_preferred_rx_crypto_frame_size_update(Urgency u,uint32_t update)121   FlowControlAction& set_preferred_rx_crypto_frame_size_update(
122       Urgency u, uint32_t update) {
123     preferred_rx_crypto_frame_size_update_ = u;
124     preferred_rx_crypto_frame_size_ = update;
125     return *this;
126   }
127 
128   static const char* UrgencyString(Urgency u);
129   std::string DebugString() const;
130 
AssertEmpty()131   void AssertEmpty() { CHECK(*this == FlowControlAction()); }
132 
133   bool operator==(const FlowControlAction& other) const {
134     return send_stream_update_ == other.send_stream_update_ &&
135            send_transport_update_ == other.send_transport_update_ &&
136            send_initial_window_update_ == other.send_initial_window_update_ &&
137            send_max_frame_size_update_ == other.send_max_frame_size_update_ &&
138            (send_initial_window_update_ == Urgency::NO_ACTION_NEEDED ||
139             initial_window_size_ == other.initial_window_size_) &&
140            (send_max_frame_size_update_ == Urgency::NO_ACTION_NEEDED ||
141             max_frame_size_ == other.max_frame_size_) &&
142            (preferred_rx_crypto_frame_size_update_ ==
143                 Urgency::NO_ACTION_NEEDED ||
144             preferred_rx_crypto_frame_size_ ==
145                 other.preferred_rx_crypto_frame_size_);
146   }
147 
148  private:
149   Urgency send_stream_update_ = Urgency::NO_ACTION_NEEDED;
150   Urgency send_transport_update_ = Urgency::NO_ACTION_NEEDED;
151   Urgency send_initial_window_update_ = Urgency::NO_ACTION_NEEDED;
152   Urgency send_max_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
153   Urgency preferred_rx_crypto_frame_size_update_ = Urgency::NO_ACTION_NEEDED;
154   uint32_t initial_window_size_ = 0;
155   uint32_t max_frame_size_ = 0;
156   uint32_t preferred_rx_crypto_frame_size_ = 0;
157 };
158 
159 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency urgency);
160 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action);
161 
162 // Implementation of flow control that abides to HTTP/2 spec and attempts
163 // to be as performant as possible.
164 class TransportFlowControl final {
165  public:
166   explicit TransportFlowControl(absl::string_view name, bool enable_bdp_probe,
167                                 MemoryOwner* memory_owner);
~TransportFlowControl()168   ~TransportFlowControl() {}
169 
bdp_probe()170   bool bdp_probe() const { return enable_bdp_probe_; }
171 
172   // returns an announce if we should send a transport update to our peer,
173   // else returns zero; writing_anyway indicates if a write would happen
174   // regardless of the send - if it is false and this function returns non-zero,
175   // this announce will cause a write to occur
176   uint32_t DesiredAnnounceSize(bool writing_anyway) const;
177   // notify that we've actually sent a stream window update
178   // (should be DesiredAnnounceSize())
179   void SentUpdate(uint32_t announce);
180 
181   // Older API: combines getting the DesiredAnnounceSize() with SentUpdate()
MaybeSendUpdate(bool writing_anyway)182   uint32_t MaybeSendUpdate(bool writing_anyway) {
183     uint32_t n = DesiredAnnounceSize(writing_anyway);
184     SentUpdate(n);
185     return n;
186   }
187 
188   // Track an update to the incoming flow control counters - that is how many
189   // tokens we report to our peer that we're willing to accept.
190   // Instantiators *must* call MakeAction before destruction of this value.
191   class IncomingUpdateContext {
192    public:
IncomingUpdateContext(TransportFlowControl * tfc)193     explicit IncomingUpdateContext(TransportFlowControl* tfc) : tfc_(tfc) {}
~IncomingUpdateContext()194     ~IncomingUpdateContext() { CHECK_EQ(tfc_, nullptr); }
195 
196     IncomingUpdateContext(const IncomingUpdateContext&) = delete;
197     IncomingUpdateContext& operator=(const IncomingUpdateContext&) = delete;
198 
199     // Reads the flow control data and returns an actionable struct that will
200     // tell chttp2 exactly what it needs to do
MakeAction()201     FlowControlAction MakeAction() {
202       return std::exchange(tfc_, nullptr)->UpdateAction(FlowControlAction());
203     }
204 
205     // Notify of data receipt. Returns OkStatus if the data was accepted,
206     // else an error status if the connection should be closed.
207     absl::Status RecvData(
208         int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream =
209                                          []() { return absl::OkStatus(); });
210 
211     // Update a stream announce window delta, keeping track of how much total
212     // positive delta is present on the transport.
UpdateAnnouncedWindowDelta(int64_t * delta,int64_t change)213     void UpdateAnnouncedWindowDelta(int64_t* delta, int64_t change) {
214       if (change == 0) return;
215       if (*delta > 0) {
216         tfc_->announced_stream_total_over_incoming_window_ -= *delta;
217       }
218       *delta += change;
219       if (*delta > 0) {
220         tfc_->announced_stream_total_over_incoming_window_ += *delta;
221       }
222     }
223 
224    private:
225     TransportFlowControl* tfc_;
226   };
227 
228   // Track an update to the outgoing flow control counters - that is how many
229   // tokens our peer has said we can send.
230   class OutgoingUpdateContext {
231    public:
OutgoingUpdateContext(TransportFlowControl * tfc)232     explicit OutgoingUpdateContext(TransportFlowControl* tfc) : tfc_(tfc) {}
StreamSentData(int64_t size)233     void StreamSentData(int64_t size) { tfc_->remote_window_ -= size; }
234 
235     // we have received a WINDOW_UPDATE frame for a transport
RecvUpdate(uint32_t size)236     void RecvUpdate(uint32_t size) { tfc_->remote_window_ += size; }
237 
238     // Finish the update and check whether we became stalled or unstalled.
Finish()239     StallEdge Finish() {
240       bool is_stalled = tfc_->remote_window_ <= 0;
241       if (is_stalled != was_stalled_) {
242         return is_stalled ? StallEdge::kStalled : StallEdge::kUnstalled;
243       } else {
244         return StallEdge::kNoChange;
245       }
246     }
247 
248    private:
249     TransportFlowControl* tfc_;
250     const bool was_stalled_ = tfc_->remote_window_ <= 0;
251   };
252 
253   // Call periodically (at a low-ish rate, 100ms - 10s makes sense)
254   // to perform more complex flow control calculations and return an action
255   // to let chttp2 change its parameters
256   FlowControlAction PeriodicUpdate();
257 
258   int64_t target_window() const;
target_frame_size()259   int64_t target_frame_size() const { return target_frame_size_; }
target_preferred_rx_crypto_frame_size()260   int64_t target_preferred_rx_crypto_frame_size() const {
261     return target_preferred_rx_crypto_frame_size_;
262   }
263 
bdp_estimator()264   BdpEstimator* bdp_estimator() { return &bdp_estimator_; }
265 
acked_init_window()266   uint32_t acked_init_window() const { return acked_init_window_; }
queued_init_window()267   uint32_t queued_init_window() const { return target_initial_window_size_; }
sent_init_window()268   uint32_t sent_init_window() const { return sent_init_window_; }
269 
FlushedSettings()270   void FlushedSettings() { sent_init_window_ = queued_init_window(); }
271 
272   FlowControlAction SetAckedInitialWindow(uint32_t value);
273 
274   // Getters
remote_window()275   int64_t remote_window() const { return remote_window_; }
announced_window()276   int64_t announced_window() const { return announced_window_; }
277 
announced_stream_total_over_incoming_window()278   int64_t announced_stream_total_over_incoming_window() const {
279     return announced_stream_total_over_incoming_window_;
280   }
281 
RemoveAnnouncedWindowDelta(int64_t delta)282   void RemoveAnnouncedWindowDelta(int64_t delta) {
283     if (delta > 0) {
284       announced_stream_total_over_incoming_window_ -= delta;
285     }
286   }
287 
288   // A snapshot of the flow control stats to export.
289   struct Stats {
290     int64_t target_window;
291     int64_t target_frame_size;
292     int64_t target_preferred_rx_crypto_frame_size;
293     uint32_t acked_init_window;
294     uint32_t queued_init_window;
295     uint32_t sent_init_window;
296     int64_t remote_window;
297     int64_t announced_window;
298     int64_t announced_stream_total_over_incoming_window;
299     // BDP estimator stats.
300     int64_t bdp_accumulator;
301     int64_t bdp_estimate;
302     double bdp_bw_est;
303 
304     std::string ToString() const;
305   };
306 
stats()307   Stats stats() const {
308     Stats stats;
309     stats.target_window = target_window();
310     stats.target_frame_size = target_frame_size();
311     stats.target_preferred_rx_crypto_frame_size =
312         target_preferred_rx_crypto_frame_size();
313     stats.acked_init_window = acked_init_window();
314     stats.queued_init_window = queued_init_window();
315     stats.sent_init_window = sent_init_window();
316     stats.remote_window = remote_window();
317     stats.announced_window = announced_window();
318     stats.announced_stream_total_over_incoming_window =
319         announced_stream_total_over_incoming_window();
320     stats.bdp_accumulator = bdp_estimator_.accumulator();
321     stats.bdp_estimate = bdp_estimator_.EstimateBdp();
322     stats.bdp_bw_est = bdp_estimator_.EstimateBandwidth();
323     return stats;
324   }
325 
326  private:
327   double TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const;
328   static void UpdateSetting(absl::string_view name, int64_t* desired_value,
329                             uint32_t new_desired_value,
330                             FlowControlAction* action,
331                             FlowControlAction& (FlowControlAction::*set)(
332                                 FlowControlAction::Urgency, uint32_t));
333 
334   FlowControlAction UpdateAction(FlowControlAction action);
335 
336   MemoryOwner* const memory_owner_;
337 
338   /// calculating what we should give for local window:
339   /// we track the total amount of flow control over initial window size
340   /// across all streams: this is data that we want to receive right now (it
341   /// has an outstanding read)
342   /// and the total amount of flow control under initial window size across all
343   /// streams: this is data we've read early
344   /// we want to adjust incoming_window such that:
345   /// incoming_window = total_over - max(bdp - total_under, 0)
346   int64_t announced_stream_total_over_incoming_window_ = 0;
347 
348   /// should we probe bdp?
349   const bool enable_bdp_probe_;
350 
351   // bdp estimation
352   BdpEstimator bdp_estimator_;
353 
354   int64_t remote_window_ = kDefaultWindow;
355   int64_t target_initial_window_size_ = kDefaultWindow;
356   int64_t target_frame_size_ = kDefaultFrameSize;
357   int64_t target_preferred_rx_crypto_frame_size_ =
358       kDefaultPreferredRxCryptoFrameSize;
359   int64_t announced_window_ = kDefaultWindow;
360   uint32_t acked_init_window_ = kDefaultWindow;
361   uint32_t sent_init_window_ = kDefaultWindow;
362 };
363 
364 // Implementation of flow control that abides to HTTP/2 spec and attempts
365 // to be as performant as possible.
366 class StreamFlowControl final {
367  public:
368   explicit StreamFlowControl(TransportFlowControl* tfc);
~StreamFlowControl()369   ~StreamFlowControl() {
370     tfc_->RemoveAnnouncedWindowDelta(announced_window_delta_);
371   }
372 
373   // Track an update to the incoming flow control counters - that is how many
374   // tokens we report to our peer that we're willing to accept.
375   // Instantiators *must* call MakeAction before destruction of this value.
376   class IncomingUpdateContext {
377    public:
IncomingUpdateContext(StreamFlowControl * sfc)378     explicit IncomingUpdateContext(StreamFlowControl* sfc)
379         : tfc_upd_(sfc->tfc_), sfc_(sfc) {}
380 
MakeAction()381     FlowControlAction MakeAction() {
382       return sfc_->UpdateAction(tfc_upd_.MakeAction());
383     }
384 
385     // we have received data from the wire
386     absl::Status RecvData(int64_t incoming_frame_size);
387 
388     // the application is asking for a certain amount of bytes
SetMinProgressSize(int64_t min_progress_size)389     void SetMinProgressSize(int64_t min_progress_size) {
390       sfc_->min_progress_size_ = min_progress_size;
391     }
392 
393     void SetPendingSize(int64_t pending_size);
394 
395    private:
396     TransportFlowControl::IncomingUpdateContext tfc_upd_;
397     StreamFlowControl* const sfc_;
398   };
399 
400   // Track an update to the outgoing flow control counters - that is how many
401   // tokens our peer has said we can send.
402   class OutgoingUpdateContext {
403    public:
OutgoingUpdateContext(StreamFlowControl * sfc)404     explicit OutgoingUpdateContext(StreamFlowControl* sfc)
405         : tfc_upd_(sfc->tfc_), sfc_(sfc) {}
406     // we have received a WINDOW_UPDATE frame for a stream
RecvUpdate(uint32_t size)407     void RecvUpdate(uint32_t size) { sfc_->remote_window_delta_ += size; }
408     // we have sent data on the wire, we must track this in our bookkeeping for
409     // the remote peer's flow control.
SentData(int64_t outgoing_frame_size)410     void SentData(int64_t outgoing_frame_size) {
411       tfc_upd_.StreamSentData(outgoing_frame_size);
412       sfc_->remote_window_delta_ -= outgoing_frame_size;
413     }
414 
415    private:
416     TransportFlowControl::OutgoingUpdateContext tfc_upd_;
417     StreamFlowControl* const sfc_;
418   };
419 
420   // returns an announce if we should send a stream update to our peer, else
421   // returns zero
422   uint32_t DesiredAnnounceSize() const;
423   // notify that we've actually sent a stream window update
424   // (should be DesiredAnnounceSize())
425   void SentUpdate(uint32_t announce);
426 
427   // Older API: combines getting the DesiredAnnounceSize() with SentUpdate()
MaybeSendUpdate()428   uint32_t MaybeSendUpdate() {
429     uint32_t n = DesiredAnnounceSize();
430     SentUpdate(n);
431     return n;
432   }
433 
remote_window_delta()434   int64_t remote_window_delta() const { return remote_window_delta_; }
announced_window_delta()435   int64_t announced_window_delta() const { return announced_window_delta_; }
min_progress_size()436   int64_t min_progress_size() const { return min_progress_size_; }
437 
438   // A snapshot of the flow control stats to export.
439   struct Stats {
440     int64_t min_progress_size;
441     int64_t remote_window_delta;
442     int64_t announced_window_delta;
443     absl::optional<int64_t> pending_size;
444 
445     std::string ToString() const;
446   };
447 
stats()448   Stats stats() const {
449     Stats stats;
450     stats.min_progress_size = min_progress_size();
451     stats.remote_window_delta = remote_window_delta();
452     stats.announced_window_delta = announced_window_delta();
453     stats.pending_size = pending_size_;
454     return stats;
455   }
456 
457  private:
458   TransportFlowControl* const tfc_;
459   int64_t min_progress_size_ = 0;
460   int64_t remote_window_delta_ = 0;
461   int64_t announced_window_delta_ = 0;
462   absl::optional<int64_t> pending_size_;
463 
464   FlowControlAction UpdateAction(FlowControlAction action);
465 };
466 
467 class TestOnlyTransportTargetWindowEstimatesMocker {
468  public:
~TestOnlyTransportTargetWindowEstimatesMocker()469   virtual ~TestOnlyTransportTargetWindowEstimatesMocker() {}
470   virtual double ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
471       double current_target) = 0;
472 };
473 
474 extern TestOnlyTransportTargetWindowEstimatesMocker*
475     g_test_only_transport_target_window_estimates_mocker;
476 
477 }  // namespace chttp2
478 }  // namespace grpc_core
479 
480 #endif  // GRPC_SRC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_FLOW_CONTROL_H
481