• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/core/end2end/fuzzers/network_input.h"
16 
17 #include <grpc/slice.h>
18 #include <stddef.h>
19 #include <stdint.h>
20 
21 #include <algorithm>
22 #include <chrono>
23 #include <string>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/log/log.h"
28 #include "absl/strings/string_view.h"
29 #include "absl/types/span.h"
30 #include "src/core/config/core_configuration.h"
31 #include "src/core/ext/transport/chaotic_good/frame_header.h"
32 #include "src/core/ext/transport/chttp2/transport/frame.h"
33 #include "src/core/ext/transport/chttp2/transport/varint.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/channel/channel_args_preconditioning.h"
36 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
37 #include "src/core/lib/event_engine/tcp_socket_utils.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/slice/slice.h"
40 #include "src/core/lib/slice/slice_buffer.h"
41 #include "src/core/util/useful.h"
42 #include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
43 #include "test/core/test_util/mock_endpoint.h"
44 
45 using grpc_event_engine::experimental::EventEngine;
46 
47 namespace grpc_core {
48 
49 namespace {
SliceFromH2Frame(Http2Frame frame)50 grpc_slice SliceFromH2Frame(Http2Frame frame) {
51   SliceBuffer buffer;
52   Serialize(absl::Span<Http2Frame>(&frame, 1), buffer);
53   return buffer.JoinIntoSlice().TakeCSlice();
54 }
55 
SliceBufferFromBytes(const std::string & bytes)56 SliceBuffer SliceBufferFromBytes(const std::string& bytes) {
57   SliceBuffer buffer;
58   buffer.Append(Slice::FromCopiedString(bytes));
59   return buffer;
60 }
61 
AppendLength(size_t length,std::vector<uint8_t> * bytes)62 void AppendLength(size_t length, std::vector<uint8_t>* bytes) {
63   VarintWriter<1> writer(length);
64   uint8_t buffer[8];
65   writer.Write(0, buffer);
66   bytes->insert(bytes->end(), buffer, buffer + writer.length());
67 }
68 
SliceBufferFromSimpleHeaders(const fuzzer_input::SimpleHeaders & headers)69 SliceBuffer SliceBufferFromSimpleHeaders(
70     const fuzzer_input::SimpleHeaders& headers) {
71   std::vector<uint8_t> temp;
72   auto add_header = [&temp](absl::string_view key, absl::string_view value) {
73     temp.push_back(0);
74     AppendLength(key.length(), &temp);
75     temp.insert(temp.end(), key.begin(), key.end());
76     AppendLength(value.length(), &temp);
77     temp.insert(temp.end(), value.begin(), value.end());
78   };
79   if (headers.has_status()) {
80     add_header(":status", headers.status());
81   }
82   if (headers.has_scheme()) {
83     add_header(":scheme", headers.scheme());
84   }
85   if (headers.has_method()) {
86     add_header(":method", headers.method());
87   }
88   if (headers.has_authority()) {
89     add_header(":authority", headers.authority());
90   }
91   if (headers.has_path()) {
92     add_header(":path", headers.path());
93   }
94   for (const auto& header : headers.headers()) {
95     if (header.has_key() && header.has_value()) {
96       add_header(header.key(), header.value());
97       ;
98     }
99     if (header.has_raw_bytes()) {
100       for (auto c : header.raw_bytes()) {
101         temp.push_back(static_cast<uint8_t>(c));
102       }
103     }
104   }
105   if (headers.has_grpc_timeout()) {
106     add_header("grpc-timeout", headers.grpc_timeout());
107   }
108   if (headers.has_te()) {
109     add_header("te", headers.te());
110   }
111   if (headers.has_content_type()) {
112     add_header("content-type", headers.content_type());
113   }
114   if (headers.has_grpc_encoding()) {
115     add_header("grpc-encoding", headers.grpc_encoding());
116   }
117   if (headers.has_grpc_internal_encoding_request()) {
118     add_header("grpc-internal-encoding-request",
119                headers.grpc_internal_encoding_request());
120   }
121   if (headers.has_grpc_accept_encoding()) {
122     add_header("grpc-accept-encoding", headers.grpc_accept_encoding());
123   }
124   if (headers.has_user_agent()) {
125     add_header("user-agent", headers.user_agent());
126   }
127   if (headers.has_grpc_message()) {
128     add_header("grpc-message", headers.grpc_message());
129   }
130   if (headers.has_host()) {
131     add_header("host", headers.host());
132   }
133   if (headers.has_endpoint_load_metrics_bin()) {
134     add_header("endpoint-load-metrics-bin",
135                headers.endpoint_load_metrics_bin());
136   }
137   if (headers.has_grpc_server_stats_bin()) {
138     add_header("grpc-server-stats-bin", headers.grpc_server_stats_bin());
139   }
140   if (headers.has_grpc_trace_bin()) {
141     add_header("grpc-trace-bin", headers.grpc_trace_bin());
142   }
143   if (headers.has_grpc_tags_bin()) {
144     add_header("grpc-tags-bin", headers.grpc_tags_bin());
145   }
146   if (headers.has_x_envoy_peer_metadata()) {
147     add_header("x-envoy-peer-metadata", headers.x_envoy_peer_metadata());
148   }
149   if (headers.has_grpc_status()) {
150     add_header("grpc-status", headers.grpc_status());
151   }
152   if (headers.has_grpc_previous_rpc_attempts()) {
153     add_header("grpc-previous-rpc-attempts",
154                headers.grpc_previous_rpc_attempts());
155   }
156   if (headers.has_grpc_retry_pushback_ms()) {
157     add_header("grpc-retry-pushback-ms", headers.grpc_retry_pushback_ms());
158   }
159   if (headers.has_grpclb_client_stats()) {
160     add_header("grpclb_client_stats", headers.grpclb_client_stats());
161   }
162   if (headers.has_lb_token()) {
163     add_header("lb-token", headers.lb_token());
164   }
165   if (headers.has_lb_cost_bin()) {
166     add_header("lb-cost-bin", headers.lb_cost_bin());
167   }
168   if (headers.has_chaotic_good_connection_type()) {
169     add_header("chaotic-good-connection-type",
170                headers.chaotic_good_connection_type());
171   }
172   if (headers.has_chaotic_good_connection_id()) {
173     add_header("chaotic-good-connection-id",
174                headers.chaotic_good_connection_id());
175   }
176   if (headers.has_chaotic_good_alignment()) {
177     add_header("chaotic-good-alignment", headers.chaotic_good_alignment());
178   }
179   SliceBuffer buffer;
180   buffer.Append(Slice::FromCopiedBuffer(temp.data(), temp.size()));
181   return buffer;
182 }
183 
184 template <typename T>
SliceBufferFromHeaderPayload(const T & payload)185 SliceBuffer SliceBufferFromHeaderPayload(const T& payload) {
186   switch (payload.payload_case()) {
187     case T::kRawBytes:
188       return SliceBufferFromBytes(payload.raw_bytes());
189     case T::kSimpleHeader:
190       return SliceBufferFromSimpleHeaders(payload.simple_header());
191     case T::PAYLOAD_NOT_SET:
192       break;
193   }
194   return SliceBuffer();
195 }
196 
ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame & frame)197 SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) {
198   chaotic_good::FrameHeader h;
199   SliceBuffer suffix;
200   h.stream_id = frame.stream_id();
201   switch (frame.frame_type_case()) {
202     case fuzzer_input::ChaoticGoodFrame::kKnownType:
203       switch (frame.known_type()) {
204         case fuzzer_input::ChaoticGoodFrame::SETTINGS:
205           h.type = chaotic_good::FrameType::kSettings;
206           break;
207         case fuzzer_input::ChaoticGoodFrame::CLIENT_INITIAL_METADATA:
208           h.type = chaotic_good::FrameType::kClientInitialMetadata;
209           break;
210         case fuzzer_input::ChaoticGoodFrame::MESSAGE:
211           h.type = chaotic_good::FrameType::kMessage;
212           break;
213         case fuzzer_input::ChaoticGoodFrame::CLIENT_END_OF_STREAM:
214           h.type = chaotic_good::FrameType::kClientEndOfStream;
215           break;
216         case fuzzer_input::ChaoticGoodFrame::SERVER_INITIAL_METADATA:
217           h.type = chaotic_good::FrameType::kServerInitialMetadata;
218           break;
219         case fuzzer_input::ChaoticGoodFrame::SERVER_TRAILING_METADATA:
220           h.type = chaotic_good::FrameType::kServerTrailingMetadata;
221           break;
222         case fuzzer_input::ChaoticGoodFrame::CANCEL:
223           h.type = chaotic_good::FrameType::kCancel;
224           break;
225         default:
226           break;
227       }
228       break;
229     case fuzzer_input::ChaoticGoodFrame::kUnknownType:
230       h.type = static_cast<chaotic_good::FrameType>(frame.unknown_type());
231       break;
232     case fuzzer_input::ChaoticGoodFrame::FRAME_TYPE_NOT_SET:
233       h.type = chaotic_good::FrameType::kMessage;
234       break;
235   }
236   h.stream_id = frame.stream_id();
237   h.payload_connection_id = 0;
238   h.payload_length = 0;
239   auto proto_payload = [&](auto payload) {
240     std::string temp = payload.SerializeAsString();
241     h.payload_length = temp.length();
242     suffix.Append(Slice::FromCopiedString(temp));
243   };
244   switch (frame.payload_case()) {
245     case fuzzer_input::ChaoticGoodFrame::kPayloadNone:
246     case fuzzer_input::ChaoticGoodFrame::PAYLOAD_NOT_SET:
247       break;
248     case fuzzer_input::ChaoticGoodFrame::kPayloadRawBytes:
249       if (frame.payload_raw_bytes().empty()) break;
250       h.payload_length = frame.payload_raw_bytes().length();
251       suffix.Append(Slice::FromCopiedString(frame.payload_raw_bytes()));
252       break;
253     case fuzzer_input::ChaoticGoodFrame::kPayloadEmptyOfLength:
254       h.payload_length = frame.payload_empty_of_length();
255       suffix.Append(Slice::FromCopiedString(
256           std::string(frame.payload_empty_of_length(), 'a')));
257       break;
258     case fuzzer_input::ChaoticGoodFrame::kPayloadOtherConnectionId:
259       h.payload_connection_id =
260           frame.payload_other_connection_id().connection_id();
261       h.payload_length = frame.payload_other_connection_id().length();
262       break;
263     case fuzzer_input::ChaoticGoodFrame::kSettings:
264       proto_payload(frame.settings());
265       break;
266     case fuzzer_input::ChaoticGoodFrame::kClientMetadata:
267       proto_payload(frame.client_metadata());
268       break;
269     case fuzzer_input::ChaoticGoodFrame::kServerMetadata:
270       proto_payload(frame.server_metadata());
271       break;
272   }
273   uint8_t bytes[chaotic_good::FrameHeader::kFrameHeaderSize];
274   h.Serialize(bytes);
275   SliceBuffer out;
276   out.Append(Slice::FromCopiedBuffer(
277       bytes, chaotic_good::FrameHeader::kFrameHeaderSize));
278   out.Append(suffix);
279   return out;
280 }
281 
store32_little_endian(uint32_t value,unsigned char * buf)282 void store32_little_endian(uint32_t value, unsigned char* buf) {
283   buf[3] = static_cast<unsigned char>((value >> 24) & 0xFF);
284   buf[2] = static_cast<unsigned char>((value >> 16) & 0xFF);
285   buf[1] = static_cast<unsigned char>((value >> 8) & 0xFF);
286   buf[0] = static_cast<unsigned char>((value) & 0xFF);
287 }
288 
SliceFromSegment(const fuzzer_input::InputSegment & segment)289 grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) {
290   switch (segment.payload_case()) {
291     case fuzzer_input::InputSegment::kRawBytes:
292       return grpc_slice_from_copied_buffer(segment.raw_bytes().data(),
293                                            segment.raw_bytes().size());
294     case fuzzer_input::InputSegment::kData:
295       return SliceFromH2Frame(Http2DataFrame{
296           segment.data().stream_id(), segment.data().end_of_stream(),
297           SliceBufferFromBytes(segment.data().payload())});
298     case fuzzer_input::InputSegment::kHeader:
299       return SliceFromH2Frame(Http2HeaderFrame{
300           segment.header().stream_id(),
301           segment.header().end_headers(),
302           segment.header().end_stream(),
303           SliceBufferFromHeaderPayload(segment.header()),
304       });
305     case fuzzer_input::InputSegment::kContinuation:
306       return SliceFromH2Frame(Http2ContinuationFrame{
307           segment.continuation().stream_id(),
308           segment.continuation().end_headers(),
309           SliceBufferFromHeaderPayload(segment.header()),
310       });
311     case fuzzer_input::InputSegment::kRstStream:
312       return SliceFromH2Frame(Http2RstStreamFrame{
313           segment.rst_stream().stream_id(),
314           segment.rst_stream().error_code(),
315       });
316     case fuzzer_input::InputSegment::kSettings: {
317       std::vector<Http2SettingsFrame::Setting> settings;
318       for (const auto& setting : segment.settings().settings()) {
319         settings.push_back(Http2SettingsFrame::Setting{
320             static_cast<uint16_t>(setting.id()),
321             setting.value(),
322         });
323       }
324       return SliceFromH2Frame(Http2SettingsFrame{
325           segment.settings().ack(),
326           std::move(settings),
327       });
328     }
329     case fuzzer_input::InputSegment::kPing:
330       return SliceFromH2Frame(Http2PingFrame{
331           segment.ping().ack(),
332           segment.ping().opaque(),
333       });
334     case fuzzer_input::InputSegment::kGoaway:
335       return SliceFromH2Frame(Http2GoawayFrame{
336           segment.goaway().last_stream_id(), segment.goaway().error_code(),
337           Slice::FromCopiedString(segment.goaway().debug_data())});
338     case fuzzer_input::InputSegment::kWindowUpdate:
339       return SliceFromH2Frame(Http2WindowUpdateFrame{
340           segment.window_update().stream_id(),
341           segment.window_update().increment(),
342       });
343     case fuzzer_input::InputSegment::kSecurityFrame:
344       return SliceFromH2Frame(Http2SecurityFrame{
345           SliceBufferFromBytes(segment.security_frame().payload())});
346     case fuzzer_input::InputSegment::kClientPrefix:
347       return grpc_slice_from_static_string("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
348     case fuzzer_input::InputSegment::kRepeatedZeros: {
349       std::vector<char> zeros;
350       zeros.resize(std::min<size_t>(segment.repeated_zeros(), 128 * 1024), 0);
351       return grpc_slice_from_copied_buffer(zeros.data(), zeros.size());
352     }
353     case fuzzer_input::InputSegment::kChaoticGood: {
354       return ChaoticGoodFrame(segment.chaotic_good())
355           .JoinIntoSlice()
356           .TakeCSlice();
357     } break;
358     case fuzzer_input::InputSegment::kFakeTransportFrame: {
359       auto generate = [](absl::string_view payload) {
360         uint32_t length = payload.length();
361         std::vector<unsigned char> bytes;
362         bytes.resize(4);
363         store32_little_endian(length + 4, bytes.data());
364         for (auto c : payload) {
365           bytes.push_back(static_cast<unsigned char>(c));
366         }
367         return grpc_slice_from_copied_buffer(
368             reinterpret_cast<const char*>(bytes.data()), bytes.size());
369       };
370       switch (segment.fake_transport_frame().payload_case()) {
371         case fuzzer_input::FakeTransportFrame::kRawBytes:
372           return generate(segment.fake_transport_frame().raw_bytes());
373         case fuzzer_input::FakeTransportFrame::kMessageString:
374           switch (segment.fake_transport_frame().message_string()) {
375             default:
376               return generate("UNKNOWN");
377             case fuzzer_input::FakeTransportFrame::CLIENT_INIT:
378               return generate("CLIENT_INIT");
379             case fuzzer_input::FakeTransportFrame::SERVER_INIT:
380               return generate("SERVER_INIT");
381             case fuzzer_input::FakeTransportFrame::CLIENT_FINISHED:
382               return generate("CLIENT_FINISHED");
383             case fuzzer_input::FakeTransportFrame::SERVER_FINISHED:
384               return generate("SERVER_FINISHED");
385           }
386         case fuzzer_input::FakeTransportFrame::PAYLOAD_NOT_SET:
387           return generate("");
388       }
389     }
390     case fuzzer_input::InputSegment::PAYLOAD_NOT_SET:
391       break;
392   }
393   return grpc_empty_slice();
394 }
395 
396 struct QueuedRead {
QueuedReadgrpc_core::__anon9f658e4a0111::QueuedRead397   QueuedRead(int delay_ms, SliceBuffer slices)
398       : delay_ms(delay_ms), slices(std::move(slices)) {}
399   int delay_ms;
400   SliceBuffer slices;
401 };
402 
MakeSchedule(const fuzzer_input::NetworkInput & network_input)403 std::vector<QueuedRead> MakeSchedule(
404     const fuzzer_input::NetworkInput& network_input) {
405   std::vector<QueuedRead> schedule;
406   switch (network_input.value_case()) {
407     case fuzzer_input::NetworkInput::kSingleReadBytes: {
408       schedule.emplace_back(0, SliceBuffer(Slice::FromCopiedBuffer(
409                                    network_input.single_read_bytes().data(),
410                                    network_input.single_read_bytes().size())));
411     } break;
412     case fuzzer_input::NetworkInput::kInputSegments: {
413       int delay_ms = 0;
414       SliceBuffer building;
415       for (const auto& segment : network_input.input_segments().segments()) {
416         const int segment_delay = Clamp(segment.delay_ms(), 0, 1000);
417         if (segment_delay != 0) {
418           delay_ms += segment_delay;
419           if (building.Length() != 0) {
420             schedule.emplace_back(delay_ms, std::move(building));
421           }
422           building.Clear();
423         }
424         building.Append(Slice(SliceFromSegment(segment)));
425       }
426       if (building.Length() != 0) {
427         ++delay_ms;
428         schedule.emplace_back(delay_ms, std::move(building));
429       }
430     } break;
431     case fuzzer_input::NetworkInput::VALUE_NOT_SET:
432       break;
433   }
434   return schedule;
435 }
436 
437 }  // namespace
438 
ScheduleReads(const fuzzer_input::NetworkInput & network_input,std::shared_ptr<grpc_event_engine::experimental::MockEndpointController> mock_endpoint_controller,grpc_event_engine::experimental::FuzzingEventEngine * event_engine)439 Duration ScheduleReads(
440     const fuzzer_input::NetworkInput& network_input,
441     std::shared_ptr<grpc_event_engine::experimental::MockEndpointController>
442         mock_endpoint_controller,
443     grpc_event_engine::experimental::FuzzingEventEngine* event_engine) {
444   int delay = 0;
445   for (const auto& q : MakeSchedule(network_input)) {
446     event_engine->RunAfterExactly(
447         std::chrono::milliseconds(q.delay_ms),
448         [mock_endpoint_controller,
449          slices = q.slices.JoinIntoSlice()]() mutable {
450           ExecCtx exec_ctx;
451           mock_endpoint_controller->TriggerReadEvent(
452               std::move(grpc_event_engine::experimental::internal::SliceCast<
453                         grpc_event_engine::experimental::Slice>(slices)));
454         });
455     delay = std::max(delay, q.delay_ms);
456   }
457   event_engine->RunAfterExactly(std::chrono::milliseconds(delay + 1),
458                                 [mock_endpoint_controller] {
459                                   ExecCtx exec_ctx;
460                                   mock_endpoint_controller->NoMoreReads();
461                                 });
462   return Duration::Milliseconds(delay + 2);
463 }
464 
465 namespace {
466 
ReadForever(std::shared_ptr<EventEngine::Endpoint> ep)467 void ReadForever(std::shared_ptr<EventEngine::Endpoint> ep) {
468   bool finished;
469   do {
470     auto buffer =
471         std::make_unique<grpc_event_engine::experimental::SliceBuffer>();
472     auto buffer_ptr = buffer.get();
473     finished = ep->Read(
474         [ep, buffer = std::move(buffer)](absl::Status status) mutable {
475           ExecCtx exec_ctx;
476           if (!status.ok()) return;
477           ReadForever(std::move(ep));
478         },
479         buffer_ptr, nullptr);
480   } while (finished);
481 }
482 
ScheduleWritesForReads(std::shared_ptr<EventEngine::Endpoint> ep,grpc_event_engine::experimental::FuzzingEventEngine * event_engine,std::vector<QueuedRead> schedule)483 void ScheduleWritesForReads(
484     std::shared_ptr<EventEngine::Endpoint> ep,
485     grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
486     std::vector<QueuedRead> schedule) {
487   class Scheduler {
488    public:
489     Scheduler(std::shared_ptr<EventEngine::Endpoint> ep,
490               grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
491               std::vector<QueuedRead> schedule)
492         : ep_(std::move(ep)),
493           event_engine_(event_engine),
494           schedule_(std::move(schedule)),
495           it_(schedule_.begin()) {
496       ScheduleNext();
497     }
498 
499    private:
500     void ScheduleNext() {
501       if (it_ == schedule_.end()) {
502         delete this;
503         return;
504       }
505       event_engine_->RunAfterExactly(
506           Duration::Milliseconds(it_->delay_ms - delay_consumed_),
507           [this]() mutable {
508             ExecCtx exec_ctx;
509             delay_consumed_ = it_->delay_ms;
510             writing_.Clear();
511             writing_.Append(
512                 grpc_event_engine::experimental::internal::SliceCast<
513                     grpc_event_engine::experimental::Slice>(
514                     it_->slices.JoinIntoSlice()));
515             if (ep_->Write(
516                     [this](absl::Status status) {
517                       ExecCtx exec_ctx;
518                       FinishWrite(std::move(status));
519                     },
520                     &writing_, nullptr)) {
521               FinishWrite(absl::OkStatus());
522             }
523           });
524     }
525 
526     void FinishWrite(absl::Status status) {
527       if (!status.ok()) {
528         it_ = schedule_.end();
529       } else {
530         ++it_;
531       }
532       ScheduleNext();
533     }
534 
535     std::shared_ptr<EventEngine::Endpoint> ep_;
536     grpc_event_engine::experimental::FuzzingEventEngine* event_engine_;
537     std::vector<QueuedRead> schedule_;
538     std::vector<QueuedRead>::iterator it_;
539     grpc_event_engine::experimental::SliceBuffer writing_;
540     int delay_consumed_ = 0;
541   };
542   new Scheduler(std::move(ep), event_engine, std::move(schedule));
543 }
544 
545 }  // namespace
546 
ScheduleConnection(const fuzzer_input::NetworkInput & network_input,grpc_event_engine::experimental::FuzzingEventEngine * event_engine,testing::FuzzingEnvironment environment,int port)547 Duration ScheduleConnection(
548     const fuzzer_input::NetworkInput& network_input,
549     grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
550     testing::FuzzingEnvironment environment, int port) {
551   ChannelArgs channel_args =
552       CoreConfiguration::Get()
553           .channel_args_preconditioning()
554           .PreconditionChannelArgs(
555               CreateChannelArgsFromFuzzingConfiguration(
556                   network_input.endpoint_config(), environment)
557                   .ToC()
558                   .get());
559   auto schedule = MakeSchedule(network_input);
560   Duration delay = Duration::Zero();
561   for (const auto& q : schedule) {
562     delay = std::max(
563         delay,
564         Duration::Milliseconds(q.delay_ms) +
565             Duration::NanosecondsRoundUp(
566                 (q.slices.Length() * event_engine->max_delay_write()).count()));
567   }
568   delay += Duration::Milliseconds(network_input.connect_delay_ms()) +
569            Duration::Milliseconds(network_input.connect_timeout_ms());
570   event_engine->RunAfterExactly(
571       Duration::Milliseconds(network_input.connect_delay_ms()),
572       [event_engine, channel_args,
573        connect_timeout_ms = network_input.connect_timeout_ms(),
574        schedule = std::move(schedule), port]() mutable {
575         ExecCtx exec_ctx;
576         event_engine->Connect(
577             [event_engine, schedule = std::move(schedule)](
578                 absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
579                     endpoint) mutable {
580               ExecCtx exec_ctx;
581               if (!endpoint.ok()) {
582                 LOG(ERROR) << "Failed to connect: " << endpoint.status();
583                 return;
584               }
585               std::shared_ptr<EventEngine::Endpoint> ep =
586                   std::move(endpoint.value());
587               ReadForever(ep);
588               ScheduleWritesForReads(std::move(ep), event_engine,
589                                      std::move(schedule));
590             },
591             grpc_event_engine::experimental::ResolvedAddressMakeWild4(port),
592             grpc_event_engine::experimental::ChannelArgsEndpointConfig(
593                 channel_args),
594             channel_args.GetObject<ResourceQuota>()
595                 ->memory_quota()
596                 ->CreateMemoryAllocator("fuzzer"),
597             Duration::Milliseconds(connect_timeout_ms));
598       });
599   return delay;
600 }
601 
ScheduleWrites(const fuzzer_input::NetworkInput & network_input,std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> endpoint,grpc_event_engine::experimental::FuzzingEventEngine * event_engine)602 void ScheduleWrites(
603     const fuzzer_input::NetworkInput& network_input,
604     std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
605         endpoint,
606     grpc_event_engine::experimental::FuzzingEventEngine* event_engine) {
607   auto schedule = MakeSchedule(network_input);
608   auto ep = std::shared_ptr<EventEngine::Endpoint>(std::move(endpoint));
609   ReadForever(ep);
610   ScheduleWritesForReads(ep, event_engine, std::move(schedule));
611 }
612 
613 }  // namespace grpc_core
614