1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
20
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23
24 #include <algorithm>
25 #include <cmath>
26 #include <ostream>
27 #include <string>
28 #include <tuple>
29 #include <vector>
30
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/strings/str_cat.h"
34 #include "absl/strings/str_format.h"
35 #include "absl/strings/str_join.h"
36 #include "src/core/ext/transport/chttp2/transport/http2_settings.h"
37 #include "src/core/lib/experiments/experiments.h"
38 #include "src/core/lib/resource_quota/memory_quota.h"
39 #include "src/core/util/useful.h"
40
41 namespace grpc_core {
42 namespace chttp2 {
43
44 TestOnlyTransportTargetWindowEstimatesMocker*
45 g_test_only_transport_target_window_estimates_mocker;
46
namespace {

// Upper bound (2^31 - 1) applied to the announce sizes computed in this file.
constexpr int64_t kMaxWindowUpdateSize = (int64_t{1} << 31) - 1;

}  // namespace
52
UrgencyString(Urgency u)53 const char* FlowControlAction::UrgencyString(Urgency u) {
54 switch (u) {
55 case Urgency::NO_ACTION_NEEDED:
56 return "no-action";
57 case Urgency::UPDATE_IMMEDIATELY:
58 return "now";
59 case Urgency::QUEUE_UPDATE:
60 return "queue";
61 default:
62 GPR_UNREACHABLE_CODE(return "unknown");
63 }
64 GPR_UNREACHABLE_CODE(return "unknown");
65 }
66
operator <<(std::ostream & out,FlowControlAction::Urgency u)67 std::ostream& operator<<(std::ostream& out, FlowControlAction::Urgency u) {
68 return out << FlowControlAction::UrgencyString(u);
69 }
70
DebugString() const71 std::string FlowControlAction::DebugString() const {
72 std::vector<std::string> segments;
73 if (send_transport_update_ != Urgency::NO_ACTION_NEEDED) {
74 segments.push_back(
75 absl::StrCat("t:", UrgencyString(send_transport_update_)));
76 }
77 if (send_stream_update_ != Urgency::NO_ACTION_NEEDED) {
78 segments.push_back(absl::StrCat("s:", UrgencyString(send_stream_update_)));
79 }
80 if (send_initial_window_update_ != Urgency::NO_ACTION_NEEDED) {
81 segments.push_back(
82 absl::StrCat("iw=", initial_window_size_, ":",
83 UrgencyString(send_initial_window_update_)));
84 }
85 if (send_max_frame_size_update_ != Urgency::NO_ACTION_NEEDED) {
86 segments.push_back(
87 absl::StrCat("mf=", max_frame_size_, ":",
88 UrgencyString(send_max_frame_size_update_)));
89 }
90 if (segments.empty()) return "no action";
91 return absl::StrJoin(segments, ",");
92 }
93
operator <<(std::ostream & out,const FlowControlAction & action)94 std::ostream& operator<<(std::ostream& out, const FlowControlAction& action) {
95 return out << action.DebugString();
96 }
97
TransportFlowControl(absl::string_view name,bool enable_bdp_probe,MemoryOwner * memory_owner)98 TransportFlowControl::TransportFlowControl(absl::string_view name,
99 bool enable_bdp_probe,
100 MemoryOwner* memory_owner)
101 : memory_owner_(memory_owner),
102 enable_bdp_probe_(enable_bdp_probe),
103 bdp_estimator_(name) {}
104
DesiredAnnounceSize(bool writing_anyway) const105 uint32_t TransportFlowControl::DesiredAnnounceSize(bool writing_anyway) const {
106 const uint32_t target_announced_window =
107 static_cast<uint32_t>(target_window());
108 if ((writing_anyway || announced_window_ <= target_announced_window / 2) &&
109 announced_window_ != target_announced_window) {
110 return Clamp(target_announced_window - announced_window_, int64_t{0},
111 kMaxWindowUpdateSize);
112 }
113 return 0;
114 }
115
SentUpdate(uint32_t announce)116 void TransportFlowControl::SentUpdate(uint32_t announce) {
117 announced_window_ += announce;
118 }
119
StreamFlowControl(TransportFlowControl * tfc)120 StreamFlowControl::StreamFlowControl(TransportFlowControl* tfc) : tfc_(tfc) {}
121
RecvData(int64_t incoming_frame_size)122 absl::Status StreamFlowControl::IncomingUpdateContext::RecvData(
123 int64_t incoming_frame_size) {
124 return tfc_upd_.RecvData(incoming_frame_size, [this, incoming_frame_size]() {
125 int64_t acked_stream_window =
126 sfc_->announced_window_delta_ + sfc_->tfc_->acked_init_window();
127 if (incoming_frame_size > acked_stream_window) {
128 return absl::InternalError(absl::StrFormat(
129 "frame of size %" PRId64 " overflows local window of %" PRId64,
130 incoming_frame_size, acked_stream_window));
131 }
132
133 tfc_upd_.UpdateAnnouncedWindowDelta(&sfc_->announced_window_delta_,
134 -incoming_frame_size);
135 sfc_->min_progress_size_ -=
136 std::min(sfc_->min_progress_size_, incoming_frame_size);
137 return absl::OkStatus();
138 });
139 }
140
RecvData(int64_t incoming_frame_size,absl::FunctionRef<absl::Status ()> stream)141 absl::Status TransportFlowControl::IncomingUpdateContext::RecvData(
142 int64_t incoming_frame_size, absl::FunctionRef<absl::Status()> stream) {
143 if (incoming_frame_size > tfc_->announced_window_) {
144 return absl::InternalError(absl::StrFormat(
145 "frame of size %" PRId64 " overflows local window of %" PRId64,
146 incoming_frame_size, tfc_->announced_window_));
147 }
148 absl::Status error = stream();
149 if (!error.ok()) return error;
150 tfc_->announced_window_ -= incoming_frame_size;
151 return absl::OkStatus();
152 }
153
target_window() const154 int64_t TransportFlowControl::target_window() const {
155 // See comment above announced_stream_total_over_incoming_window_ for the
156 // logic behind this decision.
157 return static_cast<uint32_t>(
158 std::min(static_cast<int64_t>((1u << 31) - 1),
159 announced_stream_total_over_incoming_window_ +
160 std::max<int64_t>(1, target_initial_window_size_)));
161 }
162
UpdateAction(FlowControlAction action)163 FlowControlAction TransportFlowControl::UpdateAction(FlowControlAction action) {
164 const int64_t target = target_window();
165 // round up so that one byte targets are sent.
166 const int64_t send_threshold = (target + 1) / 2;
167 if (announced_window_ < send_threshold) {
168 action.set_send_transport_update(
169 FlowControlAction::Urgency::UPDATE_IMMEDIATELY);
170 }
171 return action;
172 }
173
174 double
TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const175 TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
176 const {
177 const double bdp = bdp_estimator_.EstimateBdp() * 2.0;
178 const double memory_pressure =
179 memory_owner_->GetPressureInfo().pressure_control_value;
180 // Linear interpolation between two values.
181 // Given a line segment between the two points (t_min, a), and (t_max, b),
182 // and a value t such that t_min <= t <= t_max, return the value on the line
183 // segment at t.
184 auto lerp = [](double t, double t_min, double t_max, double a, double b) {
185 return a + ((b - a) * (t - t_min) / (t_max - t_min));
186 };
187 // We split memory pressure into three broad regions:
188 // 1. Low memory pressure, the "anything goes" case - we assume no memory
189 // pressure concerns and advertise a huge window to keep things flowing.
190 // 2. Moderate memory pressure, the "adjust to BDP" case - we linearly ramp
191 // down window size to 2*BDP - which should still allow bytes to flow, but
192 // is arguably more considered.
193 // 3. High memory pressure - past 50% we linearly ramp down window size from
194 // BDP to 0 - at which point senders effectively must request to send bytes
195 // to us.
196 //
197 // ▲
198 // │
199 // 4mb ────┤---------x----
200 // │ -----
201 // BDP ────┤ ----x---
202 // │ ----
203 // │ -----
204 // │ ----
205 // │ -----
206 // │ ---x
207 // ├─────────┬─────────────┬────────────────────────┬─────►
208 // │Anything │Adjust to │Drop to zero │
209 // │Goes │BDP │ │
210 // 0% 20% 50% 100% memory
211 // pressure
212 const double kAnythingGoesPressure = 0.2;
213 const double kAdjustedToBdpPressure = 0.5;
214 const double kOneMegabyte = 1024.0 * 1024.0;
215 const double kAnythingGoesWindow = std::max(4.0 * kOneMegabyte, bdp);
216 if (memory_pressure < kAnythingGoesPressure) {
217 return kAnythingGoesWindow;
218 } else if (memory_pressure < kAdjustedToBdpPressure) {
219 return lerp(memory_pressure, kAnythingGoesPressure, kAdjustedToBdpPressure,
220 kAnythingGoesWindow, bdp);
221 } else if (memory_pressure < 1.0) {
222 return lerp(memory_pressure, kAdjustedToBdpPressure, 1.0, bdp, 0);
223 } else {
224 return 0;
225 }
226 }
227
UpdateSetting(absl::string_view name,int64_t * desired_value,uint32_t new_desired_value,FlowControlAction * action,FlowControlAction & (FlowControlAction::* set)(FlowControlAction::Urgency,uint32_t))228 void TransportFlowControl::UpdateSetting(
229 absl::string_view name, int64_t* desired_value, uint32_t new_desired_value,
230 FlowControlAction* action,
231 FlowControlAction& (FlowControlAction::*set)(FlowControlAction::Urgency,
232 uint32_t)) {
233 if (new_desired_value != *desired_value) {
234 GRPC_TRACE_LOG(flowctl, INFO)
235 << "[flowctl] UPDATE SETTING " << name << " from " << *desired_value
236 << " to " << new_desired_value;
237 // Reaching zero can only happen for initial window size, and if it occurs
238 // we really want to wake up writes and ensure all the queued stream
239 // window updates are flushed, since stream flow control operates
240 // differently at zero window size.
241 FlowControlAction::Urgency urgency =
242 FlowControlAction::Urgency::QUEUE_UPDATE;
243 if (*desired_value == 0 || new_desired_value == 0) {
244 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
245 }
246 *desired_value = new_desired_value;
247 (action->*set)(urgency, *desired_value);
248 }
249 }
250
SetAckedInitialWindow(uint32_t value)251 FlowControlAction TransportFlowControl::SetAckedInitialWindow(uint32_t value) {
252 acked_init_window_ = value;
253 FlowControlAction action;
254 if (acked_init_window_ != target_initial_window_size_) {
255 FlowControlAction::Urgency urgency =
256 FlowControlAction::Urgency::QUEUE_UPDATE;
257 if (acked_init_window_ == 0 || target_initial_window_size_ == 0) {
258 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
259 }
260 action.set_send_initial_window_update(urgency, target_initial_window_size_);
261 }
262 return action;
263 }
264
PeriodicUpdate()265 FlowControlAction TransportFlowControl::PeriodicUpdate() {
266 FlowControlAction action;
267 if (enable_bdp_probe_) {
268 // get bdp estimate and update initial_window accordingly.
269 // target might change based on how much memory pressure we are under
270 // TODO(ncteisen): experiment with setting target to be huge under low
271 // memory pressure.
272 uint32_t target = static_cast<uint32_t>(RoundUpToPowerOf2(
273 Clamp(TargetInitialWindowSizeBasedOnMemoryPressureAndBdp(), 0.0,
274 static_cast<double>(kMaxInitialWindowSize))));
275 if (target < kMinPositiveInitialWindowSize) target = 0;
276 if (g_test_only_transport_target_window_estimates_mocker != nullptr) {
277 // Hook for simulating unusual flow control situations in tests.
278 target = g_test_only_transport_target_window_estimates_mocker
279 ->ComputeNextTargetInitialWindowSizeFromPeriodicUpdate(
280 target_initial_window_size_ /* current target */);
281 }
282 // Though initial window 'could' drop to 0, we keep the floor at
283 // kMinInitialWindowSize
284 UpdateSetting(Http2Settings::initial_window_size_name(),
285 &target_initial_window_size_,
286 std::min(target, Http2Settings::max_initial_window_size()),
287 &action, &FlowControlAction::set_send_initial_window_update);
288 // we target the max of BDP or bandwidth in microseconds.
289 UpdateSetting(Http2Settings::max_frame_size_name(), &target_frame_size_,
290 Clamp(target, Http2Settings::min_max_frame_size(),
291 Http2Settings::max_max_frame_size()),
292 &action, &FlowControlAction::set_send_max_frame_size_update);
293
294 if (IsTcpFrameSizeTuningEnabled()) {
295 // Advertise PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to peer. By advertising
296 // PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to the peer, we are informing the
297 // peer that we have tcp frame size tuning enabled and we inform it of our
298 // preferred rx frame sizes. The preferred rx frame size is determined as:
299 // Clamp(target_frame_size_ * 2, 16384, 0x7fffffff). In the future, this
300 // maybe updated to a different function of the memory pressure.
301 UpdateSetting(
302 Http2Settings::preferred_receive_crypto_message_size_name(),
303 &target_preferred_rx_crypto_frame_size_,
304 Clamp(static_cast<unsigned int>(target_frame_size_ * 2),
305 Http2Settings::min_preferred_receive_crypto_message_size(),
306 Http2Settings::max_preferred_receive_crypto_message_size()),
307 &action,
308 &FlowControlAction::set_preferred_rx_crypto_frame_size_update);
309 }
310 }
311 return UpdateAction(action);
312 }
313
ToString() const314 std::string TransportFlowControl::Stats::ToString() const {
315 return absl::StrCat("target_window: ", target_window,
316 " target_frame_size: ", target_frame_size,
317 " target_preferred_rx_crypto_frame_size: ",
318 target_preferred_rx_crypto_frame_size,
319 " acked_init_window: ", acked_init_window,
320 " queued_init_window: ", queued_init_window,
321 " sent_init_window: ", sent_init_window,
322 " remote_window: ", remote_window,
323 " announced_window: ", announced_window,
324 " announced_stream_total_over_incoming_window: ",
325 announced_stream_total_over_incoming_window,
326 " bdp_accumulator: ", bdp_accumulator,
327 " bdp_estimate: ", bdp_estimate,
328 " bdp_bw_est: ", bdp_bw_est);
329 }
330
SentUpdate(uint32_t announce)331 void StreamFlowControl::SentUpdate(uint32_t announce) {
332 TransportFlowControl::IncomingUpdateContext tfc_upd(tfc_);
333 pending_size_ = absl::nullopt;
334 tfc_upd.UpdateAnnouncedWindowDelta(&announced_window_delta_, announce);
335 CHECK_EQ(DesiredAnnounceSize(), 0u);
336 std::ignore = tfc_upd.MakeAction();
337 }
338
DesiredAnnounceSize() const339 uint32_t StreamFlowControl::DesiredAnnounceSize() const {
340 int64_t desired_window_delta = [this]() {
341 if (min_progress_size_ == 0) {
342 if (pending_size_.has_value() &&
343 announced_window_delta_ < -*pending_size_) {
344 return -*pending_size_;
345 } else {
346 return announced_window_delta_;
347 }
348 } else {
349 return std::min(min_progress_size_, kMaxWindowDelta);
350 }
351 }();
352 return Clamp(desired_window_delta - announced_window_delta_, int64_t{0},
353 kMaxWindowUpdateSize);
354 }
355
UpdateAction(FlowControlAction action)356 FlowControlAction StreamFlowControl::UpdateAction(FlowControlAction action) {
357 const int64_t desired_announce_size = DesiredAnnounceSize();
358 if (desired_announce_size > 0) {
359 FlowControlAction::Urgency urgency =
360 FlowControlAction::Urgency::QUEUE_UPDATE;
361 // Size at which we probably want to wake up and write regardless of whether
362 // we *have* to.
363 // Currently set at half the initial window size or 8kb (whichever is
364 // greater). 8kb means we don't send rapidly unnecessarily when the initial
365 // window size is small.
366 const int64_t hurry_up_size = std::max(
367 static_cast<int64_t>(tfc_->queued_init_window()) / 2, int64_t{8192});
368 if (desired_announce_size > hurry_up_size) {
369 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
370 }
371 // min_progress_size_ > 0 means we have a reader ready to read.
372 if (min_progress_size_ > 0) {
373 if (announced_window_delta_ <=
374 -static_cast<int64_t>(tfc_->sent_init_window()) / 2) {
375 urgency = FlowControlAction::Urgency::UPDATE_IMMEDIATELY;
376 }
377 }
378 action.set_send_stream_update(urgency);
379 }
380 return action;
381 }
382
SetPendingSize(int64_t pending_size)383 void StreamFlowControl::IncomingUpdateContext::SetPendingSize(
384 int64_t pending_size) {
385 CHECK_GE(pending_size, 0);
386 sfc_->pending_size_ = pending_size;
387 }
388
ToString() const389 std::string StreamFlowControl::Stats::ToString() const {
390 return absl::StrCat("min_progress_size: ", min_progress_size,
391 " remote_window_delta: ", remote_window_delta,
392 " announced_window_delta: ", announced_window_delta,
393 pending_size.has_value() ? *pending_size : -1);
394 }
395
396 } // namespace chttp2
397 } // namespace grpc_core
398