• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <math.h>
26 #include <string.h>
27 
28 #include <string>
29 
30 #include "absl/strings/str_format.h"
31 
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34 
35 #include "src/core/ext/transport/chttp2/transport/internal.h"
36 #include "src/core/lib/gpr/string.h"
37 
// Trace flag for flow-control logging, registered under the name "flowctl".
// NOTE(review): the leading `false` is presumably the "enabled by default"
// argument of TraceFlag -- confirm against the TraceFlag declaration.
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
39 
40 namespace grpc_core {
41 namespace chttp2 {
42 
// Test-only hook. When non-null, TransportFlowControl::PeriodicUpdate() asks
// it for the next target initial window size instead of using the value
// derived from the BDP estimate. Has static storage duration, so it starts
// out as nullptr.
TestOnlyTransportTargetWindowEstimatesMocker*
    g_test_only_transport_target_window_estimates_mocker;
45 
46 namespace {
47 
// Field width handed to gpr_leftpad() for every column of the trace output.
constexpr int kTracePadding = 30;
// 2^31 - 1: upper bound used when clamping a single window-update increment.
constexpr uint32_t kMaxWindowUpdateSize = (uint32_t{1} << 31) - 1;
50 
fmt_int64_diff_str(int64_t old_val,int64_t new_val)51 static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
52   std::string str;
53   if (old_val != new_val) {
54     str = absl::StrFormat("%" PRId64 " -> %" PRId64 "", old_val, new_val);
55   } else {
56     str = absl::StrFormat("%" PRId64 "", old_val);
57   }
58   return gpr_leftpad(str.c_str(), ' ', kTracePadding);
59 }
60 
fmt_uint32_diff_str(uint32_t old_val,uint32_t new_val)61 static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
62   std::string str;
63   if (old_val != new_val) {
64     str = absl::StrFormat("%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
65   } else {
66     str = absl::StrFormat("%" PRIu32 "", old_val);
67   }
68   return gpr_leftpad(str.c_str(), ' ', kTracePadding);
69 }
70 }  // namespace
71 
Init(const char * reason,TransportFlowControl * tfc,StreamFlowControl * sfc)72 void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc,
73                             StreamFlowControl* sfc) {
74   tfc_ = tfc;
75   sfc_ = sfc;
76   reason_ = reason;
77   remote_window_ = tfc->remote_window();
78   target_window_ = tfc->target_window();
79   announced_window_ = tfc->announced_window();
80   if (sfc != nullptr) {
81     remote_window_delta_ = sfc->remote_window_delta();
82     local_window_delta_ = sfc->local_window_delta();
83     announced_window_delta_ = sfc->announced_window_delta();
84   }
85 }
86 
Finish()87 void FlowControlTrace::Finish() {
88   uint32_t acked_local_window =
89       tfc_->transport()->settings[GRPC_SENT_SETTINGS]
90                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
91   uint32_t remote_window =
92       tfc_->transport()->settings[GRPC_PEER_SETTINGS]
93                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
94   char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window());
95   char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window());
96   char* taw_str =
97       fmt_int64_diff_str(announced_window_, tfc_->announced_window());
98   char* srw_str;
99   char* slw_str;
100   char* saw_str;
101   if (sfc_ != nullptr) {
102     srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
103                                  sfc_->remote_window_delta() + remote_window);
104     slw_str =
105         fmt_int64_diff_str(local_window_delta_ + acked_local_window,
106                            sfc_->local_window_delta() + acked_local_window);
107     saw_str =
108         fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
109                            sfc_->announced_window_delta() + acked_local_window);
110   } else {
111     srw_str = gpr_leftpad("", ' ', kTracePadding);
112     slw_str = gpr_leftpad("", ' ', kTracePadding);
113     saw_str = gpr_leftpad("", ' ', kTracePadding);
114   }
115   gpr_log(GPR_DEBUG,
116           "%p[%u][%s] | %s | trw:%s, tlw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
117           tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0,
118           tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str,
119           tlw_str, taw_str, srw_str, slw_str, saw_str);
120   gpr_free(trw_str);
121   gpr_free(tlw_str);
122   gpr_free(taw_str);
123   gpr_free(srw_str);
124   gpr_free(slw_str);
125   gpr_free(saw_str);
126 }
127 
UrgencyString(Urgency u)128 const char* FlowControlAction::UrgencyString(Urgency u) {
129   switch (u) {
130     case Urgency::NO_ACTION_NEEDED:
131       return "no action";
132     case Urgency::UPDATE_IMMEDIATELY:
133       return "update immediately";
134     case Urgency::QUEUE_UPDATE:
135       return "queue update";
136     default:
137       GPR_UNREACHABLE_CODE(return "unknown");
138   }
139   GPR_UNREACHABLE_CODE(return "unknown");
140 }
141 
Trace(grpc_chttp2_transport * t) const142 void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
143   char* iw_str = fmt_uint32_diff_str(
144       t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
145       initial_window_size_);
146   char* mf_str = fmt_uint32_diff_str(
147       t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
148       max_frame_size_);
149   gpr_log(GPR_DEBUG, "t[%s],  s[%s], iw:%s:%s mf:%s:%s",
150           UrgencyString(send_transport_update_),
151           UrgencyString(send_stream_update_),
152           UrgencyString(send_initial_window_update_), iw_str,
153           UrgencyString(send_max_frame_size_update_), mf_str);
154   gpr_free(iw_str);
155   gpr_free(mf_str);
156 }
157 
TransportFlowControlDisabled(grpc_chttp2_transport * t)158 TransportFlowControlDisabled::TransportFlowControlDisabled(
159     grpc_chttp2_transport* t) {
160   remote_window_ = kMaxWindow;
161   target_initial_window_size_ = kMaxWindow;
162   announced_window_ = kMaxWindow;
163   t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
164       kFrameSize;
165   t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
166       kFrameSize;
167   t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
168       kFrameSize;
169   t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
170       kMaxWindow;
171   t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
172       kMaxWindow;
173   t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
174       kMaxWindow;
175 }
176 
// Builds the flow-control state for transport `t`.
//   t:                transport whose settings and endpoint are consulted.
//   enable_bdp_probe: when false, PeriodicUpdate() skips the BDP-driven
//                     window and frame-size adjustments.
// The PID controller works on a log2-scale value: PeriodicUpdate() feeds its
// output through pow(2, ...), and the control value is bounded to [-1, 25].
// NOTE(review): the initial control value calls TargetLogBdp(), which reads
// t_ and bdp_estimator_; this relies on those members being declared before
// pid_controller_ in the class -- confirm against the header.
TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t,
                                           bool enable_bdp_probe)
    : t_(t),
      enable_bdp_probe_(enable_bdp_probe),
      bdp_estimator_(t->peer_string.c_str()),
      pid_controller_(grpc_core::PidController::Args()
                          .set_gain_p(4)
                          .set_gain_i(8)
                          .set_gain_d(0)
                          .set_initial_control_value(TargetLogBdp())
                          .set_min_control_value(-1)
                          .set_max_control_value(25)
                          .set_integral_range(10)),
      // Timestamp used by SmoothLogBdp() to compute the elapsed time.
      last_pid_update_(grpc_core::ExecCtx::Get()->Now()) {}
