• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
20 
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 
24 #include <algorithm>
25 #include <cmath>
26 #include <ostream>
27 #include <string>
28 #include <tuple>
29 #include <vector>
30 
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_format.h"
35 #include "absl/strings/str_join.h"
36 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
37 #include "src/core/lib/experiments/experiments.h"
38 #include "src/core/lib/resource_quota/memory_quota.h"
39 #include "src/core/util/useful.h"
40 
41 namespace grpc_core {
42 namespace chttp2 {
43 
44 TestOnlyTransportTargetWindowEstimatesMocker*
45     g_test_only_transport_target_window_estimates_mocker;
46 
namespace {

// Upper bound applied to every window update size computed in this file:
// 2^31 - 1.
constexpr int64_t kMaxWindowUpdateSize = (int64_t{1} << 31) - 1;

}  // namespace
52 
UrgencyString(Urgency u)53 const char* FlowControlAction::UrgencyString(Urgency u) {
54   switch (u) {
55     case Urgency::NO_ACTION_NEEDED:
56       return "no-action";
57     case Urgency::UPDATE_IMMEDIATELY:
58       return "now";
59     case Urgency::QUEUE_UPDATE:
60       return "queue";
61     default:
62       GPR_UNREACHABLE_CODE(return "unknown");
63   }
64   GPR_UNREACHABLE_CODE(return "unknown");
65 }
66 
operator <<(std::ostream & out,FlowControlAction::Urgency u)67 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency u) {
68   return out << FlowControlAction::UrgencyString(u);
69 }
70 
DebugString() const71 std::string FlowControlAction::DebugString() const {
72   std::vector<std::string> segments;
73   if (send_transport_update_ != Urgency::NO_ACTION_NEEDED) {
74     segments.push_back(
75         absl::StrCat("t:", UrgencyString(send_transport_update_)));
76   }
77   if (send_stream_update_ != Urgency::NO_ACTION_NEEDED) {
78     segments.push_back(absl::StrCat("s:", UrgencyString(send_stream_update_)));
79   }
80   if (send_initial_window_update_ != Urgency::NO_ACTION_NEEDED) {
81     segments.push_back(
82         absl::StrCat("iw=", initial_window_size_, ":",
83                      UrgencyString(send_initial_window_update_)));
84   }
85   if (send_max_frame_size_update_ != Urgency::NO_ACTION_NEEDED) {
86     segments.push_back(
87         absl::StrCat("mf=", max_frame_size_, ":",
88                      UrgencyString(send_max_frame_size_update_)));
89   }
90   if (segments.empty()) return "no action";
91   return absl::StrJoin(segments, ",");
92 }
93 
operator <<(std::ostream & out,const FlowControlAction & action)94 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action) {
95   return out << action.DebugString();
96 }
97 
TransportFlowControl(absl::string_view name,bool enable_bdp_probe,MemoryOwner * memory_owner)98 TransportFlowControl::TransportFlowControl(absl::string_view name,
99                                            bool enable_bdp_probe,
100                                            MemoryOwner* memory_owner)
101     : memory_owner_(memory_owner),
102       enable_bdp_probe_(enable_bdp_probe),
103       bdp_estimator_(name) {}
104 
DesiredAnnounceSize(bool writing_anyway) const105 uint32_t TransportFlowControl::DesiredAnnounceSize(bool writing_anyway) const {
106   const uint32_t target_announced_window =
107       static_cast<uint32_t>(target_window());
108   if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
109       announced_window_ != target_announced_window) {
110     return Clamp(target_announced_window - announced_window_, int64_t{0},
111                  kMaxWindowUpdateSize);
112   }
113   return 0;
114 }
115 
SentUpdate(uint32_t announce)116 void TransportFlowControl::SentUpdate(uint32_t announce) {
117   announced_window_ += announce;
118 }
119 
StreamFlowControl(TransportFlowControl * tfc)120 StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc) : tfc_(tfc) {}
121 
RecvData(int64_t incoming_frame_size)122 absl::Status StreamFlowControl::IncomingUpdateContext::RecvData(
123     int64_t incoming_frame_size) {
124   return tfc_upd_.RecvData(incoming_frame_size, [this, incoming_frame_size]() {
125     int64_t acked_stream_window =
126         sfc_->announced_window_delta_ + sfc_->tfc_->acked_init_window();
127     if (incoming_frame_size > acked_stream_window) {
128       return absl::InternalError(absl::StrFormat(
129           "frame of size %" PRId64 " overflows local window of %" PRId64,
130           incoming_frame_size, acked_stream_window));
131     }
132 
133     tfc_upd_.UpdateAnnouncedWindowDelta(&sfc_->announced_window_delta_,
134                                         -incoming_frame_size);
135     sfc_->min_progress_size_ -=
136         std::min(sfc_->min_progress_size_, incoming_frame_size);
137     return absl::OkStatus();
138   });
139 }
140 
RecvData(int64_t incoming_frame_size,absl::FunctionRef<absl::Status ()> stream)141 absl::Status TransportFlowControl::IncomingUpdateContext::RecvData(
142     int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream) {
143   if (incoming_frame_size > tfc_->announced_window_) {
144     return absl::InternalError(absl::StrFormat(
145         "frame of size %" PRId64 " overflows local window of %" PRId64,
146         incoming_frame_size, tfc_->announced_window_));
147   }
148   absl::Status error = stream();
149   if (!error.ok()) return error;
150   tfc_->announced_window_ -= incoming_frame_size;
151   return absl::OkStatus();
152 }
153 
target_window() const154 int64_t TransportFlowControl::target_window() const {
155   // See comment above announced_stream_total_over_incoming_window_ for the
156   // logic behind this decision.
157   return static_cast<uint32_t>(
158       std::min(static_cast<int64_t>((1u << 31) - 1),
159                announced_stream_total_over_incoming_window_ +
160                    std::max<int64_t>(1, target_initial_window_size_)));
161 }
162 
UpdateAction(FlowControlAction action)163 FlowControlAction TransportFlowControl::UpdateAction(FlowControlAction action) {
164   const int64_t target = target_window();
165   // round up so that one byte targets are sent.
166   const int64_t send_threshold = (target + 1) / 2;
167   if (announced_window_ < send_threshold) {
168     action.set_send_transport_update(
169         FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
170   }
171   return action;
172 }
173 
174 double
TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const175 TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
176     const {
177   const double bdp = bdp_estimator_.EstimateBdp() * 2.0;
178   const double memory_pressure =
179       memory_owner_->GetPressureInfo().pressure_control_value;
180   // Linear interpolation between two values.
181   // Given a line segment between the two points (t_min, a), and (t_max, b),
182   // and a value t such that t_min <= t <= t_max, return the value on the line
183   // segment at t.
184   auto lerp = [](double t, double t_min, double t_max, double a, double b) {
185     return a + ((b - a) * (t - t_min) / (t_max - t_min));
186   };
187   // We split memory pressure into three broad regions:
188   // 1. Low memory pressure, the "anything goes" case - we assume no memory
189   //    pressure concerns and advertise a huge window to keep things flowing.
190   // 2. Moderate memory pressure, the "adjust to BDP" case - we linearly ramp
191   //    down window size to 2*BDP - which should still allow bytes to flow, but
192   //    is arguably more considered.
193   // 3. High memory pressure - past 50% we linearly ramp down window size from
194   //    BDP to 0 - at which point senders effectively must request to send bytes
195   //    to us.
196   //
197   //          ▲
198   //          │
199   //  4mb ────┤---------x----
200   //          │              -----
201   //  BDP ────┤                   ----x---
202   //          │                           ----
203   //          │                               -----
204   //          │                                    ----
205   //          │                                        -----
206   //          │                                             ---x
207   //          ├─────────┬─────────────┬────────────────────────┬─────►
208   //          │Anything │Adjust to    │Drop to zero            │
209   //          │Goes     │BDP          │                        │
210   //          0%        20%           50%                      100% memory
211   //                                                                pressure
212   const double kAnythingGoesPressure = 0.2;
213   const double kAdjustedToBdpPressure = 0.5;
214   const double kOneMegabyte = 1024.0 * 1024.0;
215   const double kAnythingGoesWindow = std::max(4.0 * kOneMegabyte, bdp);
216   if (memory_pressure < kAnythingGoesPressure) {
217     return kAnythingGoesWindow;
218   } else if (memory_pressure < kAdjustedToBdpPressure) {
219     return lerp(memory_pressure, kAnythingGoesPressure, kAdjustedToBdpPressure,
220                 kAnythingGoesWindow, bdp);
221   } else if (memory_pressure < 1.0) {
222     return lerp(memory_pressure, kAdjustedToBdpPressure, 1.0, bdp, 0);
223   } else {
224     return 0;
225   }
226 }
227 
UpdateSetting(absl::string_view name,int64_t * desired_value,uint32_t new_desired_value,FlowControlAction * action,FlowControlAction & (FlowControlAction::* set)(FlowControlAction::Urgency,uint32_t))228 void TransportFlowControl::UpdateSetting(
229     absl::string_view name, int64_t* desired_value, uint32_t new_desired_value,
230     FlowControlAction* action,
231     FlowControlAction& (FlowControlAction::*set)(FlowControlAction::Urgency,
232                                                  uint32_t)) {
233   if (new_desired_value != *desired_value) {
234     GRPC_TRACE_LOG(flowctl, INFO)
235         << "[flowctl] UPDATE SETTING " << name << " from " << *desired_value
236         << " to " << new_desired_value;
237     // Reaching zero can only happen for initial window size, and if it occurs
238     // we really want to wake up writes and ensure all the queued stream
239     // window updates are flushed, since stream flow control operates
240     // differently at zero window size.
241     FlowControlAction::Urgency urgency =
242         FlowControlAction::Urgency::QUEUE_UPDATE;
243     if (*desired_value == 0 || new_desired_value == 0) {
244       urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
245     }
246     *desired_value = new_desired_value;
247     (action->*set)(urgency, *desired_value);
248   }
249 }
250 
SetAckedInitialWindow(uint32_t value)251 FlowControlAction TransportFlowControl::SetAckedInitialWindow(uint32_t value) {
252   acked_init_window_ = value;
253   FlowControlAction action;
254   if (acked_init_window_ != target_initial_window_size_) {
255     FlowControlAction::Urgency urgency =
256         FlowControlAction::Urgency::QUEUE_UPDATE;
257     if (acked_init_window_ == 0 || target_initial_window_size_ == 0) {
258       urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
259     }
260     action.set_send_initial_window_update(urgency, target_initial_window_size_);
261   }
262   return action;
263 }
264 
PeriodicUpdate()265 FlowControlAction TransportFlowControl::PeriodicUpdate() {
266   FlowControlAction action;
267   if (enable_bdp_probe_) {
268     // get bdp estimate and update initial_window accordingly.
269     // target might change based on how much memory pressure we are under
270     // TODO(ncteisen): experiment with setting target to be huge under low
271     // memory pressure.
272     uint32_t target = static_cast<uint32_t>(RoundUpToPowerOf2(
273         Clamp(TargetInitialWindowSizeBasedOnMemoryPressureAndBdp(), 0.0,
274               static_cast<double>(kMaxInitialWindowSize))));
275     if (target < kMinPositiveInitialWindowSize) target = 0;
276     if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
277       // Hook for simulating unusual flow control situations in tests.
278       target = g_test_only_transport_target_window_estimates_mocker
279                    ->ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
280                        target_initial_window_size_ /* current target */);
281     }
282     // Though initial window 'could' drop to 0, we keep the floor at
283     // kMinInitialWindowSize
284     UpdateSetting(Http2Settings::initial_window_size_name(),
285                   &target_initial_window_size_,
286                   std::min(target, Http2Settings::max_initial_window_size()),
287                   &action, &FlowControlAction::set_send_initial_window_update);
288     // we target the max of BDP or bandwidth in microseconds.
289     UpdateSetting(Http2Settings::max_frame_size_name(), &target_frame_size_,
290                   Clamp(target, Http2Settings::min_max_frame_size(),
291                         Http2Settings::max_max_frame_size()),
292                   &action, &FlowControlAction::set_send_max_frame_size_update);
293 
294     if (IsTcpFrameSizeTuningEnabled()) {
295       // Advertise PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to peer. By advertising
296       // PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to the peer, we are informing the
297       // peer that we have tcp frame size tuning enabled and we inform it of our
298       // preferred rx frame sizes. The preferred rx frame size is determined as:
299       // Clamp(target_frame_size_ * 2, 16384, 0x7fffffff). In the future, this
300       // maybe updated to a different function of the memory pressure.
301       UpdateSetting(
302           Http2Settings::preferred_receive_crypto_message_size_name(),
303           &target_preferred_rx_crypto_frame_size_,
304           Clamp(static_cast<unsigned int>(target_frame_size_ * 2),
305                 Http2Settings::min_preferred_receive_crypto_message_size(),
306                 Http2Settings::max_preferred_receive_crypto_message_size()),
307           &action,
308           &FlowControlAction::set_preferred_rx_crypto_frame_size_update);
309     }
310   }
311   return UpdateAction(action);
312 }
313 
ToString() const314 std::string TransportFlowControl::Stats::ToString() const {
315   return absl::StrCat("target_window: ", target_window,
316                       " target_frame_size: ", target_frame_size,
317                       " target_preferred_rx_crypto_frame_size: ",
318                       target_preferred_rx_crypto_frame_size,
319                       " acked_init_window: ", acked_init_window,
320                       " queued_init_window: ", queued_init_window,
321                       " sent_init_window: ", sent_init_window,
322                       " remote_window: ", remote_window,
323                       " announced_window: ", announced_window,
324                       " announced_stream_total_over_incoming_window: ",
325                       announced_stream_total_over_incoming_window,
326                       " bdp_accumulator: ", bdp_accumulator,
327                       " bdp_estimate: ", bdp_estimate,
328                       " bdp_bw_est: ", bdp_bw_est);
329 }
330 
SentUpdate(uint32_t announce)331 void StreamFlowControl::SentUpdate(uint32_t announce) {
332   TransportFlowControl::IncomingUpdateContext tfc_upd(tfc_);
333   pending_size_ = absl::nullopt;
334   tfc_upd.UpdateAnnouncedWindowDelta(&announced_window_delta_, announce);
335   CHECK_EQ(DesiredAnnounceSize(), 0u);
336   std::ignore = tfc_upd.MakeAction();
337 }
338 
DesiredAnnounceSize() const339 uint32_t StreamFlowControl::DesiredAnnounceSize() const {
340   int64_t desired_window_delta = [this]() {
341     if (min_progress_size_ == 0) {
342       if (pending_size_.has_value() &&
343           announced_window_delta_ < -*pending_size_) {
344         return -*pending_size_;
345       } else {
346         return announced_window_delta_;
347       }
348     } else {
349       return std::min(min_progress_size_, kMaxWindowDelta);
350     }
351   }();
352   return Clamp(desired_window_delta - announced_window_delta_, int64_t{0},
353                kMaxWindowUpdateSize);
354 }
355 
UpdateAction(FlowControlAction action)356 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
357   const int64_t desired_announce_size = DesiredAnnounceSize();
358   if (desired_announce_size > 0) {
359     FlowControlAction::Urgency urgency =
360         FlowControlAction::Urgency::QUEUE_UPDATE;
361     // Size at which we probably want to wake up and write regardless of whether
362     // we *have* to.
363     // Currently set at half the initial window size or 8kb (whichever is
364     // greater). 8kb means we don't send rapidly unnecessarily when the initial
365     // window size is small.
366     const int64_t hurry_up_size = std::max(
367         static_cast<int64_t>(tfc_->queued_init_window()) / 2, int64_t{8192});
368     if (desired_announce_size > hurry_up_size) {
369       urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
370     }
371     // min_progress_size_ > 0 means we have a reader ready to read.
372     if (min_progress_size_ > 0) {
373       if (announced_window_delta_ <=
374           -static_cast<int64_t>(tfc_->sent_init_window()) / 2) {
375         urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
376       }
377     }
378     action.set_send_stream_update(urgency);
379   }
380   return action;
381 }
382 
SetPendingSize(int64_t pending_size)383 void StreamFlowControl::IncomingUpdateContext::SetPendingSize(
384     int64_t pending_size) {
385   CHECK_GE(pending_size, 0);
386   sfc_->pending_size_ = pending_size;
387 }
388 
ToString() const389 std::string StreamFlowControl::Stats::ToString() const {
390   return absl::StrCat("min_progress_size: ", min_progress_size,
391                       " remote_window_delta: ", remote_window_delta,
392                       " announced_window_delta: ", announced_window_delta,
393                       pending_size.has_value() ? *pending_size : -1);
394 }
395 
396 }  // namespace chttp2
397 }  // namespace grpc_core
398