1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <math.h>
26 #include <string.h>
27
28 #include <string>
29
30 #include "absl/strings/str_format.h"
31
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/log.h>
34
35 #include "src/core/ext/transport/chttp2/transport/internal.h"
36 #include "src/core/lib/gpr/string.h"
37
// Trace flag registered under the name "flowctl". It is constructed with
// `false` as its first argument — presumably "disabled by default"; confirm
// against the TraceFlag constructor.
grpc_core::TraceFlag grpc_flowctl_trace(false, "flowctl");
39
40 namespace grpc_core {
41 namespace chttp2 {
42
43 namespace {
44
// Width handed to gpr_leftpad() for every column of the trace output.
constexpr int kTracePadding = 30;
// Largest increment a single window update may carry: 2^31 - 1.
constexpr uint32_t kMaxWindowUpdateSize = 0x7fffffffu;
47
fmt_int64_diff_str(int64_t old_val,int64_t new_val)48 static char* fmt_int64_diff_str(int64_t old_val, int64_t new_val) {
49 std::string str;
50 if (old_val != new_val) {
51 str = absl::StrFormat("%" PRId64 " -> %" PRId64 "", old_val, new_val);
52 } else {
53 str = absl::StrFormat("%" PRId64 "", old_val);
54 }
55 return gpr_leftpad(str.c_str(), ' ', kTracePadding);
56 }
57
fmt_uint32_diff_str(uint32_t old_val,uint32_t new_val)58 static char* fmt_uint32_diff_str(uint32_t old_val, uint32_t new_val) {
59 std::string str;
60 if (old_val != new_val) {
61 str = absl::StrFormat("%" PRIu32 " -> %" PRIu32 "", old_val, new_val);
62 } else {
63 str = absl::StrFormat("%" PRIu32 "", old_val);
64 }
65 return gpr_leftpad(str.c_str(), ' ', kTracePadding);
66 }
67 } // namespace
68
Init(const char * reason,TransportFlowControl * tfc,StreamFlowControl * sfc)69 void FlowControlTrace::Init(const char* reason, TransportFlowControl* tfc,
70 StreamFlowControl* sfc) {
71 tfc_ = tfc;
72 sfc_ = sfc;
73 reason_ = reason;
74 remote_window_ = tfc->remote_window();
75 target_window_ = tfc->target_window();
76 announced_window_ = tfc->announced_window();
77 if (sfc != nullptr) {
78 remote_window_delta_ = sfc->remote_window_delta();
79 local_window_delta_ = sfc->local_window_delta();
80 announced_window_delta_ = sfc->announced_window_delta();
81 }
82 }
83
Finish()84 void FlowControlTrace::Finish() {
85 uint32_t acked_local_window =
86 tfc_->transport()->settings[GRPC_SENT_SETTINGS]
87 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
88 uint32_t remote_window =
89 tfc_->transport()->settings[GRPC_PEER_SETTINGS]
90 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
91 char* trw_str = fmt_int64_diff_str(remote_window_, tfc_->remote_window());
92 char* tlw_str = fmt_int64_diff_str(target_window_, tfc_->target_window());
93 char* taw_str =
94 fmt_int64_diff_str(announced_window_, tfc_->announced_window());
95 char* srw_str;
96 char* slw_str;
97 char* saw_str;
98 if (sfc_ != nullptr) {
99 srw_str = fmt_int64_diff_str(remote_window_delta_ + remote_window,
100 sfc_->remote_window_delta() + remote_window);
101 slw_str =
102 fmt_int64_diff_str(local_window_delta_ + acked_local_window,
103 sfc_->local_window_delta() + acked_local_window);
104 saw_str =
105 fmt_int64_diff_str(announced_window_delta_ + acked_local_window,
106 sfc_->announced_window_delta() + acked_local_window);
107 } else {
108 srw_str = gpr_leftpad("", ' ', kTracePadding);
109 slw_str = gpr_leftpad("", ' ', kTracePadding);
110 saw_str = gpr_leftpad("", ' ', kTracePadding);
111 }
112 gpr_log(GPR_DEBUG,
113 "%p[%u][%s] | %s | trw:%s, tlw:%s, taw:%s, srw:%s, slw:%s, saw:%s",
114 tfc_, sfc_ != nullptr ? sfc_->stream()->id : 0,
115 tfc_->transport()->is_client ? "cli" : "svr", reason_, trw_str,
116 tlw_str, taw_str, srw_str, slw_str, saw_str);
117 gpr_free(trw_str);
118 gpr_free(tlw_str);
119 gpr_free(taw_str);
120 gpr_free(srw_str);
121 gpr_free(slw_str);
122 gpr_free(saw_str);
123 }
124
UrgencyString(Urgency u)125 const char* FlowControlAction::UrgencyString(Urgency u) {
126 switch (u) {
127 case Urgency::NO_ACTION_NEEDED:
128 return "no action";
129 case Urgency::UPDATE_IMMEDIATELY:
130 return "update immediately";
131 case Urgency::QUEUE_UPDATE:
132 return "queue update";
133 default:
134 GPR_UNREACHABLE_CODE(return "unknown");
135 }
136 GPR_UNREACHABLE_CODE(return "unknown");
137 }
138
Trace(grpc_chttp2_transport * t) const139 void FlowControlAction::Trace(grpc_chttp2_transport* t) const {
140 char* iw_str = fmt_uint32_diff_str(
141 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
142 initial_window_size_);
143 char* mf_str = fmt_uint32_diff_str(
144 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE],
145 max_frame_size_);
146 gpr_log(GPR_DEBUG, "t[%s], s[%s], iw:%s:%s mf:%s:%s",
147 UrgencyString(send_transport_update_),
148 UrgencyString(send_stream_update_),
149 UrgencyString(send_initial_window_update_), iw_str,
150 UrgencyString(send_max_frame_size_update_), mf_str);
151 gpr_free(iw_str);
152 gpr_free(mf_str);
153 }
154
TransportFlowControlDisabled(grpc_chttp2_transport * t)155 TransportFlowControlDisabled::TransportFlowControlDisabled(
156 grpc_chttp2_transport* t) {
157 remote_window_ = kMaxWindow;
158 target_initial_window_size_ = kMaxWindow;
159 announced_window_ = kMaxWindow;
160 t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
161 kFrameSize;
162 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
163 kFrameSize;
164 t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE] =
165 kFrameSize;
166 t->settings[GRPC_PEER_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
167 kMaxWindow;
168 t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
169 kMaxWindow;
170 t->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] =
171 kMaxWindow;
172 }
173
// Constructs the transport-level flow control state for transport `t`.
//
// `enable_bdp_probe` is stored and later checked by PeriodicUpdate() before
// it recomputes the target window and frame size. The BDP estimator is
// constructed from the transport's peer string, and the PID controller is
// configured with the fixed gains/limits below.
TransportFlowControl::TransportFlowControl(const grpc_chttp2_transport* t,
                                           bool enable_bdp_probe)
    : t_(t),
      enable_bdp_probe_(enable_bdp_probe),
      bdp_estimator_(t->peer_string),
      // TargetLogBdp() reads t_ and bdp_estimator_, so this initializer
      // relies on those members being initialized before pid_controller_.
      // NOTE(review): members initialize in declaration order — confirm the
      // header declares them in this order.
      pid_controller_(grpc_core::PidController::Args()
                          .set_gain_p(4)
                          .set_gain_i(8)
                          .set_gain_d(0)
                          .set_initial_control_value(TargetLogBdp())
                          .set_min_control_value(-1)
                          .set_max_control_value(25)
                          .set_integral_range(10)),
      // Timestamp used by SmoothLogBdp() to compute the elapsed time between
      // PID updates.
      last_pid_update_(grpc_core::ExecCtx::Get()->Now()) {}
