1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <inttypes.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18
19 #include <algorithm>
20 #include <deque>
21 #include <functional>
22 #include <limits>
23 #include <map>
24 #include <memory>
25 #include <utility>
26 #include <vector>
27
28 #include "absl/base/attributes.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_join.h"
31 #include "absl/types/optional.h"
32
33 #include <grpc/event_engine/memory_request.h>
34 #include <grpc/support/log.h>
35 #include <grpc/support/time.h>
36
37 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
38 #include "src/core/lib/experiments/config.h"
39 #include "src/core/lib/gpr/useful.h"
40 #include "src/core/lib/gprpp/time.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/lib/resource_quota/memory_quota.h"
43 #include "src/core/lib/transport/bdp_estimator.h"
44 #include "src/libfuzzer/libfuzzer_macro.h"
45 #include "test/core/transport/chttp2/flow_control_fuzzer.pb.h"
46 #include "test/core/util/fuzz_config_vars.h"
47
48 // IWYU pragma: no_include <google/protobuf/repeated_ptr_field.h>
49
50 bool squelch = true;
51
52 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
53
54 namespace grpc_core {
55 namespace chttp2 {
56 namespace {
57
58 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
59
60 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)61 gpr_timespec now_impl(gpr_clock_type clock_type) {
62 GPR_ASSERT(clock_type != GPR_TIMESPAN);
63 gpr_timespec ts = g_now;
64 ts.clock_type = clock_type;
65 return ts;
66 }
67
InitGlobals()68 void InitGlobals() {
69 g_now = {1, 0, GPR_CLOCK_MONOTONIC};
70 TestOnlySetProcessEpoch(g_now);
71 gpr_now_impl = now_impl;
72 }
73
74 class FlowControlFuzzer {
75 public:
FlowControlFuzzer(bool enable_bdp)76 explicit FlowControlFuzzer(bool enable_bdp) {
77 ExecCtx exec_ctx;
78 tfc_ = std::make_unique<TransportFlowControl>("fuzzer", enable_bdp,
79 &memory_owner_);
80 }
81
~FlowControlFuzzer()82 ~FlowControlFuzzer() {
83 ExecCtx exec_ctx;
84 streams_.clear();
85 tfc_.reset();
86 memory_owner_.Release(allocated_memory_);
87 }
88
89 void Perform(const flow_control_fuzzer::Action& action);
90 void AssertNoneStuck() const;
91 void AssertAnnouncedOverInitialWindowSizeCorrect() const;
92
93 private:
94 struct StreamPayload {
95 uint32_t id;
96 uint64_t size;
97 };
98
99 struct SendToRemote {
100 bool bdp_ping = false;
101 absl::optional<uint32_t> initial_window_size;
102 uint32_t transport_window_update;
103 std::vector<StreamPayload> stream_window_updates;
104 };
105
106 struct SendFromRemote {
107 bool bdp_pong = false;
108 absl::optional<uint32_t> ack_initial_window_size;
109 std::vector<StreamPayload> stream_writes;
110 };
111
112 struct Stream {
Streamgrpc_core::chttp2::__anonc52141130111::FlowControlFuzzer::Stream113 explicit Stream(uint32_t id, TransportFlowControl* tfc) : id(id), fc(tfc) {}
114 uint32_t id;
115 StreamFlowControl fc;
116 int64_t queued_writes = 0;
117 int64_t window_delta = 0;
118 };
119
120 void PerformAction(FlowControlAction action, Stream* stream);
GetStream(uint32_t id)121 Stream* GetStream(uint32_t id) {
122 auto it = streams_.find(id);
123 if (it == streams_.end()) {
124 it = streams_.emplace(id, Stream{id, tfc_.get()}).first;
125 }
126 return &it->second;
127 }
GetStream(uint32_t id) const128 const Stream* GetStream(uint32_t id) const {
129 auto it = streams_.find(id);
130 if (it == streams_.end()) {
131 return nullptr;
132 }
133 return &it->second;
134 }
135
136 MemoryQuotaRefPtr memory_quota_ = MakeMemoryQuota("fuzzer");
137 MemoryOwner memory_owner_ = memory_quota_->CreateMemoryOwner();
138 std::unique_ptr<TransportFlowControl> tfc_;
139 absl::optional<uint32_t> queued_initial_window_size_;
140 absl::optional<uint32_t> queued_send_max_frame_size_;
141 bool scheduled_write_ = false;
142 bool sending_initial_window_size_ = false;
143 std::deque<SendToRemote> send_to_remote_;
144 std::deque<SendFromRemote> send_from_remote_;
145 uint32_t remote_initial_window_size_ = kDefaultWindow;
146 int64_t remote_transport_window_size_ = kDefaultWindow;
147 std::map<uint32_t, Stream> streams_;
148 std::vector<uint32_t> streams_to_update_;
149 uint64_t allocated_memory_ = 0;
150 Timestamp next_bdp_ping_ = Timestamp::ProcessEpoch();
151 };
152
Perform(const flow_control_fuzzer::Action & action)153 void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
154 ExecCtx exec_ctx;
155 bool sending_payload = false;
156 switch (action.action_case()) {
157 case flow_control_fuzzer::Action::ACTION_NOT_SET:
158 break;
159 case flow_control_fuzzer::Action::kSetMemoryQuota: {
160 memory_quota_->SetSize(
161 Clamp(action.set_memory_quota(), uint64_t{1},
162 static_cast<uint64_t>(std::numeric_limits<int64_t>::max())));
163 } break;
164 case flow_control_fuzzer::Action::kStepTimeMs: {
165 g_now = gpr_time_add(
166 g_now, gpr_time_from_millis(Clamp(action.step_time_ms(), uint64_t{1},
167 kMaxAdvanceTimeMillis),
168 GPR_TIMESPAN));
169 exec_ctx.InvalidateNow();
170 if (Timestamp::Now() >= next_bdp_ping_) {
171 scheduled_write_ = true;
172 }
173 } break;
174 case flow_control_fuzzer::Action::kPeriodicUpdate: {
175 PerformAction(tfc_->PeriodicUpdate(), nullptr);
176 } break;
177 case flow_control_fuzzer::Action::kPerformSendToRemote: {
178 scheduled_write_ = true;
179 } break;
180 case flow_control_fuzzer::Action::kPerformSendToRemoteWithPayload: {
181 scheduled_write_ = true;
182 sending_payload = true;
183 } break;
184 case flow_control_fuzzer::Action::kReadSendToRemote: {
185 if (send_to_remote_.empty()) break;
186 auto sent_to_remote = send_to_remote_.front();
187 if (sent_to_remote.initial_window_size.has_value()) {
188 if (!squelch) {
189 fprintf(stderr, "Setting initial window size to %d\n",
190 sent_to_remote.initial_window_size.value());
191 }
192 SendFromRemote send_from_remote;
193 send_from_remote.ack_initial_window_size =
194 sent_to_remote.initial_window_size;
195 for (const auto& id_stream : streams_) {
196 GPR_ASSERT(id_stream.second.window_delta +
197 *sent_to_remote.initial_window_size <=
198 (1u << 31) - 1);
199 }
200 remote_initial_window_size_ = *sent_to_remote.initial_window_size;
201 send_from_remote_.push_back(send_from_remote);
202 }
203 if (sent_to_remote.bdp_ping) {
204 SendFromRemote send_from_remote;
205 send_from_remote.bdp_pong = true;
206 send_from_remote_.push_back(send_from_remote);
207 }
208 for (auto stream_update : sent_to_remote.stream_window_updates) {
209 Stream* s = GetStream(stream_update.id);
210 if (!squelch) {
211 fprintf(stderr,
212 "[%" PRIu32 "]: increase window delta by %" PRIu64
213 " from %" PRId64 "\n",
214 stream_update.id, stream_update.size, s->window_delta);
215 }
216 s->window_delta += stream_update.size;
217 GPR_ASSERT(s->window_delta <= chttp2::kMaxWindowDelta);
218 }
219 remote_transport_window_size_ += sent_to_remote.transport_window_update;
220 send_to_remote_.pop_front();
221 } break;
222 case flow_control_fuzzer::Action::kReadSendFromRemote: {
223 if (send_from_remote_.empty()) break;
224 auto sent_from_remote = send_from_remote_.front();
225 if (sent_from_remote.ack_initial_window_size.has_value()) {
226 if (!squelch) {
227 fprintf(stderr, "Received ACK for initial window size %d\n",
228 *sent_from_remote.ack_initial_window_size);
229 }
230 PerformAction(tfc_->SetAckedInitialWindow(
231 *sent_from_remote.ack_initial_window_size),
232 nullptr);
233 sending_initial_window_size_ = false;
234 }
235 if (sent_from_remote.bdp_pong) {
236 next_bdp_ping_ = tfc_->bdp_estimator()->CompletePing();
237 }
238 for (const auto& stream_write : sent_from_remote.stream_writes) {
239 Stream* stream = GetStream(stream_write.id);
240 if (!squelch) {
241 fprintf(stderr, "[%" PRIu32 "]: recv write of %" PRIu64 "\n",
242 stream_write.id, stream_write.size);
243 }
244 if (auto* bdp = tfc_->bdp_estimator()) {
245 bdp->AddIncomingBytes(stream_write.size);
246 }
247 StreamFlowControl::IncomingUpdateContext upd(&stream->fc);
248 GPR_ASSERT(upd.RecvData(stream_write.size).ok());
249 PerformAction(upd.MakeAction(), stream);
250 }
251 send_from_remote_.pop_front();
252 } break;
253 case flow_control_fuzzer::Action::kStreamWrite: {
254 Stream* s = GetStream(action.stream_write().id());
255 s->queued_writes += action.stream_write().size();
256 } break;
257 case flow_control_fuzzer::Action::kPerformSendFromRemote: {
258 SendFromRemote send;
259 for (auto& id_stream : streams_) {
260 auto send_amount = std::min(
261 {id_stream.second.queued_writes, remote_transport_window_size_,
262 remote_initial_window_size_ + id_stream.second.window_delta});
263 if (send_amount <= 0) continue;
264 send.stream_writes.push_back(
265 {id_stream.first, static_cast<uint64_t>(send_amount)});
266 id_stream.second.queued_writes -= send_amount;
267 id_stream.second.window_delta -= send_amount;
268 remote_transport_window_size_ -= send_amount;
269 }
270 send_from_remote_.push_back(send);
271 } break;
272 case flow_control_fuzzer::Action::kSetMinProgressSize: {
273 Stream* s = GetStream(action.set_min_progress_size().id());
274 StreamFlowControl::IncomingUpdateContext upd(&s->fc);
275 upd.SetMinProgressSize(action.set_min_progress_size().size());
276 PerformAction(upd.MakeAction(), s);
277 } break;
278 case flow_control_fuzzer::Action::kAllocateMemory: {
279 auto allocate = std::min(
280 static_cast<size_t>(action.allocate_memory()),
281 grpc_event_engine::experimental::MemoryRequest::max_allowed_size());
282 allocated_memory_ += allocate;
283 memory_owner_.Reserve(allocate);
284 } break;
285 case flow_control_fuzzer::Action::kDeallocateMemory: {
286 auto deallocate = std::min(
287 static_cast<uint64_t>(action.deallocate_memory()), allocated_memory_);
288 allocated_memory_ -= deallocate;
289 memory_owner_.Release(deallocate);
290 } break;
291 case flow_control_fuzzer::Action::kSetPendingSize: {
292 Stream* s = GetStream(action.set_min_progress_size().id());
293 StreamFlowControl::IncomingUpdateContext upd(&s->fc);
294 upd.SetPendingSize(action.set_pending_size().size());
295 PerformAction(upd.MakeAction(), s);
296 } break;
297 }
298 if (scheduled_write_) {
299 SendToRemote send;
300 if (!squelch) {
301 fprintf(stderr, "**** PERFORM WRITE ****\n");
302 }
303 if (Timestamp::Now() >= next_bdp_ping_) {
304 if (auto* bdp = tfc_->bdp_estimator()) {
305 if (!squelch) {
306 fprintf(stderr, "- schedule bdp ping\n");
307 }
308 bdp->SchedulePing();
309 bdp->StartPing();
310 next_bdp_ping_ = Timestamp::InfFuture();
311 send.bdp_ping = true;
312 }
313 }
314 if (!sending_initial_window_size_ &&
315 queued_initial_window_size_.has_value()) {
316 if (!squelch) {
317 fprintf(stderr, "- send initial window %d\n",
318 *queued_initial_window_size_);
319 }
320 sending_initial_window_size_ = true;
321 send.initial_window_size =
322 std::exchange(queued_initial_window_size_, absl::nullopt);
323 tfc_->FlushedSettings();
324 }
325 std::vector<uint32_t> streams_to_update = std::move(streams_to_update_);
326 streams_to_update_.clear();
327 for (auto stream_id : streams_to_update) {
328 auto* stream = GetStream(stream_id);
329 auto size = stream->fc.MaybeSendUpdate();
330 if (!squelch) {
331 fprintf(stderr, "- send [%" PRId64 "] stream window update %db\n",
332 static_cast<int64_t>(stream->id), size);
333 }
334 send.stream_window_updates.push_back({stream->id, size});
335 }
336 send.transport_window_update = tfc_->MaybeSendUpdate(sending_payload);
337 queued_send_max_frame_size_.reset();
338 send_to_remote_.emplace_back(std::move(send));
339 scheduled_write_ = false;
340 if (!squelch) {
341 fprintf(stderr, "**** FINISH WRITE ****\n");
342 }
343 }
344 }
345
PerformAction(FlowControlAction action,Stream * stream)346 void FlowControlFuzzer::PerformAction(FlowControlAction action,
347 Stream* stream) {
348 if (!squelch) {
349 fprintf(stderr, "[%" PRId64 "]: ACTION: %s\n",
350 stream == nullptr ? int64_t{-1} : static_cast<int64_t>(stream->id),
351 action.DebugString().c_str());
352 }
353
354 auto with_urgency = [this](FlowControlAction::Urgency urgency,
355 std::function<void()> f) {
356 switch (urgency) {
357 case FlowControlAction::Urgency::NO_ACTION_NEEDED:
358 break;
359 case FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
360 scheduled_write_ = true;
361 ABSL_FALLTHROUGH_INTENDED;
362 case FlowControlAction::Urgency::QUEUE_UPDATE:
363 f();
364 break;
365 }
366 };
367 with_urgency(action.send_stream_update(),
368 [this, stream]() { streams_to_update_.push_back(stream->id); });
369 with_urgency(action.send_transport_update(), []() {});
370 with_urgency(action.send_initial_window_update(), [this, &action]() {
371 GPR_ASSERT(action.initial_window_size() <= chttp2::kMaxInitialWindowSize);
372 queued_initial_window_size_ = action.initial_window_size();
373 });
374 with_urgency(action.send_max_frame_size_update(), [this, &action]() {
375 queued_send_max_frame_size_ = action.max_frame_size();
376 });
377 }
378
AssertNoneStuck() const379 void FlowControlFuzzer::AssertNoneStuck() const {
380 GPR_ASSERT(!scheduled_write_);
381
382 // Reconcile all the values to get the view of the remote that is knowable to
383 // the flow control system.
384 std::map<uint32_t, int64_t> reconciled_stream_deltas;
385 int64_t reconciled_transport_window = remote_transport_window_size_;
386 int64_t reconciled_initial_window = remote_initial_window_size_;
387 std::vector<uint64_t> inflight_send_initial_windows;
388 for (const auto& id_stream : streams_) {
389 reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta;
390 }
391
392 // Anything that's been sent from flow control -> remote needs to be added to
393 // the remote.
394 for (const auto& send_to_remote : send_to_remote_) {
395 if (send_to_remote.initial_window_size.has_value()) {
396 reconciled_initial_window = *send_to_remote.initial_window_size;
397 inflight_send_initial_windows.push_back(
398 *send_to_remote.initial_window_size);
399 }
400 reconciled_transport_window += send_to_remote.transport_window_update;
401 for (const auto& stream_update : send_to_remote.stream_window_updates) {
402 reconciled_stream_deltas[stream_update.id] += stream_update.size;
403 }
404 }
405
406 // Anything that's been sent from remote -> flow control needs to be wound
407 // back into the remote.
408 for (const auto& send_from_remote : send_from_remote_) {
409 for (const auto& stream_write : send_from_remote.stream_writes) {
410 reconciled_stream_deltas[stream_write.id] += stream_write.size;
411 reconciled_transport_window += stream_write.size;
412 }
413 }
414
415 // If we're sending an initial window size we get to consider a queued initial
416 // window size too: it'll be sent as soon as the remote acks the settings
417 // change, which it must.
418 if (sending_initial_window_size_ && queued_initial_window_size_.has_value()) {
419 reconciled_initial_window = *queued_initial_window_size_;
420 inflight_send_initial_windows.push_back(*queued_initial_window_size_);
421 // And since we'll initiate a write, any updates that are queued to be
422 // written will be considered and send their desired updates.
423 reconciled_transport_window += tfc_->DesiredAnnounceSize(true);
424 for (auto stream_id : streams_to_update_) {
425 auto* stream = GetStream(stream_id);
426 if (stream == nullptr) continue;
427 reconciled_stream_deltas[stream_id] += stream->fc.DesiredAnnounceSize();
428 }
429 }
430
431 // Finally, if a stream has indicated it's willing to read, the reconciled
432 // remote *MUST* be in a state where it could send at least one byte.
433 for (const auto& id_stream : streams_) {
434 if (id_stream.second.fc.min_progress_size() == 0) continue;
435 int64_t stream_window =
436 reconciled_stream_deltas[id_stream.first] + reconciled_initial_window;
437 if (stream_window <= 0 || reconciled_transport_window <= 0) {
438 fprintf(stderr,
439 "FAILED: stream %d has stream_window=%" PRId64
440 ", transport_window=%" PRId64 ", delta=%" PRId64
441 ", init_window_size=%" PRId64 ", min_progress_size=%" PRId64
442 ", transport announced_stream_total_over_incoming_window=%" PRId64
443 ", transport announced_window=%" PRId64
444 " transport target_window=%" PRId64 " sent_init_window=%d\n",
445 id_stream.first, stream_window, reconciled_transport_window,
446 reconciled_stream_deltas[id_stream.first],
447 reconciled_initial_window,
448 (id_stream.second.fc.min_progress_size()),
449 tfc_->announced_stream_total_over_incoming_window(),
450 tfc_->announced_window(), tfc_->target_window(),
451 tfc_->sent_init_window());
452 fprintf(stderr,
453 "initial_window breakdown: remote=%" PRId32 ", in-flight={%s}\n",
454 remote_initial_window_size_,
455 absl::StrJoin(inflight_send_initial_windows, ",").c_str());
456 abort();
457 }
458 }
459 }
460
AssertAnnouncedOverInitialWindowSizeCorrect() const461 void FlowControlFuzzer::AssertAnnouncedOverInitialWindowSizeCorrect() const {
462 int64_t value_from_streams = 0;
463
464 for (const auto& id_stream : streams_) {
465 const auto& stream = id_stream.second;
466 if (stream.fc.announced_window_delta() > 0) {
467 value_from_streams += stream.fc.announced_window_delta();
468 }
469 }
470
471 GPR_ASSERT(value_from_streams ==
472 tfc_->announced_stream_total_over_incoming_window());
473 }
474
475 } // namespace
476 } // namespace chttp2
477 } // namespace grpc_core
478
DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg & msg)479 DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg& msg) {
480 grpc_core::ApplyFuzzConfigVars(msg.config_vars());
481 grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
482 grpc_core::chttp2::InitGlobals();
483 grpc_core::chttp2::FlowControlFuzzer fuzzer(msg.enable_bdp());
484 for (const auto& action : msg.actions()) {
485 if (!squelch) {
486 fprintf(stderr, "%s\n", action.DebugString().c_str());
487 }
488 fuzzer.Perform(action);
489 fuzzer.AssertNoneStuck();
490 fuzzer.AssertAnnouncedOverInitialWindowSizeCorrect();
491 }
492 }
493