191 
MaybeSendUpdate(bool writing_anyway)192 uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
193   FlowControlTrace trace("t updt sent", this, nullptr);
194   const uint32_t target_announced_window =
195       static_cast<uint32_t>(target_window());
196   if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
197       announced_window_ != target_announced_window) {
198     const uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
199         target_announced_window - announced_window_, 0, kMaxWindowUpdateSize);
200     announced_window_ += announce;
201     return announce;
202   }
203   return 0;
204 }
205 
ValidateRecvData(int64_t incoming_frame_size)206 grpc_error* TransportFlowControl::ValidateRecvData(
207     int64_t incoming_frame_size) {
208   if (incoming_frame_size > announced_window_) {
209     return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
210         absl::StrFormat("frame of size %" PRId64
211                         " overflows local window of %" PRId64,
212                         incoming_frame_size, announced_window_)
213             .c_str());
214   }
215   return GRPC_ERROR_NONE;
216 }
217 
StreamFlowControl(TransportFlowControl * tfc,const grpc_chttp2_stream * s)218 StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc,
219                                      const grpc_chttp2_stream* s)
220     : tfc_(tfc), s_(s) {}
221 
RecvData(int64_t incoming_frame_size)222 grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) {
223   FlowControlTrace trace("  data recv", tfc_, this);
224 
225   grpc_error* error = GRPC_ERROR_NONE;
226   error = tfc_->ValidateRecvData(incoming_frame_size);
227   if (error != GRPC_ERROR_NONE) return error;
228 
229   uint32_t sent_init_window =
230       tfc_->transport()->settings[GRPC_SENT_SETTINGS]
231                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
232   uint32_t acked_init_window =
233       tfc_->transport()->settings[GRPC_ACKED_SETTINGS]
234                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
235 
236   int64_t acked_stream_window = announced_window_delta_ + acked_init_window;
237   int64_t sent_stream_window = announced_window_delta_ + sent_init_window;
238   if (incoming_frame_size > acked_stream_window) {
239     if (incoming_frame_size <= sent_stream_window) {
240       gpr_log(GPR_ERROR,
241               "Incoming frame of size %" PRId64
242               " exceeds local window size of %" PRId64
243               ".\n"
244               "The (un-acked, future) window size would be %" PRId64
245               " which is not exceeded.\n"
246               "This would usually cause a disconnection, but allowing it due to"
247               "broken HTTP2 implementations in the wild.\n"
248               "See (for example) https://github.com/netty/netty/issues/6520.",
249               incoming_frame_size, acked_stream_window, sent_stream_window);
250     } else {
251       return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
252           absl::StrFormat("frame of size %" PRId64
253                           " overflows local window of %" PRId64,
254                           incoming_frame_size, acked_stream_window)
255               .c_str());
256     }
257   }
258 
259   UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size);
260   local_window_delta_ -= incoming_frame_size;
261   tfc_->CommitRecvData(incoming_frame_size);
262   return GRPC_ERROR_NONE;
263 }
264 
MaybeSendUpdate()265 uint32_t StreamFlowControl::MaybeSendUpdate() {
266   FlowControlTrace trace("s updt sent", tfc_, this);
267   if (local_window_delta_ > announced_window_delta_) {
268     uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
269         local_window_delta_ - announced_window_delta_, 0, kMaxWindowUpdateSize);
270     UpdateAnnouncedWindowDelta(tfc_, announce);
271     return announce;
272   }
273   return 0;
274 }
275 
IncomingByteStreamUpdate(size_t max_size_hint,size_t have_already)276 void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint,
277                                                  size_t have_already) {
278   FlowControlTrace trace("app st recv", tfc_, this);
279   uint32_t max_recv_bytes;
280   uint32_t sent_init_window =
281       tfc_->transport()->settings[GRPC_SENT_SETTINGS]
282                                  [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
283 
284   /* clamp max recv hint to an allowable size */
285   if (max_size_hint >= kMaxWindowUpdateSize - sent_init_window) {
286     max_recv_bytes = kMaxWindowUpdateSize - sent_init_window;
287   } else {
288     max_recv_bytes = static_cast<uint32_t>(max_size_hint);
289   }
290 
291   /* account for bytes already received but unknown to higher layers */
292   if (max_recv_bytes >= have_already) {
293     max_recv_bytes -= static_cast<uint32_t>(have_already);
294   } else {
295     max_recv_bytes = 0;
296   }
297 
298   /* add some small lookahead to keep pipelines flowing */
299   GPR_DEBUG_ASSERT(max_recv_bytes <= kMaxWindowUpdateSize - sent_init_window);
300   if (local_window_delta_ < max_recv_bytes) {
301     uint32_t add_max_recv_bytes =
302         static_cast<uint32_t>(max_recv_bytes - local_window_delta_);
303     local_window_delta_ += add_max_recv_bytes;
304   }
305 }
306 
307 // Take in a target and modifies it based on the memory pressure of the system
AdjustForMemoryPressure(grpc_resource_quota * quota,double target)308 static double AdjustForMemoryPressure(grpc_resource_quota* quota,
309                                       double target) {
310   // do not increase window under heavy memory pressure.
311   double memory_pressure = grpc_resource_quota_get_memory_pressure(quota);
312   static const double kLowMemPressure = 0.1;
313   static const double kZeroTarget = 22;
314   static const double kHighMemPressure = 0.8;
315   static const double kMaxMemPressure = 0.9;
316   if (memory_pressure < kLowMemPressure && target < kZeroTarget) {
317     target = (target - kZeroTarget) * memory_pressure / kLowMemPressure +
318              kZeroTarget;
319   } else if (memory_pressure > kHighMemPressure) {
320     target *= 1 - GPR_MIN(1, (memory_pressure - kHighMemPressure) /
321                                  (kMaxMemPressure - kHighMemPressure));
322   }
323   return target;
324 }
325 
TargetLogBdp()326 double TransportFlowControl::TargetLogBdp() {
327   return AdjustForMemoryPressure(
328       grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
329       1 + log2(bdp_estimator_.EstimateBdp()));
330 }
331 
SmoothLogBdp(double value)332 double TransportFlowControl::SmoothLogBdp(double value) {
333   grpc_millis now = grpc_core::ExecCtx::Get()->Now();
334   double bdp_error = value - pid_controller_.last_control_value();
335   const double dt = static_cast<double>(now - last_pid_update_) * 1e-3;
336   last_pid_update_ = now;
337   // Limit dt to 100ms
338   const double kMaxDt = 0.1;
339   return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
340 }
341 
DeltaUrgency(int64_t value,grpc_chttp2_setting_id setting_id)342 FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
343     int64_t value, grpc_chttp2_setting_id setting_id) {
344   int64_t delta = value - static_cast<int64_t>(
345                               t_->settings[GRPC_LOCAL_SETTINGS][setting_id]);
346   // TODO(ncteisen): tune this
347   if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
348     return FlowControlAction::Urgency::QUEUE_UPDATE;
349   } else {
350     return FlowControlAction::Urgency::NO_ACTION_NEEDED;
351   }
352 }
353 
// Recomputes the desired initial window size and max frame size and returns
// a FlowControlAction describing which updates should be sent. When BDP
// probing is disabled the action is passed to UpdateAction() unmodified.
FlowControlAction TransportFlowControl::PeriodicUpdate() {
  FlowControlAction action;
  if (enable_bdp_probe_) {
    // get bdp estimate and update initial_window accordingly.
    // target might change based on how much memory pressure we are under
    // TODO(ncteisen): experiment with setting target to be huge under low
    // memory pressure.
    // The smoothed value is on a log2 scale, hence pow(2, ...).
    double target = pow(2, SmoothLogBdp(TargetLogBdp()));
    if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
      // Hook for simulating unusual flow control situations in tests.
      target = g_test_only_transport_target_window_estimates_mocker
                   ->ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
                       target_initial_window_size_ /* current target */);
    }
    // Though initial window 'could' drop to 0, we keep the floor at 128
    target_initial_window_size_ =
        static_cast<int32_t> GPR_CLAMP(target, 128, INT32_MAX);

    // Urgency is decided by how far the new target is from the current
    // local INITIAL_WINDOW_SIZE setting (see DeltaUrgency).
    action.set_send_initial_window_update(
        DeltaUrgency(target_initial_window_size_,
                     GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
        static_cast<uint32_t>(target_initial_window_size_));

    // get bandwidth estimate and update max_frame accordingly.
    double bw_dbl = bdp_estimator_.EstimateBandwidth();
    // we target the max of BDP or bandwidth in microseconds.
    // Concretely: the larger of (bandwidth estimate clamped to [0, INT_MAX],
    // divided by 1000) and the target initial window size, then clamped to
    // [16384, 16777215].
    int32_t frame_size = static_cast<int32_t> GPR_CLAMP(
        GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
                target_initial_window_size_),
        16384, 16777215);
    action.set_send_max_frame_size_update(
        DeltaUrgency(static_cast<int64_t>(frame_size),
                     GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
        frame_size);
  }
  return UpdateAction(action);
}
391 
UpdateAction(FlowControlAction action)392 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
393   // TODO(ncteisen): tune this
394   if (!s_->read_closed) {
395     uint32_t sent_init_window =
396         tfc_->transport()->settings[GRPC_SENT_SETTINGS]
397                                    [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
398     if (local_window_delta_ > announced_window_delta_ &&
399         announced_window_delta_ + sent_init_window <= sent_init_window / 2) {
400       action.set_send_stream_update(
401           FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
402     } else if (local_window_delta_ > announced_window_delta_) {
403       action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
404     }
405   }
406 
407   return action;
408 }
409 
410 }  // namespace chttp2
411 }  // namespace grpc_core
412