1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/end2end/fuzzers/network_input.h"
16
17 #include <grpc/slice.h>
18 #include <stddef.h>
19 #include <stdint.h>
20
21 #include <algorithm>
22 #include <chrono>
23 #include <string>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/log/log.h"
28 #include "absl/strings/string_view.h"
29 #include "absl/types/span.h"
30 #include "src/core/config/core_configuration.h"
31 #include "src/core/ext/transport/chaotic_good/frame_header.h"
32 #include "src/core/ext/transport/chttp2/transport/frame.h"
33 #include "src/core/ext/transport/chttp2/transport/varint.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/channel/channel_args_preconditioning.h"
36 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
37 #include "src/core/lib/event_engine/tcp_socket_utils.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/slice/slice.h"
40 #include "src/core/lib/slice/slice_buffer.h"
41 #include "src/core/util/useful.h"
42 #include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
43 #include "test/core/test_util/mock_endpoint.h"
44
45 using grpc_event_engine::experimental::EventEngine;
46
47 namespace grpc_core {
48
49 namespace {
SliceFromH2Frame(Http2Frame frame)50 grpc_slice SliceFromH2Frame(Http2Frame frame) {
51 SliceBuffer buffer;
52 Serialize(absl::Span<Http2Frame>(&frame, 1), buffer);
53 return buffer.JoinIntoSlice().TakeCSlice();
54 }
55
SliceBufferFromBytes(const std::string & bytes)56 SliceBuffer SliceBufferFromBytes(const std::string& bytes) {
57 SliceBuffer buffer;
58 buffer.Append(Slice::FromCopiedString(bytes));
59 return buffer;
60 }
61
AppendLength(size_t length,std::vector<uint8_t> * bytes)62 void AppendLength(size_t length, std::vector<uint8_t>* bytes) {
63 VarintWriter<1> writer(length);
64 uint8_t buffer[8];
65 writer.Write(0, buffer);
66 bytes->insert(bytes->end(), buffer, buffer + writer.length());
67 }
68
SliceBufferFromSimpleHeaders(const fuzzer_input::SimpleHeaders & headers)69 SliceBuffer SliceBufferFromSimpleHeaders(
70 const fuzzer_input::SimpleHeaders& headers) {
71 std::vector<uint8_t> temp;
72 auto add_header = [&temp](absl::string_view key, absl::string_view value) {
73 temp.push_back(0);
74 AppendLength(key.length(), &temp);
75 temp.insert(temp.end(), key.begin(), key.end());
76 AppendLength(value.length(), &temp);
77 temp.insert(temp.end(), value.begin(), value.end());
78 };
79 if (headers.has_status()) {
80 add_header(":status", headers.status());
81 }
82 if (headers.has_scheme()) {
83 add_header(":scheme", headers.scheme());
84 }
85 if (headers.has_method()) {
86 add_header(":method", headers.method());
87 }
88 if (headers.has_authority()) {
89 add_header(":authority", headers.authority());
90 }
91 if (headers.has_path()) {
92 add_header(":path", headers.path());
93 }
94 for (const auto& header : headers.headers()) {
95 if (header.has_key() && header.has_value()) {
96 add_header(header.key(), header.value());
97 ;
98 }
99 if (header.has_raw_bytes()) {
100 for (auto c : header.raw_bytes()) {
101 temp.push_back(static_cast<uint8_t>(c));
102 }
103 }
104 }
105 if (headers.has_grpc_timeout()) {
106 add_header("grpc-timeout", headers.grpc_timeout());
107 }
108 if (headers.has_te()) {
109 add_header("te", headers.te());
110 }
111 if (headers.has_content_type()) {
112 add_header("content-type", headers.content_type());
113 }
114 if (headers.has_grpc_encoding()) {
115 add_header("grpc-encoding", headers.grpc_encoding());
116 }
117 if (headers.has_grpc_internal_encoding_request()) {
118 add_header("grpc-internal-encoding-request",
119 headers.grpc_internal_encoding_request());
120 }
121 if (headers.has_grpc_accept_encoding()) {
122 add_header("grpc-accept-encoding", headers.grpc_accept_encoding());
123 }
124 if (headers.has_user_agent()) {
125 add_header("user-agent", headers.user_agent());
126 }
127 if (headers.has_grpc_message()) {
128 add_header("grpc-message", headers.grpc_message());
129 }
130 if (headers.has_host()) {
131 add_header("host", headers.host());
132 }
133 if (headers.has_endpoint_load_metrics_bin()) {
134 add_header("endpoint-load-metrics-bin",
135 headers.endpoint_load_metrics_bin());
136 }
137 if (headers.has_grpc_server_stats_bin()) {
138 add_header("grpc-server-stats-bin", headers.grpc_server_stats_bin());
139 }
140 if (headers.has_grpc_trace_bin()) {
141 add_header("grpc-trace-bin", headers.grpc_trace_bin());
142 }
143 if (headers.has_grpc_tags_bin()) {
144 add_header("grpc-tags-bin", headers.grpc_tags_bin());
145 }
146 if (headers.has_x_envoy_peer_metadata()) {
147 add_header("x-envoy-peer-metadata", headers.x_envoy_peer_metadata());
148 }
149 if (headers.has_grpc_status()) {
150 add_header("grpc-status", headers.grpc_status());
151 }
152 if (headers.has_grpc_previous_rpc_attempts()) {
153 add_header("grpc-previous-rpc-attempts",
154 headers.grpc_previous_rpc_attempts());
155 }
156 if (headers.has_grpc_retry_pushback_ms()) {
157 add_header("grpc-retry-pushback-ms", headers.grpc_retry_pushback_ms());
158 }
159 if (headers.has_grpclb_client_stats()) {
160 add_header("grpclb_client_stats", headers.grpclb_client_stats());
161 }
162 if (headers.has_lb_token()) {
163 add_header("lb-token", headers.lb_token());
164 }
165 if (headers.has_lb_cost_bin()) {
166 add_header("lb-cost-bin", headers.lb_cost_bin());
167 }
168 if (headers.has_chaotic_good_connection_type()) {
169 add_header("chaotic-good-connection-type",
170 headers.chaotic_good_connection_type());
171 }
172 if (headers.has_chaotic_good_connection_id()) {
173 add_header("chaotic-good-connection-id",
174 headers.chaotic_good_connection_id());
175 }
176 if (headers.has_chaotic_good_alignment()) {
177 add_header("chaotic-good-alignment", headers.chaotic_good_alignment());
178 }
179 SliceBuffer buffer;
180 buffer.Append(Slice::FromCopiedBuffer(temp.data(), temp.size()));
181 return buffer;
182 }
183
184 template <typename T>
SliceBufferFromHeaderPayload(const T & payload)185 SliceBuffer SliceBufferFromHeaderPayload(const T& payload) {
186 switch (payload.payload_case()) {
187 case T::kRawBytes:
188 return SliceBufferFromBytes(payload.raw_bytes());
189 case T::kSimpleHeader:
190 return SliceBufferFromSimpleHeaders(payload.simple_header());
191 case T::PAYLOAD_NOT_SET:
192 break;
193 }
194 return SliceBuffer();
195 }
196
ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame & frame)197 SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) {
198 chaotic_good::FrameHeader h;
199 SliceBuffer suffix;
200 h.stream_id = frame.stream_id();
201 switch (frame.frame_type_case()) {
202 case fuzzer_input::ChaoticGoodFrame::kKnownType:
203 switch (frame.known_type()) {
204 case fuzzer_input::ChaoticGoodFrame::SETTINGS:
205 h.type = chaotic_good::FrameType::kSettings;
206 break;
207 case fuzzer_input::ChaoticGoodFrame::CLIENT_INITIAL_METADATA:
208 h.type = chaotic_good::FrameType::kClientInitialMetadata;
209 break;
210 case fuzzer_input::ChaoticGoodFrame::MESSAGE:
211 h.type = chaotic_good::FrameType::kMessage;
212 break;
213 case fuzzer_input::ChaoticGoodFrame::CLIENT_END_OF_STREAM:
214 h.type = chaotic_good::FrameType::kClientEndOfStream;
215 break;
216 case fuzzer_input::ChaoticGoodFrame::SERVER_INITIAL_METADATA:
217 h.type = chaotic_good::FrameType::kServerInitialMetadata;
218 break;
219 case fuzzer_input::ChaoticGoodFrame::SERVER_TRAILING_METADATA:
220 h.type = chaotic_good::FrameType::kServerTrailingMetadata;
221 break;
222 case fuzzer_input::ChaoticGoodFrame::CANCEL:
223 h.type = chaotic_good::FrameType::kCancel;
224 break;
225 default:
226 break;
227 }
228 break;
229 case fuzzer_input::ChaoticGoodFrame::kUnknownType:
230 h.type = static_cast<chaotic_good::FrameType>(frame.unknown_type());
231 break;
232 case fuzzer_input::ChaoticGoodFrame::FRAME_TYPE_NOT_SET:
233 h.type = chaotic_good::FrameType::kMessage;
234 break;
235 }
236 h.stream_id = frame.stream_id();
237 h.payload_connection_id = 0;
238 h.payload_length = 0;
239 auto proto_payload = [&](auto payload) {
240 std::string temp = payload.SerializeAsString();
241 h.payload_length = temp.length();
242 suffix.Append(Slice::FromCopiedString(temp));
243 };
244 switch (frame.payload_case()) {
245 case fuzzer_input::ChaoticGoodFrame::kPayloadNone:
246 case fuzzer_input::ChaoticGoodFrame::PAYLOAD_NOT_SET:
247 break;
248 case fuzzer_input::ChaoticGoodFrame::kPayloadRawBytes:
249 if (frame.payload_raw_bytes().empty()) break;
250 h.payload_length = frame.payload_raw_bytes().length();
251 suffix.Append(Slice::FromCopiedString(frame.payload_raw_bytes()));
252 break;
253 case fuzzer_input::ChaoticGoodFrame::kPayloadEmptyOfLength:
254 h.payload_length = frame.payload_empty_of_length();
255 suffix.Append(Slice::FromCopiedString(
256 std::string(frame.payload_empty_of_length(), 'a')));
257 break;
258 case fuzzer_input::ChaoticGoodFrame::kPayloadOtherConnectionId:
259 h.payload_connection_id =
260 frame.payload_other_connection_id().connection_id();
261 h.payload_length = frame.payload_other_connection_id().length();
262 break;
263 case fuzzer_input::ChaoticGoodFrame::kSettings:
264 proto_payload(frame.settings());
265 break;
266 case fuzzer_input::ChaoticGoodFrame::kClientMetadata:
267 proto_payload(frame.client_metadata());
268 break;
269 case fuzzer_input::ChaoticGoodFrame::kServerMetadata:
270 proto_payload(frame.server_metadata());
271 break;
272 }
273 uint8_t bytes[chaotic_good::FrameHeader::kFrameHeaderSize];
274 h.Serialize(bytes);
275 SliceBuffer out;
276 out.Append(Slice::FromCopiedBuffer(
277 bytes, chaotic_good::FrameHeader::kFrameHeaderSize));
278 out.Append(suffix);
279 return out;
280 }
281
// Writes `value` into buf[0..3] in little-endian order (least significant
// byte first). `buf` must have room for four bytes.
void store32_little_endian(uint32_t value, unsigned char* buf) {
  for (int i = 0; i < 4; ++i) {
    buf[i] = static_cast<unsigned char>((value >> (8 * i)) & 0xFF);
  }
}
288
SliceFromSegment(const fuzzer_input::InputSegment & segment)289 grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) {
290 switch (segment.payload_case()) {
291 case fuzzer_input::InputSegment::kRawBytes:
292 return grpc_slice_from_copied_buffer(segment.raw_bytes().data(),
293 segment.raw_bytes().size());
294 case fuzzer_input::InputSegment::kData:
295 return SliceFromH2Frame(Http2DataFrame{
296 segment.data().stream_id(), segment.data().end_of_stream(),
297 SliceBufferFromBytes(segment.data().payload())});
298 case fuzzer_input::InputSegment::kHeader:
299 return SliceFromH2Frame(Http2HeaderFrame{
300 segment.header().stream_id(),
301 segment.header().end_headers(),
302 segment.header().end_stream(),
303 SliceBufferFromHeaderPayload(segment.header()),
304 });
305 case fuzzer_input::InputSegment::kContinuation:
306 return SliceFromH2Frame(Http2ContinuationFrame{
307 segment.continuation().stream_id(),
308 segment.continuation().end_headers(),
309 SliceBufferFromHeaderPayload(segment.header()),
310 });
311 case fuzzer_input::InputSegment::kRstStream:
312 return SliceFromH2Frame(Http2RstStreamFrame{
313 segment.rst_stream().stream_id(),
314 segment.rst_stream().error_code(),
315 });
316 case fuzzer_input::InputSegment::kSettings: {
317 std::vector<Http2SettingsFrame::Setting> settings;
318 for (const auto& setting : segment.settings().settings()) {
319 settings.push_back(Http2SettingsFrame::Setting{
320 static_cast<uint16_t>(setting.id()),
321 setting.value(),
322 });
323 }
324 return SliceFromH2Frame(Http2SettingsFrame{
325 segment.settings().ack(),
326 std::move(settings),
327 });
328 }
329 case fuzzer_input::InputSegment::kPing:
330 return SliceFromH2Frame(Http2PingFrame{
331 segment.ping().ack(),
332 segment.ping().opaque(),
333 });
334 case fuzzer_input::InputSegment::kGoaway:
335 return SliceFromH2Frame(Http2GoawayFrame{
336 segment.goaway().last_stream_id(), segment.goaway().error_code(),
337 Slice::FromCopiedString(segment.goaway().debug_data())});
338 case fuzzer_input::InputSegment::kWindowUpdate:
339 return SliceFromH2Frame(Http2WindowUpdateFrame{
340 segment.window_update().stream_id(),
341 segment.window_update().increment(),
342 });
343 case fuzzer_input::InputSegment::kSecurityFrame:
344 return SliceFromH2Frame(Http2SecurityFrame{
345 SliceBufferFromBytes(segment.security_frame().payload())});
346 case fuzzer_input::InputSegment::kClientPrefix:
347 return grpc_slice_from_static_string("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
348 case fuzzer_input::InputSegment::kRepeatedZeros: {
349 std::vector<char> zeros;
350 zeros.resize(std::min<size_t>(segment.repeated_zeros(), 128 * 1024), 0);
351 return grpc_slice_from_copied_buffer(zeros.data(), zeros.size());
352 }
353 case fuzzer_input::InputSegment::kChaoticGood: {
354 return ChaoticGoodFrame(segment.chaotic_good())
355 .JoinIntoSlice()
356 .TakeCSlice();
357 } break;
358 case fuzzer_input::InputSegment::kFakeTransportFrame: {
359 auto generate = [](absl::string_view payload) {
360 uint32_t length = payload.length();
361 std::vector<unsigned char> bytes;
362 bytes.resize(4);
363 store32_little_endian(length + 4, bytes.data());
364 for (auto c : payload) {
365 bytes.push_back(static_cast<unsigned char>(c));
366 }
367 return grpc_slice_from_copied_buffer(
368 reinterpret_cast<const char*>(bytes.data()), bytes.size());
369 };
370 switch (segment.fake_transport_frame().payload_case()) {
371 case fuzzer_input::FakeTransportFrame::kRawBytes:
372 return generate(segment.fake_transport_frame().raw_bytes());
373 case fuzzer_input::FakeTransportFrame::kMessageString:
374 switch (segment.fake_transport_frame().message_string()) {
375 default:
376 return generate("UNKNOWN");
377 case fuzzer_input::FakeTransportFrame::CLIENT_INIT:
378 return generate("CLIENT_INIT");
379 case fuzzer_input::FakeTransportFrame::SERVER_INIT:
380 return generate("SERVER_INIT");
381 case fuzzer_input::FakeTransportFrame::CLIENT_FINISHED:
382 return generate("CLIENT_FINISHED");
383 case fuzzer_input::FakeTransportFrame::SERVER_FINISHED:
384 return generate("SERVER_FINISHED");
385 }
386 case fuzzer_input::FakeTransportFrame::PAYLOAD_NOT_SET:
387 return generate("");
388 }
389 }
390 case fuzzer_input::InputSegment::PAYLOAD_NOT_SET:
391 break;
392 }
393 return grpc_empty_slice();
394 }
395
396 struct QueuedRead {
QueuedReadgrpc_core::__anon9f658e4a0111::QueuedRead397 QueuedRead(int delay_ms, SliceBuffer slices)
398 : delay_ms(delay_ms), slices(std::move(slices)) {}
399 int delay_ms;
400 SliceBuffer slices;
401 };
402
MakeSchedule(const fuzzer_input::NetworkInput & network_input)403 std::vector<QueuedRead> MakeSchedule(
404 const fuzzer_input::NetworkInput& network_input) {
405 std::vector<QueuedRead> schedule;
406 switch (network_input.value_case()) {
407 case fuzzer_input::NetworkInput::kSingleReadBytes: {
408 schedule.emplace_back(0, SliceBuffer(Slice::FromCopiedBuffer(
409 network_input.single_read_bytes().data(),
410 network_input.single_read_bytes().size())));
411 } break;
412 case fuzzer_input::NetworkInput::kInputSegments: {
413 int delay_ms = 0;
414 SliceBuffer building;
415 for (const auto& segment : network_input.input_segments().segments()) {
416 const int segment_delay = Clamp(segment.delay_ms(), 0, 1000);
417 if (segment_delay != 0) {
418 delay_ms += segment_delay;
419 if (building.Length() != 0) {
420 schedule.emplace_back(delay_ms, std::move(building));
421 }
422 building.Clear();
423 }
424 building.Append(Slice(SliceFromSegment(segment)));
425 }
426 if (building.Length() != 0) {
427 ++delay_ms;
428 schedule.emplace_back(delay_ms, std::move(building));
429 }
430 } break;
431 case fuzzer_input::NetworkInput::VALUE_NOT_SET:
432 break;
433 }
434 return schedule;
435 }
436
437 } // namespace
438
ScheduleReads(const fuzzer_input::NetworkInput & network_input,std::shared_ptr<grpc_event_engine::experimental::MockEndpointController> mock_endpoint_controller,grpc_event_engine::experimental::FuzzingEventEngine * event_engine)439 Duration ScheduleReads(
440 const fuzzer_input::NetworkInput& network_input,
441 std::shared_ptr<grpc_event_engine::experimental::MockEndpointController>
442 mock_endpoint_controller,
443 grpc_event_engine::experimental::FuzzingEventEngine* event_engine) {
444 int delay = 0;
445 for (const auto& q : MakeSchedule(network_input)) {
446 event_engine->RunAfterExactly(
447 std::chrono::milliseconds(q.delay_ms),
448 [mock_endpoint_controller,
449 slices = q.slices.JoinIntoSlice()]() mutable {
450 ExecCtx exec_ctx;
451 mock_endpoint_controller->TriggerReadEvent(
452 std::move(grpc_event_engine::experimental::internal::SliceCast<
453 grpc_event_engine::experimental::Slice>(slices)));
454 });
455 delay = std::max(delay, q.delay_ms);
456 }
457 event_engine->RunAfterExactly(std::chrono::milliseconds(delay + 1),
458 [mock_endpoint_controller] {
459 ExecCtx exec_ctx;
460 mock_endpoint_controller->NoMoreReads();
461 });
462 return Duration::Milliseconds(delay + 2);
463 }
464
465 namespace {
466
ReadForever(std::shared_ptr<EventEngine::Endpoint> ep)467 void ReadForever(std::shared_ptr<EventEngine::Endpoint> ep) {
468 bool finished;
469 do {
470 auto buffer =
471 std::make_unique<grpc_event_engine::experimental::SliceBuffer>();
472 auto buffer_ptr = buffer.get();
473 finished = ep->Read(
474 [ep, buffer = std::move(buffer)](absl::Status status) mutable {
475 ExecCtx exec_ctx;
476 if (!status.ok()) return;
477 ReadForever(std::move(ep));
478 },
479 buffer_ptr, nullptr);
480 } while (finished);
481 }
482
ScheduleWritesForReads(std::shared_ptr<EventEngine::Endpoint> ep,grpc_event_engine::experimental::FuzzingEventEngine * event_engine,std::vector<QueuedRead> schedule)483 void ScheduleWritesForReads(
484 std::shared_ptr<EventEngine::Endpoint> ep,
485 grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
486 std::vector<QueuedRead> schedule) {
487 class Scheduler {
488 public:
489 Scheduler(std::shared_ptr<EventEngine::Endpoint> ep,
490 grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
491 std::vector<QueuedRead> schedule)
492 : ep_(std::move(ep)),
493 event_engine_(event_engine),
494 schedule_(std::move(schedule)),
495 it_(schedule_.begin()) {
496 ScheduleNext();
497 }
498
499 private:
500 void ScheduleNext() {
501 if (it_ == schedule_.end()) {
502 delete this;
503 return;
504 }
505 event_engine_->RunAfterExactly(
506 Duration::Milliseconds(it_->delay_ms - delay_consumed_),
507 [this]() mutable {
508 ExecCtx exec_ctx;
509 delay_consumed_ = it_->delay_ms;
510 writing_.Clear();
511 writing_.Append(
512 grpc_event_engine::experimental::internal::SliceCast<
513 grpc_event_engine::experimental::Slice>(
514 it_->slices.JoinIntoSlice()));
515 if (ep_->Write(
516 [this](absl::Status status) {
517 ExecCtx exec_ctx;
518 FinishWrite(std::move(status));
519 },
520 &writing_, nullptr)) {
521 FinishWrite(absl::OkStatus());
522 }
523 });
524 }
525
526 void FinishWrite(absl::Status status) {
527 if (!status.ok()) {
528 it_ = schedule_.end();
529 } else {
530 ++it_;
531 }
532 ScheduleNext();
533 }
534
535 std::shared_ptr<EventEngine::Endpoint> ep_;
536 grpc_event_engine::experimental::FuzzingEventEngine* event_engine_;
537 std::vector<QueuedRead> schedule_;
538 std::vector<QueuedRead>::iterator it_;
539 grpc_event_engine::experimental::SliceBuffer writing_;
540 int delay_consumed_ = 0;
541 };
542 new Scheduler(std::move(ep), event_engine, std::move(schedule));
543 }
544
545 } // namespace
546
ScheduleConnection(const fuzzer_input::NetworkInput & network_input,grpc_event_engine::experimental::FuzzingEventEngine * event_engine,testing::FuzzingEnvironment environment,int port)547 Duration ScheduleConnection(
548 const fuzzer_input::NetworkInput& network_input,
549 grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
550 testing::FuzzingEnvironment environment, int port) {
551 ChannelArgs channel_args =
552 CoreConfiguration::Get()
553 .channel_args_preconditioning()
554 .PreconditionChannelArgs(
555 CreateChannelArgsFromFuzzingConfiguration(
556 network_input.endpoint_config(), environment)
557 .ToC()
558 .get());
559 auto schedule = MakeSchedule(network_input);
560 Duration delay = Duration::Zero();
561 for (const auto& q : schedule) {
562 delay = std::max(
563 delay,
564 Duration::Milliseconds(q.delay_ms) +
565 Duration::NanosecondsRoundUp(
566 (q.slices.Length() * event_engine->max_delay_write()).count()));
567 }
568 delay += Duration::Milliseconds(network_input.connect_delay_ms()) +
569 Duration::Milliseconds(network_input.connect_timeout_ms());
570 event_engine->RunAfterExactly(
571 Duration::Milliseconds(network_input.connect_delay_ms()),
572 [event_engine, channel_args,
573 connect_timeout_ms = network_input.connect_timeout_ms(),
574 schedule = std::move(schedule), port]() mutable {
575 ExecCtx exec_ctx;
576 event_engine->Connect(
577 [event_engine, schedule = std::move(schedule)](
578 absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
579 endpoint) mutable {
580 ExecCtx exec_ctx;
581 if (!endpoint.ok()) {
582 LOG(ERROR) << "Failed to connect: " << endpoint.status();
583 return;
584 }
585 std::shared_ptr<EventEngine::Endpoint> ep =
586 std::move(endpoint.value());
587 ReadForever(ep);
588 ScheduleWritesForReads(std::move(ep), event_engine,
589 std::move(schedule));
590 },
591 grpc_event_engine::experimental::ResolvedAddressMakeWild4(port),
592 grpc_event_engine::experimental::ChannelArgsEndpointConfig(
593 channel_args),
594 channel_args.GetObject<ResourceQuota>()
595 ->memory_quota()
596 ->CreateMemoryAllocator("fuzzer"),
597 Duration::Milliseconds(connect_timeout_ms));
598 });
599 return delay;
600 }
601
ScheduleWrites(const fuzzer_input::NetworkInput & network_input,std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> endpoint,grpc_event_engine::experimental::FuzzingEventEngine * event_engine)602 void ScheduleWrites(
603 const fuzzer_input::NetworkInput& network_input,
604 std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
605 endpoint,
606 grpc_event_engine::experimental::FuzzingEventEngine* event_engine) {
607 auto schedule = MakeSchedule(network_input);
608 auto ep = std::shared_ptr<EventEngine::Endpoint>(std::move(endpoint));
609 ReadForever(ep);
610 ScheduleWritesForReads(ep, event_engine, std::move(schedule));
611 }
612
613 } // namespace grpc_core
614