1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/event_engine/memory_request.h>
16 #include <grpc/support/time.h>
17 #include <inttypes.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20
21 #include <algorithm>
22 #include <deque>
23 #include <functional>
24 #include <limits>
25 #include <map>
26 #include <memory>
27 #include <utility>
28 #include <vector>
29
30 #include "absl/base/attributes.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/str_join.h"
34 #include "absl/types/optional.h"
35 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
36 #include "src/core/lib/experiments/config.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/resource_quota/memory_quota.h"
39 #include "src/core/lib/transport/bdp_estimator.h"
40 #include "src/core/util/time.h"
41 #include "src/core/util/useful.h"
42 #include "src/libfuzzer/libfuzzer_macro.h"
43 #include "test/core/test_util/fuzz_config_vars.h"
44 #include "test/core/transport/chttp2/flow_control_fuzzer.pb.h"
45
46 // IWYU pragma: no_include <google/protobuf/repeated_ptr_field.h>
47
48 bool squelch = true;
49
50 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
51
52 namespace grpc_core {
53 namespace chttp2 {
54 namespace {
55
// Upper bound, in milliseconds, on how far a single step_time_ms action may
// advance the fake clock: 365 days expressed as days * hours * minutes *
// seconds * milliseconds.
constexpr uint64_t kMaxAdvanceTimeMillis =
    uint64_t{365} * 24 * 60 * 60 * 1000;
57
58 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)59 gpr_timespec now_impl(gpr_clock_type clock_type) {
60 CHECK(clock_type != GPR_TIMESPAN);
61 gpr_timespec ts = g_now;
62 ts.clock_type = clock_type;
63 return ts;
64 }
65
InitGlobals()66 void InitGlobals() {
67 g_now = {1, 0, GPR_CLOCK_MONOTONIC};
68 TestOnlySetProcessEpoch(g_now);
69 gpr_now_impl = now_impl;
70 }
71
72 class FlowControlFuzzer {
73 public:
FlowControlFuzzer(bool enable_bdp)74 explicit FlowControlFuzzer(bool enable_bdp) {
75 ExecCtx exec_ctx;
76 tfc_ = std::make_unique<TransportFlowControl>("fuzzer", enable_bdp,
77 &memory_owner_);
78 }
79
~FlowControlFuzzer()80 ~FlowControlFuzzer() {
81 ExecCtx exec_ctx;
82 streams_.clear();
83 tfc_.reset();
84 memory_owner_.Release(allocated_memory_);
85 }
86
87 void Perform(const flow_control_fuzzer::Action& action);
88 void AssertNoneStuck() const;
89 void AssertAnnouncedOverInitialWindowSizeCorrect() const;
90
91 private:
92 struct StreamPayload {
93 uint32_t id;
94 uint64_t size;
95 };
96
97 struct SendToRemote {
98 bool bdp_ping = false;
99 absl::optional<uint32_t> initial_window_size;
100 uint32_t transport_window_update;
101 std::vector<StreamPayload> stream_window_updates;
102 };
103
104 struct SendFromRemote {
105 bool bdp_pong = false;
106 absl::optional<uint32_t> ack_initial_window_size;
107 std::vector<StreamPayload> stream_writes;
108 };
109
110 struct Stream {
Streamgrpc_core::chttp2::__anon2ca8f0570111::FlowControlFuzzer::Stream111 explicit Stream(uint32_t id, TransportFlowControl* tfc) : id(id), fc(tfc) {}
112 uint32_t id;
113 StreamFlowControl fc;
114 int64_t queued_writes = 0;
115 int64_t window_delta = 0;
116 };
117
118 void PerformAction(FlowControlAction action, Stream* stream);
GetStream(uint32_t id)119 Stream* GetStream(uint32_t id) {
120 auto it = streams_.find(id);
121 if (it == streams_.end()) {
122 it = streams_.emplace(id, Stream{id, tfc_.get()}).first;
123 }
124 return &it->second;
125 }
GetStream(uint32_t id) const126 const Stream* GetStream(uint32_t id) const {
127 auto it = streams_.find(id);
128 if (it == streams_.end()) {
129 return nullptr;
130 }
131 return &it->second;
132 }
133
134 MemoryQuotaRefPtr memory_quota_ = MakeMemoryQuota("fuzzer");
135 MemoryOwner memory_owner_ = memory_quota_->CreateMemoryOwner();
136 std::unique_ptr<TransportFlowControl> tfc_;
137 absl::optional<uint32_t> queued_initial_window_size_;
138 absl::optional<uint32_t> queued_send_max_frame_size_;
139 bool scheduled_write_ = false;
140 bool sending_initial_window_size_ = false;
141 std::deque<SendToRemote> send_to_remote_;
142 std::deque<SendFromRemote> send_from_remote_;
143 uint32_t remote_initial_window_size_ = kDefaultWindow;
144 int64_t remote_transport_window_size_ = kDefaultWindow;
145 std::map<uint32_t, Stream> streams_;
146 std::vector<uint32_t> streams_to_update_;
147 uint64_t allocated_memory_ = 0;
148 Timestamp next_bdp_ping_ = Timestamp::ProcessEpoch();
149 };
150
Perform(const flow_control_fuzzer::Action & action)151 void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
152 ExecCtx exec_ctx;
153 bool sending_payload = false;
154 switch (action.action_case()) {
155 case flow_control_fuzzer::Action::ACTION_NOT_SET:
156 break;
157 case flow_control_fuzzer::Action::kSetMemoryQuota: {
158 memory_quota_->SetSize(
159 Clamp(action.set_memory_quota(), uint64_t{1},
160 static_cast<uint64_t>(std::numeric_limits<int64_t>::max())));
161 } break;
162 case flow_control_fuzzer::Action::kStepTimeMs: {
163 g_now = gpr_time_add(
164 g_now, gpr_time_from_millis(Clamp(action.step_time_ms(), uint64_t{1},
165 kMaxAdvanceTimeMillis),
166 GPR_TIMESPAN));
167 exec_ctx.InvalidateNow();
168 if (Timestamp::Now() >= next_bdp_ping_) {
169 scheduled_write_ = true;
170 }
171 } break;
172 case flow_control_fuzzer::Action::kPeriodicUpdate: {
173 PerformAction(tfc_->PeriodicUpdate(), nullptr);
174 } break;
175 case flow_control_fuzzer::Action::kPerformSendToRemote: {
176 scheduled_write_ = true;
177 } break;
178 case flow_control_fuzzer::Action::kPerformSendToRemoteWithPayload: {
179 scheduled_write_ = true;
180 sending_payload = true;
181 } break;
182 case flow_control_fuzzer::Action::kReadSendToRemote: {
183 if (send_to_remote_.empty()) break;
184 auto sent_to_remote = send_to_remote_.front();
185 if (sent_to_remote.initial_window_size.has_value()) {
186 if (!squelch) {
187 fprintf(stderr, "Setting initial window size to %d\n",
188 sent_to_remote.initial_window_size.value());
189 }
190 SendFromRemote send_from_remote;
191 send_from_remote.ack_initial_window_size =
192 sent_to_remote.initial_window_size;
193 for (const auto& id_stream : streams_) {
194 CHECK(id_stream.second.window_delta +
195 *sent_to_remote.initial_window_size <=
196 (1u << 31) - 1);
197 }
198 remote_initial_window_size_ = *sent_to_remote.initial_window_size;
199 send_from_remote_.push_back(send_from_remote);
200 }
201 if (sent_to_remote.bdp_ping) {
202 SendFromRemote send_from_remote;
203 send_from_remote.bdp_pong = true;
204 send_from_remote_.push_back(send_from_remote);
205 }
206 for (auto stream_update : sent_to_remote.stream_window_updates) {
207 Stream* s = GetStream(stream_update.id);
208 if (!squelch) {
209 fprintf(stderr,
210 "[%" PRIu32 "]: increase window delta by %" PRIu64
211 " from %" PRId64 "\n",
212 stream_update.id, stream_update.size, s->window_delta);
213 }
214 s->window_delta += stream_update.size;
215 CHECK(s->window_delta <= chttp2::kMaxWindowDelta);
216 }
217 remote_transport_window_size_ += sent_to_remote.transport_window_update;
218 send_to_remote_.pop_front();
219 } break;
220 case flow_control_fuzzer::Action::kReadSendFromRemote: {
221 if (send_from_remote_.empty()) break;
222 auto sent_from_remote = send_from_remote_.front();
223 if (sent_from_remote.ack_initial_window_size.has_value()) {
224 if (!squelch) {
225 fprintf(stderr, "Received ACK for initial window size %d\n",
226 *sent_from_remote.ack_initial_window_size);
227 }
228 PerformAction(tfc_->SetAckedInitialWindow(
229 *sent_from_remote.ack_initial_window_size),
230 nullptr);
231 sending_initial_window_size_ = false;
232 }
233 if (sent_from_remote.bdp_pong) {
234 next_bdp_ping_ = tfc_->bdp_estimator()->CompletePing();
235 }
236 for (const auto& stream_write : sent_from_remote.stream_writes) {
237 Stream* stream = GetStream(stream_write.id);
238 if (!squelch) {
239 fprintf(stderr, "[%" PRIu32 "]: recv write of %" PRIu64 "\n",
240 stream_write.id, stream_write.size);
241 }
242 if (auto* bdp = tfc_->bdp_estimator()) {
243 bdp->AddIncomingBytes(stream_write.size);
244 }
245 StreamFlowControl::IncomingUpdateContext upd(&stream->fc);
246 CHECK_OK(upd.RecvData(stream_write.size));
247 PerformAction(upd.MakeAction(), stream);
248 }
249 send_from_remote_.pop_front();
250 } break;
251 case flow_control_fuzzer::Action::kStreamWrite: {
252 Stream* s = GetStream(action.stream_write().id());
253 s->queued_writes += action.stream_write().size();
254 } break;
255 case flow_control_fuzzer::Action::kPerformSendFromRemote: {
256 SendFromRemote send;
257 for (auto& id_stream : streams_) {
258 auto send_amount = std::min(
259 {id_stream.second.queued_writes, remote_transport_window_size_,
260 remote_initial_window_size_ + id_stream.second.window_delta});
261 if (send_amount <= 0) continue;
262 send.stream_writes.push_back(
263 {id_stream.first, static_cast<uint64_t>(send_amount)});
264 id_stream.second.queued_writes -= send_amount;
265 id_stream.second.window_delta -= send_amount;
266 remote_transport_window_size_ -= send_amount;
267 }
268 send_from_remote_.push_back(send);
269 } break;
270 case flow_control_fuzzer::Action::kSetMinProgressSize: {
271 Stream* s = GetStream(action.set_min_progress_size().id());
272 StreamFlowControl::IncomingUpdateContext upd(&s->fc);
273 upd.SetMinProgressSize(action.set_min_progress_size().size());
274 PerformAction(upd.MakeAction(), s);
275 } break;
276 case flow_control_fuzzer::Action::kAllocateMemory: {
277 auto allocate = std::min(
278 static_cast<size_t>(action.allocate_memory()),
279 grpc_event_engine::experimental::MemoryRequest::max_allowed_size());
280 allocated_memory_ += allocate;
281 memory_owner_.Reserve(allocate);
282 } break;
283 case flow_control_fuzzer::Action::kDeallocateMemory: {
284 auto deallocate = std::min(
285 static_cast<uint64_t>(action.deallocate_memory()), allocated_memory_);
286 allocated_memory_ -= deallocate;
287 memory_owner_.Release(deallocate);
288 } break;
289 case flow_control_fuzzer::Action::kSetPendingSize: {
290 Stream* s = GetStream(action.set_min_progress_size().id());
291 StreamFlowControl::IncomingUpdateContext upd(&s->fc);
292 upd.SetPendingSize(action.set_pending_size().size());
293 PerformAction(upd.MakeAction(), s);
294 } break;
295 }
296 if (scheduled_write_) {
297 SendToRemote send;
298 if (!squelch) {
299 fprintf(stderr, "**** PERFORM WRITE ****\n");
300 }
301 if (Timestamp::Now() >= next_bdp_ping_) {
302 if (auto* bdp = tfc_->bdp_estimator()) {
303 if (!squelch) {
304 fprintf(stderr, "- schedule bdp ping\n");
305 }
306 bdp->SchedulePing();
307 bdp->StartPing();
308 next_bdp_ping_ = Timestamp::InfFuture();
309 send.bdp_ping = true;
310 }
311 }
312 if (!sending_initial_window_size_ &&
313 queued_initial_window_size_.has_value()) {
314 if (!squelch) {
315 fprintf(stderr, "- send initial window %d\n",
316 *queued_initial_window_size_);
317 }
318 sending_initial_window_size_ = true;
319 send.initial_window_size =
320 std::exchange(queued_initial_window_size_, absl::nullopt);
321 tfc_->FlushedSettings();
322 }
323 std::vector<uint32_t> streams_to_update = std::move(streams_to_update_);
324 streams_to_update_.clear();
325 for (auto stream_id : streams_to_update) {
326 auto* stream = GetStream(stream_id);
327 auto size = stream->fc.MaybeSendUpdate();
328 if (!squelch) {
329 fprintf(stderr, "- send [%" PRId64 "] stream window update %db\n",
330 static_cast<int64_t>(stream->id), size);
331 }
332 send.stream_window_updates.push_back({stream->id, size});
333 }
334 send.transport_window_update = tfc_->MaybeSendUpdate(sending_payload);
335 queued_send_max_frame_size_.reset();
336 send_to_remote_.emplace_back(std::move(send));
337 scheduled_write_ = false;
338 if (!squelch) {
339 fprintf(stderr, "**** FINISH WRITE ****\n");
340 }
341 }
342 }
343
PerformAction(FlowControlAction action,Stream * stream)344 void FlowControlFuzzer::PerformAction(FlowControlAction action,
345 Stream* stream) {
346 if (!squelch) {
347 fprintf(stderr, "[%" PRId64 "]: ACTION: %s\n",
348 stream == nullptr ? int64_t{-1} : static_cast<int64_t>(stream->id),
349 action.DebugString().c_str());
350 }
351
352 auto with_urgency = [this](FlowControlAction::Urgency urgency,
353 std::function<void()> f) {
354 switch (urgency) {
355 case FlowControlAction::Urgency::NO_ACTION_NEEDED:
356 break;
357 case FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
358 scheduled_write_ = true;
359 ABSL_FALLTHROUGH_INTENDED;
360 case FlowControlAction::Urgency::QUEUE_UPDATE:
361 f();
362 break;
363 }
364 };
365 with_urgency(action.send_stream_update(),
366 [this, stream]() { streams_to_update_.push_back(stream->id); });
367 with_urgency(action.send_transport_update(), []() {});
368 with_urgency(action.send_initial_window_update(), [this, &action]() {
369 CHECK(action.initial_window_size() <= chttp2::kMaxInitialWindowSize);
370 queued_initial_window_size_ = action.initial_window_size();
371 });
372 with_urgency(action.send_max_frame_size_update(), [this, &action]() {
373 queued_send_max_frame_size_ = action.max_frame_size();
374 });
375 }
376
AssertNoneStuck() const377 void FlowControlFuzzer::AssertNoneStuck() const {
378 CHECK(!scheduled_write_);
379
380 // Reconcile all the values to get the view of the remote that is knowable to
381 // the flow control system.
382 std::map<uint32_t, int64_t> reconciled_stream_deltas;
383 int64_t reconciled_transport_window = remote_transport_window_size_;
384 int64_t reconciled_initial_window = remote_initial_window_size_;
385 std::vector<uint64_t> inflight_send_initial_windows;
386 for (const auto& id_stream : streams_) {
387 reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta;
388 }
389
390 // Anything that's been sent from flow control -> remote needs to be added to
391 // the remote.
392 for (const auto& send_to_remote : send_to_remote_) {
393 if (send_to_remote.initial_window_size.has_value()) {
394 reconciled_initial_window = *send_to_remote.initial_window_size;
395 inflight_send_initial_windows.push_back(
396 *send_to_remote.initial_window_size);
397 }
398 reconciled_transport_window += send_to_remote.transport_window_update;
399 for (const auto& stream_update : send_to_remote.stream_window_updates) {
400 reconciled_stream_deltas[stream_update.id] += stream_update.size;
401 }
402 }
403
404 // Anything that's been sent from remote -> flow control needs to be wound
405 // back into the remote.
406 for (const auto& send_from_remote : send_from_remote_) {
407 for (const auto& stream_write : send_from_remote.stream_writes) {
408 reconciled_stream_deltas[stream_write.id] += stream_write.size;
409 reconciled_transport_window += stream_write.size;
410 }
411 }
412
413 // If we're sending an initial window size we get to consider a queued initial
414 // window size too: it'll be sent as soon as the remote acks the settings
415 // change, which it must.
416 if (sending_initial_window_size_ && queued_initial_window_size_.has_value()) {
417 reconciled_initial_window = *queued_initial_window_size_;
418 inflight_send_initial_windows.push_back(*queued_initial_window_size_);
419 // And since we'll initiate a write, any updates that are queued to be
420 // written will be considered and send their desired updates.
421 reconciled_transport_window += tfc_->DesiredAnnounceSize(true);
422 for (auto stream_id : streams_to_update_) {
423 auto* stream = GetStream(stream_id);
424 if (stream == nullptr) continue;
425 reconciled_stream_deltas[stream_id] += stream->fc.DesiredAnnounceSize();
426 }
427 }
428
429 // Finally, if a stream has indicated it's willing to read, the reconciled
430 // remote *MUST* be in a state where it could send at least one byte.
431 for (const auto& id_stream : streams_) {
432 if (id_stream.second.fc.min_progress_size() == 0) continue;
433 int64_t stream_window =
434 reconciled_stream_deltas[id_stream.first] + reconciled_initial_window;
435 if (stream_window <= 0 || reconciled_transport_window <= 0) {
436 fprintf(stderr,
437 "FAILED: stream %d has stream_window=%" PRId64
438 ", transport_window=%" PRId64 ", delta=%" PRId64
439 ", init_window_size=%" PRId64 ", min_progress_size=%" PRId64
440 ", transport announced_stream_total_over_incoming_window=%" PRId64
441 ", transport announced_window=%" PRId64
442 " transport target_window=%" PRId64 " sent_init_window=%d\n",
443 id_stream.first, stream_window, reconciled_transport_window,
444 reconciled_stream_deltas[id_stream.first],
445 reconciled_initial_window,
446 (id_stream.second.fc.min_progress_size()),
447 tfc_->announced_stream_total_over_incoming_window(),
448 tfc_->announced_window(), tfc_->target_window(),
449 tfc_->sent_init_window());
450 fprintf(stderr,
451 "initial_window breakdown: remote=%" PRId32 ", in-flight={%s}\n",
452 remote_initial_window_size_,
453 absl::StrJoin(inflight_send_initial_windows, ",").c_str());
454 abort();
455 }
456 }
457 }
458
AssertAnnouncedOverInitialWindowSizeCorrect() const459 void FlowControlFuzzer::AssertAnnouncedOverInitialWindowSizeCorrect() const {
460 int64_t value_from_streams = 0;
461
462 for (const auto& id_stream : streams_) {
463 const auto& stream = id_stream.second;
464 if (stream.fc.announced_window_delta() > 0) {
465 value_from_streams += stream.fc.announced_window_delta();
466 }
467 }
468
469 CHECK(value_from_streams ==
470 tfc_->announced_stream_total_over_incoming_window());
471 }
472
473 } // namespace
474 } // namespace chttp2
475 } // namespace grpc_core
476
// Fuzzer entry point. Applies the config vars carried by the message, reloads
// experiments, resets the fake clock, and then replays each action against a
// fresh FlowControlFuzzer, checking both invariants after every action.
DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg& msg) {
  grpc_core::ApplyFuzzConfigVars(msg.config_vars());
  grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
  grpc_core::chttp2::InitGlobals();
  grpc_core::chttp2::FlowControlFuzzer fuzzer(msg.enable_bdp());
  for (const auto& step : msg.actions()) {
    // Echo the action being replayed when tracing is enabled.
    if (!squelch) {
      fprintf(stderr, "%s\n", step.DebugString().c_str());
    }
    fuzzer.Perform(step);
    fuzzer.AssertNoneStuck();
    fuzzer.AssertAnnouncedOverInitialWindowSizeCorrect();
  }
}
491