• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/event_engine/memory_request.h>
16 #include <grpc/support/time.h>
17 #include <inttypes.h>
18 #include <stdio.h>
19 #include <stdlib.h>
20 
21 #include <algorithm>
22 #include <deque>
23 #include <functional>
24 #include <limits>
25 #include <map>
26 #include <memory>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/base/attributes.h"
31 #include "absl/log/check.h"
32 #include "absl/status/status.h"
33 #include "absl/strings/str_join.h"
34 #include "absl/types/optional.h"
35 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
36 #include "src/core/lib/experiments/config.h"
37 #include "src/core/lib/iomgr/exec_ctx.h"
38 #include "src/core/lib/resource_quota/memory_quota.h"
39 #include "src/core/lib/transport/bdp_estimator.h"
40 #include "src/core/util/time.h"
41 #include "src/core/util/useful.h"
42 #include "src/libfuzzer/libfuzzer_macro.h"
43 #include "test/core/test_util/fuzz_config_vars.h"
44 #include "test/core/transport/chttp2/flow_control_fuzzer.pb.h"
45 
46 // IWYU pragma: no_include <google/protobuf/repeated_ptr_field.h>
47 
// When true, the diagnostic fprintf(stderr, ...) tracing in this file is
// skipped; every trace statement below is guarded by `if (!squelch)`.
bool squelch = true;