188
MaybeSendUpdate(bool writing_anyway)189 uint32_t TransportFlowControl::MaybeSendUpdate(bool writing_anyway) {
190 FlowControlTrace trace("t updt sent", this, nullptr);
191 const uint32_t target_announced_window =
192 static_cast<uint32_t>(target_window());
193 if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
194 announced_window_ != target_announced_window) {
195 const uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
196 target_announced_window - announced_window_, 0, kMaxWindowUpdateSize);
197 announced_window_ += announce;
198 return announce;
199 }
200 return 0;
201 }
202
ValidateRecvData(int64_t incoming_frame_size)203 grpc_error* TransportFlowControl::ValidateRecvData(
204 int64_t incoming_frame_size) {
205 if (incoming_frame_size > announced_window_) {
206 return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
207 absl::StrFormat("frame of size %" PRId64
208 " overflows local window of %" PRId64,
209 incoming_frame_size, announced_window_)
210 .c_str());
211 }
212 return GRPC_ERROR_NONE;
213 }
214
// Binds this per-stream flow control object to its transport-level flow
// control `tfc` and to the stream `s`. Both pointers are stored as-is.
StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc,
                                     const grpc_chttp2_stream* s)
    : tfc_(tfc), s_(s) {}
218
RecvData(int64_t incoming_frame_size)219 grpc_error* StreamFlowControl::RecvData(int64_t incoming_frame_size) {
220 FlowControlTrace trace(" data recv", tfc_, this);
221
222 grpc_error* error = GRPC_ERROR_NONE;
223 error = tfc_->ValidateRecvData(incoming_frame_size);
224 if (error != GRPC_ERROR_NONE) return error;
225
226 uint32_t sent_init_window =
227 tfc_->transport()->settings[GRPC_SENT_SETTINGS]
228 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
229 uint32_t acked_init_window =
230 tfc_->transport()->settings[GRPC_ACKED_SETTINGS]
231 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
232
233 int64_t acked_stream_window = announced_window_delta_ + acked_init_window;
234 int64_t sent_stream_window = announced_window_delta_ + sent_init_window;
235 if (incoming_frame_size > acked_stream_window) {
236 if (incoming_frame_size <= sent_stream_window) {
237 gpr_log(GPR_ERROR,
238 "Incoming frame of size %" PRId64
239 " exceeds local window size of %" PRId64
240 ".\n"
241 "The (un-acked, future) window size would be %" PRId64
242 " which is not exceeded.\n"
243 "This would usually cause a disconnection, but allowing it due to"
244 "broken HTTP2 implementations in the wild.\n"
245 "See (for example) https://github.com/netty/netty/issues/6520.",
246 incoming_frame_size, acked_stream_window, sent_stream_window);
247 } else {
248 return GRPC_ERROR_CREATE_FROM_COPIED_STRING(
249 absl::StrFormat("frame of size %" PRId64
250 " overflows local window of %" PRId64,
251 incoming_frame_size, acked_stream_window)
252 .c_str());
253 }
254 }
255
256 UpdateAnnouncedWindowDelta(tfc_, -incoming_frame_size);
257 local_window_delta_ -= incoming_frame_size;
258 tfc_->CommitRecvData(incoming_frame_size);
259 return GRPC_ERROR_NONE;
260 }
261
MaybeSendUpdate()262 uint32_t StreamFlowControl::MaybeSendUpdate() {
263 FlowControlTrace trace("s updt sent", tfc_, this);
264 if (local_window_delta_ > announced_window_delta_) {
265 uint32_t announce = static_cast<uint32_t> GPR_CLAMP(
266 local_window_delta_ - announced_window_delta_, 0, kMaxWindowUpdateSize);
267 UpdateAnnouncedWindowDelta(tfc_, announce);
268 return announce;
269 }
270 return 0;
271 }
272
IncomingByteStreamUpdate(size_t max_size_hint,size_t have_already)273 void StreamFlowControl::IncomingByteStreamUpdate(size_t max_size_hint,
274 size_t have_already) {
275 FlowControlTrace trace("app st recv", tfc_, this);
276 uint32_t max_recv_bytes;
277 uint32_t sent_init_window =
278 tfc_->transport()->settings[GRPC_SENT_SETTINGS]
279 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
280
281 /* clamp max recv hint to an allowable size */
282 if (max_size_hint >= kMaxWindowUpdateSize - sent_init_window) {
283 max_recv_bytes = kMaxWindowUpdateSize - sent_init_window;
284 } else {
285 max_recv_bytes = static_cast<uint32_t>(max_size_hint);
286 }
287
288 /* account for bytes already received but unknown to higher layers */
289 if (max_recv_bytes >= have_already) {
290 max_recv_bytes -= static_cast<uint32_t>(have_already);
291 } else {
292 max_recv_bytes = 0;
293 }
294
295 /* add some small lookahead to keep pipelines flowing */
296 GPR_DEBUG_ASSERT(max_recv_bytes <= kMaxWindowUpdateSize - sent_init_window);
297 if (local_window_delta_ < max_recv_bytes) {
298 uint32_t add_max_recv_bytes =
299 static_cast<uint32_t>(max_recv_bytes - local_window_delta_);
300 local_window_delta_ += add_max_recv_bytes;
301 }
302 }
303
304 // Take in a target and modifies it based on the memory pressure of the system
AdjustForMemoryPressure(grpc_resource_quota * quota,double target)305 static double AdjustForMemoryPressure(grpc_resource_quota* quota,
306 double target) {
307 // do not increase window under heavy memory pressure.
308 double memory_pressure = grpc_resource_quota_get_memory_pressure(quota);
309 static const double kLowMemPressure = 0.1;
310 static const double kZeroTarget = 22;
311 static const double kHighMemPressure = 0.8;
312 static const double kMaxMemPressure = 0.9;
313 if (memory_pressure < kLowMemPressure && target < kZeroTarget) {
314 target = (target - kZeroTarget) * memory_pressure / kLowMemPressure +
315 kZeroTarget;
316 } else if (memory_pressure > kHighMemPressure) {
317 target *= 1 - GPR_MIN(1, (memory_pressure - kHighMemPressure) /
318 (kMaxMemPressure - kHighMemPressure));
319 }
320 return target;
321 }
322
TargetLogBdp()323 double TransportFlowControl::TargetLogBdp() {
324 return AdjustForMemoryPressure(
325 grpc_resource_user_quota(grpc_endpoint_get_resource_user(t_->ep)),
326 1 + log2(bdp_estimator_.EstimateBdp()));
327 }
328
SmoothLogBdp(double value)329 double TransportFlowControl::SmoothLogBdp(double value) {
330 grpc_millis now = grpc_core::ExecCtx::Get()->Now();
331 double bdp_error = value - pid_controller_.last_control_value();
332 const double dt = static_cast<double>(now - last_pid_update_) * 1e-3;
333 last_pid_update_ = now;
334 // Limit dt to 100ms
335 const double kMaxDt = 0.1;
336 return pid_controller_.Update(bdp_error, dt > kMaxDt ? kMaxDt : dt);
337 }
338
DeltaUrgency(int64_t value,grpc_chttp2_setting_id setting_id)339 FlowControlAction::Urgency TransportFlowControl::DeltaUrgency(
340 int64_t value, grpc_chttp2_setting_id setting_id) {
341 int64_t delta = value - static_cast<int64_t>(
342 t_->settings[GRPC_LOCAL_SETTINGS][setting_id]);
343 // TODO(ncteisen): tune this
344 if (delta != 0 && (delta <= -value / 5 || delta >= value / 5)) {
345 return FlowControlAction::Urgency::QUEUE_UPDATE;
346 } else {
347 return FlowControlAction::Urgency::NO_ACTION_NEEDED;
348 }
349 }
350
PeriodicUpdate()351 FlowControlAction TransportFlowControl::PeriodicUpdate() {
352 FlowControlAction action;
353 if (enable_bdp_probe_) {
354 // get bdp estimate and update initial_window accordingly.
355 // target might change based on how much memory pressure we are under
356 // TODO(ncteisen): experiment with setting target to be huge under low
357 // memory pressure.
358 const double target = pow(2, SmoothLogBdp(TargetLogBdp()));
359
360 // Though initial window 'could' drop to 0, we keep the floor at 128
361 target_initial_window_size_ =
362 static_cast<int32_t> GPR_CLAMP(target, 128, INT32_MAX);
363
364 action.set_send_initial_window_update(
365 DeltaUrgency(target_initial_window_size_,
366 GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE),
367 static_cast<uint32_t>(target_initial_window_size_));
368
369 // get bandwidth estimate and update max_frame accordingly.
370 double bw_dbl = bdp_estimator_.EstimateBandwidth();
371 // we target the max of BDP or bandwidth in microseconds.
372 int32_t frame_size = static_cast<int32_t> GPR_CLAMP(
373 GPR_MAX((int32_t)GPR_CLAMP(bw_dbl, 0, INT_MAX) / 1000,
374 target_initial_window_size_),
375 16384, 16777215);
376 action.set_send_max_frame_size_update(
377 DeltaUrgency(static_cast<int64_t>(frame_size),
378 GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE),
379 frame_size);
380 }
381 return UpdateAction(action);
382 }
383
UpdateAction(FlowControlAction action)384 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
385 // TODO(ncteisen): tune this
386 if (!s_->read_closed) {
387 uint32_t sent_init_window =
388 tfc_->transport()->settings[GRPC_SENT_SETTINGS]
389 [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
390 if (local_window_delta_ > announced_window_delta_ &&
391 announced_window_delta_ + sent_init_window <= sent_init_window / 2) {
392 action.set_send_stream_update(
393 FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
394 } else if (local_window_delta_ > announced_window_delta_) {
395 action.set_send_stream_update(FlowControlAction::Urgency::QUEUE_UPDATE);
396 }
397 }
398
399 return action;
400 }
401
402 } // namespace chttp2
403 } // namespace grpc_core
404