• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <inttypes.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 
19 #include <algorithm>
20 #include <deque>
21 #include <functional>
22 #include <limits>
23 #include <map>
24 #include <memory>
25 #include <utility>
26 #include <vector>
27 
28 #include "absl/base/attributes.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_join.h"
31 #include "absl/types/optional.h"
32 
33 #include <grpc/event_engine/memory_request.h>
34 #include <grpc/support/log.h>
35 #include <grpc/support/time.h>
36 
37 #include "src/core/ext/transport/chttp2/transport/flow_control.h"
38 #include "src/core/lib/experiments/config.h"
39 #include "src/core/lib/gpr/useful.h"
40 #include "src/core/lib/gprpp/time.h"
41 #include "src/core/lib/iomgr/exec_ctx.h"
42 #include "src/core/lib/resource_quota/memory_quota.h"
43 #include "src/core/lib/transport/bdp_estimator.h"
44 #include "src/libfuzzer/libfuzzer_macro.h"
45 #include "test/core/transport/chttp2/flow_control_fuzzer.pb.h"
46 #include "test/core/util/fuzz_config_vars.h"
47 
48 // IWYU pragma: no_include <google/protobuf/repeated_ptr_field.h>
49 
50 bool squelch = true;
51 
52 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
53 
54 namespace grpc_core {
55 namespace chttp2 {
56 namespace {
57 
58 constexpr uint64_t kMaxAdvanceTimeMillis = 24ull * 365 * 3600 * 1000;
59 
60 gpr_timespec g_now;
now_impl(gpr_clock_type clock_type)61 gpr_timespec now_impl(gpr_clock_type clock_type) {
62   GPR_ASSERT(clock_type != GPR_TIMESPAN);
63   gpr_timespec ts = g_now;
64   ts.clock_type = clock_type;
65   return ts;
66 }
67 
InitGlobals()68 void InitGlobals() {
69   g_now = {1, 0, GPR_CLOCK_MONOTONIC};
70   TestOnlySetProcessEpoch(g_now);
71   gpr_now_impl = now_impl;
72 }
73 
74 class FlowControlFuzzer {
75  public:
FlowControlFuzzer(bool enable_bdp)76   explicit FlowControlFuzzer(bool enable_bdp) {
77     ExecCtx exec_ctx;
78     tfc_ = std::make_unique<TransportFlowControl>("fuzzer", enable_bdp,
79                                                   &memory_owner_);
80   }
81 
~FlowControlFuzzer()82   ~FlowControlFuzzer() {
83     ExecCtx exec_ctx;
84     streams_.clear();
85     tfc_.reset();
86     memory_owner_.Release(allocated_memory_);
87   }
88 
89   void Perform(const flow_control_fuzzer::Action& action);
90   void AssertNoneStuck() const;
91   void AssertAnnouncedOverInitialWindowSizeCorrect() const;
92 
93  private:
94   struct StreamPayload {
95     uint32_t id;
96     uint64_t size;
97   };
98 
99   struct SendToRemote {
100     bool bdp_ping = false;
101     absl::optional<uint32_t> initial_window_size;
102     uint32_t transport_window_update;
103     std::vector<StreamPayload> stream_window_updates;
104   };
105 
106   struct SendFromRemote {
107     bool bdp_pong = false;
108     absl::optional<uint32_t> ack_initial_window_size;
109     std::vector<StreamPayload> stream_writes;
110   };
111 
112   struct Stream {
Streamgrpc_core::chttp2::__anonc52141130111::FlowControlFuzzer::Stream113     explicit Stream(uint32_t id, TransportFlowControl* tfc) : id(id), fc(tfc) {}
114     uint32_t id;
115     StreamFlowControl fc;
116     int64_t queued_writes = 0;
117     int64_t window_delta = 0;
118   };
119 
120   void PerformAction(FlowControlAction action, Stream* stream);
GetStream(uint32_t id)121   Stream* GetStream(uint32_t id) {
122     auto it = streams_.find(id);
123     if (it == streams_.end()) {
124       it = streams_.emplace(id, Stream{id, tfc_.get()}).first;
125     }
126     return &it->second;
127   }
GetStream(uint32_t id) const128   const Stream* GetStream(uint32_t id) const {
129     auto it = streams_.find(id);
130     if (it == streams_.end()) {
131       return nullptr;
132     }
133     return &it->second;
134   }
135 
136   MemoryQuotaRefPtr memory_quota_ = MakeMemoryQuota("fuzzer");
137   MemoryOwner memory_owner_ = memory_quota_->CreateMemoryOwner();
138   std::unique_ptr<TransportFlowControl> tfc_;
139   absl::optional<uint32_t> queued_initial_window_size_;
140   absl::optional<uint32_t> queued_send_max_frame_size_;
141   bool scheduled_write_ = false;
142   bool sending_initial_window_size_ = false;
143   std::deque<SendToRemote> send_to_remote_;
144   std::deque<SendFromRemote> send_from_remote_;
145   uint32_t remote_initial_window_size_ = kDefaultWindow;
146   int64_t remote_transport_window_size_ = kDefaultWindow;
147   std::map<uint32_t, Stream> streams_;
148   std::vector<uint32_t> streams_to_update_;
149   uint64_t allocated_memory_ = 0;
150   Timestamp next_bdp_ping_ = Timestamp::ProcessEpoch();
151 };
152 
Perform(const flow_control_fuzzer::Action & action)153 void FlowControlFuzzer::Perform(const flow_control_fuzzer::Action& action) {
154   ExecCtx exec_ctx;
155   bool sending_payload = false;
156   switch (action.action_case()) {
157     case flow_control_fuzzer::Action::ACTION_NOT_SET:
158       break;
159     case flow_control_fuzzer::Action::kSetMemoryQuota: {
160       memory_quota_->SetSize(
161           Clamp(action.set_memory_quota(), uint64_t{1},
162                 static_cast<uint64_t>(std::numeric_limits<int64_t>::max())));
163     } break;
164     case flow_control_fuzzer::Action::kStepTimeMs: {
165       g_now = gpr_time_add(
166           g_now, gpr_time_from_millis(Clamp(action.step_time_ms(), uint64_t{1},
167                                             kMaxAdvanceTimeMillis),
168                                       GPR_TIMESPAN));
169       exec_ctx.InvalidateNow();
170       if (Timestamp::Now() >= next_bdp_ping_) {
171         scheduled_write_ = true;
172       }
173     } break;
174     case flow_control_fuzzer::Action::kPeriodicUpdate: {
175       PerformAction(tfc_->PeriodicUpdate(), nullptr);
176     } break;
177     case flow_control_fuzzer::Action::kPerformSendToRemote: {
178       scheduled_write_ = true;
179     } break;
180     case flow_control_fuzzer::Action::kPerformSendToRemoteWithPayload: {
181       scheduled_write_ = true;
182       sending_payload = true;
183     } break;
184     case flow_control_fuzzer::Action::kReadSendToRemote: {
185       if (send_to_remote_.empty()) break;
186       auto sent_to_remote = send_to_remote_.front();
187       if (sent_to_remote.initial_window_size.has_value()) {
188         if (!squelch) {
189           fprintf(stderr, "Setting initial window size to %d\n",
190                   sent_to_remote.initial_window_size.value());
191         }
192         SendFromRemote send_from_remote;
193         send_from_remote.ack_initial_window_size =
194             sent_to_remote.initial_window_size;
195         for (const auto& id_stream : streams_) {
196           GPR_ASSERT(id_stream.second.window_delta +
197                          *sent_to_remote.initial_window_size <=
198                      (1u << 31) - 1);
199         }
200         remote_initial_window_size_ = *sent_to_remote.initial_window_size;
201         send_from_remote_.push_back(send_from_remote);
202       }
203       if (sent_to_remote.bdp_ping) {
204         SendFromRemote send_from_remote;
205         send_from_remote.bdp_pong = true;
206         send_from_remote_.push_back(send_from_remote);
207       }
208       for (auto stream_update : sent_to_remote.stream_window_updates) {
209         Stream* s = GetStream(stream_update.id);
210         if (!squelch) {
211           fprintf(stderr,
212                   "[%" PRIu32 "]: increase window delta by %" PRIu64
213                   " from %" PRId64 "\n",
214                   stream_update.id, stream_update.size, s->window_delta);
215         }
216         s->window_delta += stream_update.size;
217         GPR_ASSERT(s->window_delta <= chttp2::kMaxWindowDelta);
218       }
219       remote_transport_window_size_ += sent_to_remote.transport_window_update;
220       send_to_remote_.pop_front();
221     } break;
222     case flow_control_fuzzer::Action::kReadSendFromRemote: {
223       if (send_from_remote_.empty()) break;
224       auto sent_from_remote = send_from_remote_.front();
225       if (sent_from_remote.ack_initial_window_size.has_value()) {
226         if (!squelch) {
227           fprintf(stderr, "Received ACK for initial window size %d\n",
228                   *sent_from_remote.ack_initial_window_size);
229         }
230         PerformAction(tfc_->SetAckedInitialWindow(
231                           *sent_from_remote.ack_initial_window_size),
232                       nullptr);
233         sending_initial_window_size_ = false;
234       }
235       if (sent_from_remote.bdp_pong) {
236         next_bdp_ping_ = tfc_->bdp_estimator()->CompletePing();
237       }
238       for (const auto& stream_write : sent_from_remote.stream_writes) {
239         Stream* stream = GetStream(stream_write.id);
240         if (!squelch) {
241           fprintf(stderr, "[%" PRIu32 "]: recv write of %" PRIu64 "\n",
242                   stream_write.id, stream_write.size);
243         }
244         if (auto* bdp = tfc_->bdp_estimator()) {
245           bdp->AddIncomingBytes(stream_write.size);
246         }
247         StreamFlowControl::IncomingUpdateContext upd(&stream->fc);
248         GPR_ASSERT(upd.RecvData(stream_write.size).ok());
249         PerformAction(upd.MakeAction(), stream);
250       }
251       send_from_remote_.pop_front();
252     } break;
253     case flow_control_fuzzer::Action::kStreamWrite: {
254       Stream* s = GetStream(action.stream_write().id());
255       s->queued_writes += action.stream_write().size();
256     } break;
257     case flow_control_fuzzer::Action::kPerformSendFromRemote: {
258       SendFromRemote send;
259       for (auto& id_stream : streams_) {
260         auto send_amount = std::min(
261             {id_stream.second.queued_writes, remote_transport_window_size_,
262              remote_initial_window_size_ + id_stream.second.window_delta});
263         if (send_amount <= 0) continue;
264         send.stream_writes.push_back(
265             {id_stream.first, static_cast<uint64_t>(send_amount)});
266         id_stream.second.queued_writes -= send_amount;
267         id_stream.second.window_delta -= send_amount;
268         remote_transport_window_size_ -= send_amount;
269       }
270       send_from_remote_.push_back(send);
271     } break;
272     case flow_control_fuzzer::Action::kSetMinProgressSize: {
273       Stream* s = GetStream(action.set_min_progress_size().id());
274       StreamFlowControl::IncomingUpdateContext upd(&s->fc);
275       upd.SetMinProgressSize(action.set_min_progress_size().size());
276       PerformAction(upd.MakeAction(), s);
277     } break;
278     case flow_control_fuzzer::Action::kAllocateMemory: {
279       auto allocate = std::min(
280           static_cast<size_t>(action.allocate_memory()),
281           grpc_event_engine::experimental::MemoryRequest::max_allowed_size());
282       allocated_memory_ += allocate;
283       memory_owner_.Reserve(allocate);
284     } break;
285     case flow_control_fuzzer::Action::kDeallocateMemory: {
286       auto deallocate = std::min(
287           static_cast<uint64_t>(action.deallocate_memory()), allocated_memory_);
288       allocated_memory_ -= deallocate;
289       memory_owner_.Release(deallocate);
290     } break;
291     case flow_control_fuzzer::Action::kSetPendingSize: {
292       Stream* s = GetStream(action.set_min_progress_size().id());
293       StreamFlowControl::IncomingUpdateContext upd(&s->fc);
294       upd.SetPendingSize(action.set_pending_size().size());
295       PerformAction(upd.MakeAction(), s);
296     } break;
297   }
298   if (scheduled_write_) {
299     SendToRemote send;
300     if (!squelch) {
301       fprintf(stderr, "**** PERFORM WRITE ****\n");
302     }
303     if (Timestamp::Now() >= next_bdp_ping_) {
304       if (auto* bdp = tfc_->bdp_estimator()) {
305         if (!squelch) {
306           fprintf(stderr, "- schedule bdp ping\n");
307         }
308         bdp->SchedulePing();
309         bdp->StartPing();
310         next_bdp_ping_ = Timestamp::InfFuture();
311         send.bdp_ping = true;
312       }
313     }
314     if (!sending_initial_window_size_ &&
315         queued_initial_window_size_.has_value()) {
316       if (!squelch) {
317         fprintf(stderr, "- send initial window %d\n",
318                 *queued_initial_window_size_);
319       }
320       sending_initial_window_size_ = true;
321       send.initial_window_size =
322           std::exchange(queued_initial_window_size_, absl::nullopt);
323       tfc_->FlushedSettings();
324     }
325     std::vector<uint32_t> streams_to_update = std::move(streams_to_update_);
326     streams_to_update_.clear();
327     for (auto stream_id : streams_to_update) {
328       auto* stream = GetStream(stream_id);
329       auto size = stream->fc.MaybeSendUpdate();
330       if (!squelch) {
331         fprintf(stderr, "- send [%" PRId64 "] stream window update %db\n",
332                 static_cast<int64_t>(stream->id), size);
333       }
334       send.stream_window_updates.push_back({stream->id, size});
335     }
336     send.transport_window_update = tfc_->MaybeSendUpdate(sending_payload);
337     queued_send_max_frame_size_.reset();
338     send_to_remote_.emplace_back(std::move(send));
339     scheduled_write_ = false;
340     if (!squelch) {
341       fprintf(stderr, "**** FINISH WRITE ****\n");
342     }
343   }
344 }
345 
PerformAction(FlowControlAction action,Stream * stream)346 void FlowControlFuzzer::PerformAction(FlowControlAction action,
347                                       Stream* stream) {
348   if (!squelch) {
349     fprintf(stderr, "[%" PRId64 "]: ACTION: %s\n",
350             stream == nullptr ? int64_t{-1} : static_cast<int64_t>(stream->id),
351             action.DebugString().c_str());
352   }
353 
354   auto with_urgency = [this](FlowControlAction::Urgency urgency,
355                              std::function<void()> f) {
356     switch (urgency) {
357       case FlowControlAction::Urgency::NO_ACTION_NEEDED:
358         break;
359       case FlowControlAction::Urgency::UPDATE_IMMEDIATELY:
360         scheduled_write_ = true;
361         ABSL_FALLTHROUGH_INTENDED;
362       case FlowControlAction::Urgency::QUEUE_UPDATE:
363         f();
364         break;
365     }
366   };
367   with_urgency(action.send_stream_update(),
368                [this, stream]() { streams_to_update_.push_back(stream->id); });
369   with_urgency(action.send_transport_update(), []() {});
370   with_urgency(action.send_initial_window_update(), [this, &action]() {
371     GPR_ASSERT(action.initial_window_size() <= chttp2::kMaxInitialWindowSize);
372     queued_initial_window_size_ = action.initial_window_size();
373   });
374   with_urgency(action.send_max_frame_size_update(), [this, &action]() {
375     queued_send_max_frame_size_ = action.max_frame_size();
376   });
377 }
378 
AssertNoneStuck() const379 void FlowControlFuzzer::AssertNoneStuck() const {
380   GPR_ASSERT(!scheduled_write_);
381 
382   // Reconcile all the values to get the view of the remote that is knowable to
383   // the flow control system.
384   std::map<uint32_t, int64_t> reconciled_stream_deltas;
385   int64_t reconciled_transport_window = remote_transport_window_size_;
386   int64_t reconciled_initial_window = remote_initial_window_size_;
387   std::vector<uint64_t> inflight_send_initial_windows;
388   for (const auto& id_stream : streams_) {
389     reconciled_stream_deltas[id_stream.first] = id_stream.second.window_delta;
390   }
391 
392   // Anything that's been sent from flow control -> remote needs to be added to
393   // the remote.
394   for (const auto& send_to_remote : send_to_remote_) {
395     if (send_to_remote.initial_window_size.has_value()) {
396       reconciled_initial_window = *send_to_remote.initial_window_size;
397       inflight_send_initial_windows.push_back(
398           *send_to_remote.initial_window_size);
399     }
400     reconciled_transport_window += send_to_remote.transport_window_update;
401     for (const auto& stream_update : send_to_remote.stream_window_updates) {
402       reconciled_stream_deltas[stream_update.id] += stream_update.size;
403     }
404   }
405 
406   // Anything that's been sent from remote -> flow control needs to be wound
407   // back into the remote.
408   for (const auto& send_from_remote : send_from_remote_) {
409     for (const auto& stream_write : send_from_remote.stream_writes) {
410       reconciled_stream_deltas[stream_write.id] += stream_write.size;
411       reconciled_transport_window += stream_write.size;
412     }
413   }
414 
415   // If we're sending an initial window size we get to consider a queued initial
416   // window size too: it'll be sent as soon as the remote acks the settings
417   // change, which it must.
418   if (sending_initial_window_size_ && queued_initial_window_size_.has_value()) {
419     reconciled_initial_window = *queued_initial_window_size_;
420     inflight_send_initial_windows.push_back(*queued_initial_window_size_);
421     // And since we'll initiate a write, any updates that are queued to be
422     // written will be considered and send their desired updates.
423     reconciled_transport_window += tfc_->DesiredAnnounceSize(true);
424     for (auto stream_id : streams_to_update_) {
425       auto* stream = GetStream(stream_id);
426       if (stream == nullptr) continue;
427       reconciled_stream_deltas[stream_id] += stream->fc.DesiredAnnounceSize();
428     }
429   }
430 
431   // Finally, if a stream has indicated it's willing to read, the reconciled
432   // remote *MUST* be in a state where it could send at least one byte.
433   for (const auto& id_stream : streams_) {
434     if (id_stream.second.fc.min_progress_size() == 0) continue;
435     int64_t stream_window =
436         reconciled_stream_deltas[id_stream.first] + reconciled_initial_window;
437     if (stream_window <= 0 || reconciled_transport_window <= 0) {
438       fprintf(stderr,
439               "FAILED: stream %d has stream_window=%" PRId64
440               ", transport_window=%" PRId64 ", delta=%" PRId64
441               ", init_window_size=%" PRId64 ", min_progress_size=%" PRId64
442               ", transport announced_stream_total_over_incoming_window=%" PRId64
443               ", transport announced_window=%" PRId64
444               " transport target_window=%" PRId64 " sent_init_window=%d\n",
445               id_stream.first, stream_window, reconciled_transport_window,
446               reconciled_stream_deltas[id_stream.first],
447               reconciled_initial_window,
448               (id_stream.second.fc.min_progress_size()),
449               tfc_->announced_stream_total_over_incoming_window(),
450               tfc_->announced_window(), tfc_->target_window(),
451               tfc_->sent_init_window());
452       fprintf(stderr,
453               "initial_window breakdown: remote=%" PRId32 ", in-flight={%s}\n",
454               remote_initial_window_size_,
455               absl::StrJoin(inflight_send_initial_windows, ",").c_str());
456       abort();
457     }
458   }
459 }
460 
AssertAnnouncedOverInitialWindowSizeCorrect() const461 void FlowControlFuzzer::AssertAnnouncedOverInitialWindowSizeCorrect() const {
462   int64_t value_from_streams = 0;
463 
464   for (const auto& id_stream : streams_) {
465     const auto& stream = id_stream.second;
466     if (stream.fc.announced_window_delta() > 0) {
467       value_from_streams += stream.fc.announced_window_delta();
468     }
469   }
470 
471   GPR_ASSERT(value_from_streams ==
472              tfc_->announced_stream_total_over_incoming_window());
473 }
474 
475 }  // namespace
476 }  // namespace chttp2
477 }  // namespace grpc_core
478 
DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg & msg)479 DEFINE_PROTO_FUZZER(const flow_control_fuzzer::Msg& msg) {
480   grpc_core::ApplyFuzzConfigVars(msg.config_vars());
481   grpc_core::TestOnlyReloadExperimentsFromConfigVariables();
482   grpc_core::chttp2::InitGlobals();
483   grpc_core::chttp2::FlowControlFuzzer fuzzer(msg.enable_bdp());
484   for (const auto& action : msg.actions()) {
485     if (!squelch) {
486       fprintf(stderr, "%s\n", action.DebugString().c_str());
487     }
488     fuzzer.Perform(action);
489     fuzzer.AssertNoneStuck();
490     fuzzer.AssertAnnouncedOverInitialWindowSizeCorrect();
491   }
492 }
493