// Clock hook defined outside this file. InitGlobals() points it at now_impl so
// that time is driven by the fuzzer-controlled g_now.
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
51 
52 namespace grpc_core {
53 namespace chttp2 {
54 namespace {
55 
56 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
57 
58 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)59 gpr_timespec now_impl(gpr_clock_type clock_type) {
60   CHECK(clock_type != GPR_TIMESPAN);
61   gpr_timespec ts = g_now;
62   ts.clock_type = clock_type;
63   return ts;
64 }
65 
InitGlobals()66 void InitGlobals() {
67   g_now = {1, 0, GPR_CLOCK_MONOTONIC};
68   TestOnlySetProcessEpoch(g_now);
69   gpr_now_impl = now_impl;
70 }
71 
72 class FlowControlFuzzer {
73  public:
FlowControlFuzzer(bool enable_bdp)74   explicit FlowControlFuzzer(bool enable_bdp) {
75     ExecCtx exec_ctx;
76     tfc_ = std::make_unique<TransportFlowControl>("fuzzer", enable_bdp,
77                                                   &memory_owner_);
78   }
79 
~FlowControlFuzzer()80   ~FlowControlFuzzer() {
81     ExecCtx exec_ctx;
82     streams_.clear();
83     tfc_.reset();
84     memory_owner_.Release(allocated_memory_);
85   }
86 
87   void Perform(const flow_control_fuzzer::Action& action);
88   void AssertNoneStuck() const;
89   void AssertAnnouncedOverInitialWindowSizeCorrect() const;
90 
91  private:
92   struct StreamPayload {
93     uint32_t id;
94     uint64_t size;
95   };
96 
97   struct SendToRemote {
98     bool bdp_ping = false;
99     absl::optional<uint32_t> initial_window_size;
100     uint32_t transport_window_update;
101     std::vector<StreamPayload> stream_window_updates;
102   };
103 
104   struct SendFromRemote {
105     bool bdp_pong = false;
106     absl::optional<uint32_t> ack_initial_window_size;
107     std::vector<StreamPayload> stream_writes;
108   };
109 
110   struct Stream {
Streamgrpc_core::chttp2::__anon2ca8f0570111::FlowControlFuzzer::Stream111     explicit Stream(uint32_t id, TransportFlowControl* tfc) : id(id), fc(tfc) {}
112     uint32_t id;
113     StreamFlowControl fc;
114     int64_t queued_writes = 0;
115     int64_t window_delta = 0;
116   };
117 
118   void PerformAction(FlowControlAction action, Stream* stream);
GetStream(uint32_t id)119   Stream* GetStream(uint32_t id) {
120     auto it = streams_.find(id);
121     if (it == streams_.end()) {
122       it = streams_.emplace(id, Stream{id, tfc_.get()}).first;
123     }
124     return &it->second;
125   }
GetStream(uint32_t id) const126   const Stream* GetStream(uint32_t id) const {
127     auto it = streams_.find(id);
128     if (it == streams_.end()) {
129       return nullptr;
130     }
131     return &it->second;
132   }
133 
134   MemoryQuotaRefPtr memory_quota_ = MakeMemoryQuota("fuzzer");
135   MemoryOwner memory_owner_ = memory_quota_->CreateMemoryOwner();
136   std::unique_ptr<TransportFlowControl> tfc_;
137   absl::optional<uint32_t> queued_initial_window_size_;
138   absl::optional<uint32_t> queued_send_max_frame_size_;
139   bool scheduled_write_ = false;
140   bool sending_initial_window_size_ = false;
141   std::deque<SendToRemote> send_to_remote_;
142   std::deque<SendFromRemote> send_from_remote_;
143   uint32_t remote_initial_window_size_ = kDefaultWindow;
144   int64_t remote_transport_window_size_ = kDefaultWindow;
145   std::map<uint32_t, Stream> streams_;
146   std::vector<uint32_t> streams_to_update_;
147   uint64_t allocated_memory_ = 0;
148   Timestamp next_bdp_ping_ = Timestamp::ProcessEpoch();
149 };
150 
Perform(const flow_control_fuzzer::Action & action)151 void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
152   ExecCtx exec_ctx;
153   bool sending_payload = false;
154   switch (action.action_case()) {
155     case flow_control_fuzzer::Action::ACTION_NOT_SET:
156       break;
157     case flow_control_fuzzer::Action::kSetMemoryQuota: {
158       memory_quota_->SetSize(
159           Clamp(action.set_memory_quota(), uint64_t{1},
160                 static_cast<uint64_t>(std::numeric_limits<int64_t>::max())));
161     } break;
162     case flow_control_fuzzer::Action::kStepTimeMs: {
163       g_now = gpr_time_add(
164           g_now, gpr_time_from_millis(Clamp(action.step_time_ms(), uint64_t{1},
165                                             kMaxAdvanceTimeMillis),
166                                       GPR_TIMESPAN));
167       exec_ctx.InvalidateNow();
168       if (Timestamp::Now() >= next_bdp_ping_) {
169         scheduled_write_ = true;
170       }
171     } break;
172     case flow_control_fuzzer::Action::kPeriodicUpdate: {
173       PerformAction(tfc_->PeriodicUpdate(), nullptr);
174     } break;
175     case flow_control_fuzzer::Action::kPerformSendToRemote: {
176       scheduled_write_ = true;
177     } break;
178     case flow_control_fuzzer::Action::kPerformSendToRemoteWithPayload: {
179       scheduled_write_ = true;
180       sending_payload = true;
181     } break;
182     case flow_control_fuzzer::Action::kReadSendToRemote: {
183       if (send_to_remote_.empty()) break;
184       auto sent_to_remote = send_to_remote_.front();
185       if (sent_to_remote.initial_window_size.has_value()) {
186         if (!squelch) {
187           fprintf(stderr, "Setting initial window size to %d\n",
188                   sent_to_remote.initial_window_size.value());
189         }
190         SendFromRemote send_from_remote;
191         send_from_remote.ack_initial_window_size =
192             sent_to_remote.initial_window_size;
193         for (const auto& id_stream : streams_) {
194           CHECK(id_stream.second.window_delta +
195                     *sent_to_remote.initial_window_size <=
196                 (1u << 31) - 1);
197         }
198         remote_initial_window_size_ = *sent_to_remote.initial_window_size;
199         send_from_remote_.push_back(send_from_remote);
200       }
201       if (sent_to_remote.bdp_ping) {
202         SendFromRemote send_from_remote;
203         send_from_remote.bdp_pong = true;
204         send_from_remote_.push_back(send_from_remote);
205       }
206       for (auto stream_update : sent_to_remote.stream_window_updates) {
207         Stream* s = GetStream(stream_update.id);
208         if (!squelch) {
209           fprintf(stderr,
210                   "[%" PRIu32 "]: increase window delta by %" PRIu64
211                   " from %" PRId64 "\n",
212                   stream_update.id, stream_update.size, s->window_delta);
213         }
214         s->window_delta += stream_update.size;
215         CHECK(s->window_delta <= chttp2::kMaxWindowDelta);
216       }
217       remote_transport_window_size_ += sent_to_remote.transport_window_update;
218       send_to_remote_.pop_front();
219     } break;
220     case flow_control_fuzzer::Action::kReadSendFromRemote: {
221       if (send_from_remote_.empty()) break;
222       auto sent_from_remote = send_from_remote_.front();
223       if (sent_from_remote.ack_initial_window_size.has_value()) {
224         if (!squelch) {
225           fprintf(stderr, "Received ACK for initial window size %d\n",
226                   *sent_from_remote.ack_initial_window_size);
227         }
228         PerformAction(tfc_->SetAckedInitialWindow(
229                           *sent_from_remote.ack_initial_window_size),
230                       nullptr);
231         sending_initial_window_size_ = false;
232       }
233       if (sent_from_remote.bdp_pong) {
234         next_bdp_ping_ = tfc_->bdp_estimator()->CompletePing();
235       }
236       for (const auto& stream_write : sent_from_remote.stream_writes) {
237         Stream* stream = GetStream(stream_write.id);
238         if (!squelch) {
239           fprintf(stderr, "[%" PRIu32 "]: recv write of %" PRIu64 "\n",
240                   stream_write.id, stream_write.size);
241         }
242         if (auto* bdp = tfc_->bdp_estimator()) {
243           bdp->AddIncomingBytes(stream_write.size);
244         }
245         StreamFlowControl::IncomingUpdateContext upd(&stream->fc);
246         CHECK_OK(upd.RecvData(stream_write.size));
247         PerformAction(upd.MakeAction(), stream);
248       }
249       send_from_remote_.pop_front();
250     } break;
251     case flow_control_fuzzer::Action::kStreamWrite: {
252       Stream* s = GetStream(action.stream_write().id());
253       s->queued_writes += action.stream_write().size();
254     } break;
255     case flow_control_fuzzer::Action::kPerformSendFromRemote: {
256       SendFromRemote send;
257       for (auto& id_stream : streams_) {
258         auto send_amount = std::min(
259             {id_stream.second.queued_writes, remote_transport_window_size_,
260              remote_initial_window_size_ + id_stream.second.window_delta});
261         if (send_amount <= 0) continue;
262         send.stream_writes.push_back(
263             {id_stream.first, static_cast<uint64_t>(send_amount)});
264         id_stream.second.queued_writes -= send_amount;
265         id_stream.second.window_delta -= send_amount;
266         remote_transport_window_size_ -= send_amount;
267       }
268       send_from_remote_.push_back(send);
269     } break;
270     case flow_control_fuzzer::Action::kSetMinProgressSize: {
271       Stream* s = GetStream(action.set_min_progress_size().id());
272       StreamFlowControl::IncomingUpdateContext upd(&s->fc);
273       upd.SetMinProgressSize(action.set_min_progress_size().size());
274       PerformAction(upd.MakeAction(), s);
275     } break;
276     case flow_control_fuzzer::Action::kAllocateMemory: {
277       auto allocate = std::min(
278           static_cast<size_t>(action.allocate_memory()),
279           grpc_event_engine::experimental::MemoryRequest::max_allowed_size());
280       allocated_memory_ += allocate;
281       memory_owner_.Reserve(allocate);
282     } break;
283     case flow_control_fuzzer::Action::kDeallocateMemory: {
284       auto deallocate = std::min(
285           static_cast<uint64_t>(action.deallocate_memory()), allocated_memory_);
286       allocated_memory_ -= deallocate;
287       memory_owner_.Release(deallocate);
288     } break;
289     case flow_control_fuzzer::Action::kSetPendingSize: {
290       Stream* s = GetStream(action.set_min_progress_size().id());
291       StreamFlowControl::IncomingUpdateContext upd(&s->fc);
292       upd.SetPendingSize(action.set_pending_size().size());
293       PerformAction(upd.MakeAction(), s);
294     } break;
295   }
296   if (scheduled_write_) {
297     SendToRemote send;
298     if (!squelch) {
299       fprintf(stderr, "**** PERFORM WRITE ****\n");
300     }
301     if (Timestamp::Now() >= next_bdp_ping_) {
302       if (auto* bdp = tfc_->bdp_estimator()) {
303         if (!squelch) {
304           fprintf(stderr, "- schedule bdp ping\n");
305         }
306         bdp->SchedulePing();
307         bdp->StartPing();
308         next_bdp_ping_ = Timestamp::InfFuture();
309         send.bdp_ping = true;
310       }
311     }
312     if (!sending_initial_window_size_ &&
313         queued_initial_window_size_.has_value()) {
314       if (!squelch) {
315         fprintf(stderr, "- send initial window %d\n",
316                 *queued_initial_window_size_);
317       }
318       sending_initial_window_size_ = true;
319       send.initial_window_size =
320           std::exchange(queued_initial_window_size_, absl::nullopt);
321       tfc_->FlushedSettings();
322     }
323     std::vector<uint32_t> streams_to_update = std::move(streams_to_update_);
324     streams_to_update_.clear();
325     for (auto stream_id : streams_to_update) {
326       auto* stream = GetStream(stream_id);
327       auto size = stream->fc.MaybeSendUpdate();
328       if (!squelch) {
329         fprintf(stderr, "- send [%" PRId64 "] stream window update %db\n",
330                 static_cast<int64_t>(stream->id), size);
331       }
332       send.stream_window_updates.push_back({stream->id, size});
333     }
334     send.transport_window_update = tfc_->MaybeSendUpdate(sending_payload);
335     queued_send_max_frame_size_.reset();
336     send_to_remote_.emplace_back(std::move(send));
337     scheduled_write_ = false;
338     if (!squelch) {
339       fprintf(stderr, "**** FINISH WRITE ****\n");
340     }
341   }
342 }
343 
PerformAction(FlowControlAction action,Stream * stream)344 void FlowControlFuzzer::PerformAction(FlowControlAction action,
345                                       Stream* stream) {
346   if (!squelch) {
347     fprintf(stderr, "[%" PRId64 "]: ACTION: %s\n",
348             stream == nullptr ? int64_t{-1} : static_cast<int64_t>(stream->id),
349             action.DebugString().c_str());
350   }
351 
352   auto with_urgency = [this](FlowControlAction::Urgency urgency,
353                              std::function<void()> f) {
354     switch (urgency) {
355       case FlowControlAction::Urgency::NO_ACTION_NEEDED:
356         break;
357       case FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
358         scheduled_write_ = true;
359         ABSL_FALLTHROUGH_INTENDED;
360       case FlowControlAction::Urgency::QUEUE_UPDATE:
361         f();
362         break;
363     }
364   };
365   with_urgency(action.send_stream_update(),
366                [this, stream]() { streams_to_update_.push_back(stream->id); });
367   with_urgency(action.send_transport_update(), []() {});
368   with_urgency(action.send_initial_window_update(), [this, &action]() {
369     CHECK(action.initial_window_size() <= chttp2::kMaxInitialWindowSize);
370     queued_initial_window_size_ = action.initial_window_size();
371   });
372   with_urgency(action.send_max_frame_size_update(), [this, &action]() {
373     queued_send_max_frame_size_ = action.max_frame_size();
374   });
375 }
376 
AssertNoneStuck() const377 void FlowControlFuzzer::AssertNoneStuck() const {
378   CHECK(!scheduled_write_);
379 
380   // Reconcile all the values to get the view of the remote that is knowable to
381   // the flow control system.
382   std::map<uint32_t, int64_t> reconciled_stream_deltas;
383   int64_t reconciled_transport_window = remote_transport_window_size_;
384   int64_t reconciled_initial_window = remote_initial_window_size_;
385   std::vector<uint64_t> inflight_send_initial_windows;
386   for (const auto& id_stream : streams_) {
387     reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta;
388   }
389 
390   // Anything that's been sent from flow control -> remote needs to be added to
391   // the remote.
392   for (const auto& send_to_remote : send_to_remote_) {
393     if (send_to_remote.initial_window_size.has_value()) {
394       reconciled_initial_window = *send_to_remote.initial_window_size;
395       inflight_send_initial_windows.push_back(
396           *send_to_remote.initial_window_size);
397     }
398     reconciled_transport_window += send_to_remote.transport_window_update;
399     for (const auto& stream_update : send_to_remote.stream_window_updates) {
400       reconciled_stream_deltas[stream_update.id] += stream_update.size;
401     }
402   }
403 
404   // Anything that's been sent from remote -> flow control needs to be wound
405   // back into the remote.
406   for (const auto& send_from_remote : send_from_remote_) {
407     for (const auto& stream_write : send_from_remote.stream_writes) {
408       reconciled_stream_deltas[stream_write.id] += stream_write.size;
409       reconciled_transport_window += stream_write.size;
410     }
411   }
412 
413   // If we're sending an initial window size we get to consider a queued initial
414   // window size too: it'll be sent as soon as the remote acks the settings
415   // change, which it must.
416   if (sending_initial_window_size_ && queued_initial_window_size_.has_value()) {
417     reconciled_initial_window = *queued_initial_window_size_;
418     inflight_send_initial_windows.push_back(*queued_initial_window_size_);
419     // And since we'll initiate a write, any updates that are queued to be
420     // written will be considered and send their desired updates.
421     reconciled_transport_window += tfc_->DesiredAnnounceSize(true);
422     for (auto stream_id : streams_to_update_) {
423       auto* stream = GetStream(stream_id);
424       if (stream == nullptr) continue;
425       reconciled_stream_deltas[stream_id] += stream->fc.DesiredAnnounceSize();
426     }
427   }
428 
429   // Finally, if a stream has indicated it's willing to read, the reconciled
430   // remote *MUST* be in a state where it could send at least one byte.
431   for (const auto& id_stream : streams_) {
432     if (id_stream.second.fc.min_progress_size() == 0) continue;
433     int64_t stream_window =
434         reconciled_stream_deltas[id_stream.first] + reconciled_initial_window;
435     if (stream_window <= 0 || reconciled_transport_window <= 0) {
436       fprintf(stderr,
437               "FAILED: stream %d has stream_window=%" PRId64
438               ", transport_window=%" PRId64 ", delta=%" PRId64
439               ", init_window_size=%" PRId64 ", min_progress_size=%" PRId64
440               ", transport announced_stream_total_over_incoming_window=%" PRId64
441               ", transport announced_window=%" PRId64
442               " transport target_window=%" PRId64 " sent_init_window=%d\n",
443               id_stream.first, stream_window, reconciled_transport_window,
444               reconciled_stream_deltas[id_stream.first],
445               reconciled_initial_window,
446               (id_stream.second.fc.min_progress_size()),
447               tfc_->announced_stream_total_over_incoming_window(),
448               tfc_->announced_window(), tfc_->target_window(),
449               tfc_->sent_init_window());
450       fprintf(stderr,
451               "initial_window breakdown: remote=%" PRId32 ", in-flight={%s}\n",
452               remote_initial_window_size_,
453               absl::StrJoin(inflight_send_initial_windows, ",").c_str());
454       abort();
455     }
456   }
457 }
458 
AssertAnnouncedOverInitialWindowSizeCorrect() const459 void FlowControlFuzzer::AssertAnnouncedOverInitialWindowSizeCorrect() const {
460   int64_t value_from_streams = 0;
461 
462   for (const auto& id_stream : streams_) {
463     const auto& stream = id_stream.second;
464     if (stream.fc.announced_window_delta() > 0) {
465       value_from_streams += stream.fc.announced_window_delta();
466     }
467   }
468 
469   CHECK(value_from_streams ==
470         tfc_->announced_stream_total_over_incoming_window());
471 }
472 
473 }  // namespace
474 }  // namespace chttp2
475 }  // namespace grpc_core
476 
// Fuzzer entry point: applies the fuzz-provided config vars, resets the
// simulated clock, then replays each action in order, checking both harness
// invariants after every single step.
DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg& msg) {
  grpc_core::ApplyFuzzConfigVars(msg.config_vars());
  grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
  grpc_core::chttp2::InitGlobals();
  grpc_core::chttp2::FlowControlFuzzer fuzzer(msg.enable_bdp());
  const int action_count = msg.actions_size();
  for (int i = 0; i < action_count; ++i) {
    const flow_control_fuzzer::Action& action = msg.actions(i);
    if (!squelch) {
      fprintf(stderr, "%s\n", action.DebugString().c_str());
    }
    fuzzer.Perform(action);
    fuzzer.AssertNoneStuck();
    fuzzer.AssertAnnouncedOverInitialWindowSizeCorrect();
  }
}
491