1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/end2end/fuzzers/network_input.h"
16
17 #include <stddef.h>
18 #include <stdint.h>
19
20 #include <algorithm>
21 #include <chrono>
22 #include <string>
23 #include <utility>
24 #include <vector>
25
26 #include "absl/strings/string_view.h"
27 #include "absl/types/span.h"
28
29 #include <grpc/slice.h>
30
31 #include "src/core/ext/transport/chaotic_good/frame_header.h"
32 #include "src/core/ext/transport/chttp2/transport/frame.h"
33 #include "src/core/ext/transport/chttp2/transport/varint.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/channel/channel_args_preconditioning.h"
36 #include "src/core/lib/config/core_configuration.h"
37 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
38 #include "src/core/lib/event_engine/tcp_socket_utils.h"
39 #include "src/core/lib/gpr/useful.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/lib/slice/slice.h"
42 #include "src/core/lib/slice/slice_buffer.h"
43 #include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
44 #include "test/core/util/mock_endpoint.h"
45
46 using grpc_event_engine::experimental::EventEngine;
47
48 namespace grpc_core {
49
50 namespace {
SliceFromH2Frame(Http2Frame frame)51 grpc_slice SliceFromH2Frame(Http2Frame frame) {
52 SliceBuffer buffer;
53 Serialize(absl::Span<Http2Frame>(&frame, 1), buffer);
54 return buffer.JoinIntoSlice().TakeCSlice();
55 }
56
SliceBufferFromBytes(const std::string & bytes)57 SliceBuffer SliceBufferFromBytes(const std::string& bytes) {
58 SliceBuffer buffer;
59 buffer.Append(Slice::FromCopiedString(bytes));
60 return buffer;
61 }
62
AppendLength(size_t length,std::vector<uint8_t> * bytes)63 void AppendLength(size_t length, std::vector<uint8_t>* bytes) {
64 VarintWriter<1> writer(length);
65 uint8_t buffer[8];
66 writer.Write(0, buffer);
67 bytes->insert(bytes->end(), buffer, buffer + writer.length());
68 }
69
SliceBufferFromSimpleHeaders(const fuzzer_input::SimpleHeaders & headers)70 SliceBuffer SliceBufferFromSimpleHeaders(
71 const fuzzer_input::SimpleHeaders& headers) {
72 std::vector<uint8_t> temp;
73 auto add_header = [&temp](absl::string_view key, absl::string_view value) {
74 temp.push_back(0);
75 AppendLength(key.length(), &temp);
76 temp.insert(temp.end(), key.begin(), key.end());
77 AppendLength(value.length(), &temp);
78 temp.insert(temp.end(), value.begin(), value.end());
79 };
80 if (headers.has_status()) {
81 add_header(":status", headers.status());
82 }
83 if (headers.has_scheme()) {
84 add_header(":scheme", headers.scheme());
85 }
86 if (headers.has_method()) {
87 add_header(":method", headers.method());
88 }
89 if (headers.has_authority()) {
90 add_header(":authority", headers.authority());
91 }
92 if (headers.has_path()) {
93 add_header(":path", headers.path());
94 }
95 for (const auto& header : headers.headers()) {
96 if (header.has_key() && header.has_value()) {
97 add_header(header.key(), header.value());
98 ;
99 }
100 if (header.has_raw_bytes()) {
101 for (auto c : header.raw_bytes()) {
102 temp.push_back(static_cast<uint8_t>(c));
103 }
104 }
105 }
106 if (headers.has_grpc_timeout()) {
107 add_header("grpc-timeout", headers.grpc_timeout());
108 }
109 if (headers.has_te()) {
110 add_header("te", headers.te());
111 }
112 if (headers.has_content_type()) {
113 add_header("content-type", headers.content_type());
114 }
115 if (headers.has_grpc_encoding()) {
116 add_header("grpc-encoding", headers.grpc_encoding());
117 }
118 if (headers.has_grpc_internal_encoding_request()) {
119 add_header("grpc-internal-encoding-request",
120 headers.grpc_internal_encoding_request());
121 }
122 if (headers.has_grpc_accept_encoding()) {
123 add_header("grpc-accept-encoding", headers.grpc_accept_encoding());
124 }
125 if (headers.has_user_agent()) {
126 add_header("user-agent", headers.user_agent());
127 }
128 if (headers.has_grpc_message()) {
129 add_header("grpc-message", headers.grpc_message());
130 }
131 if (headers.has_host()) {
132 add_header("host", headers.host());
133 }
134 if (headers.has_endpoint_load_metrics_bin()) {
135 add_header("endpoint-load-metrics-bin",
136 headers.endpoint_load_metrics_bin());
137 }
138 if (headers.has_grpc_server_stats_bin()) {
139 add_header("grpc-server-stats-bin", headers.grpc_server_stats_bin());
140 }
141 if (headers.has_grpc_trace_bin()) {
142 add_header("grpc-trace-bin", headers.grpc_trace_bin());
143 }
144 if (headers.has_grpc_tags_bin()) {
145 add_header("grpc-tags-bin", headers.grpc_tags_bin());
146 }
147 if (headers.has_x_envoy_peer_metadata()) {
148 add_header("x-envoy-peer-metadata", headers.x_envoy_peer_metadata());
149 }
150 if (headers.has_grpc_status()) {
151 add_header("grpc-status", headers.grpc_status());
152 }
153 if (headers.has_grpc_previous_rpc_attempts()) {
154 add_header("grpc-previous-rpc-attempts",
155 headers.grpc_previous_rpc_attempts());
156 }
157 if (headers.has_grpc_retry_pushback_ms()) {
158 add_header("grpc-retry-pushback-ms", headers.grpc_retry_pushback_ms());
159 }
160 if (headers.has_grpclb_client_stats()) {
161 add_header("grpclb_client_stats", headers.grpclb_client_stats());
162 }
163 if (headers.has_lb_token()) {
164 add_header("lb-token", headers.lb_token());
165 }
166 if (headers.has_lb_cost_bin()) {
167 add_header("lb-cost-bin", headers.lb_cost_bin());
168 }
169 if (headers.has_chaotic_good_connection_type()) {
170 add_header("chaotic-good-connection-type",
171 headers.chaotic_good_connection_type());
172 }
173 if (headers.has_chaotic_good_connection_id()) {
174 add_header("chaotic-good-connection-id",
175 headers.chaotic_good_connection_id());
176 }
177 if (headers.has_chaotic_good_alignment()) {
178 add_header("chaotic-good-alignment", headers.chaotic_good_alignment());
179 }
180 SliceBuffer buffer;
181 buffer.Append(Slice::FromCopiedBuffer(temp.data(), temp.size()));
182 return buffer;
183 }
184
185 template <typename T>
SliceBufferFromHeaderPayload(const T & payload)186 SliceBuffer SliceBufferFromHeaderPayload(const T& payload) {
187 switch (payload.payload_case()) {
188 case T::kRawBytes:
189 return SliceBufferFromBytes(payload.raw_bytes());
190 case T::kSimpleHeader:
191 return SliceBufferFromSimpleHeaders(payload.simple_header());
192 case T::PAYLOAD_NOT_SET:
193 break;
194 }
195 return SliceBuffer();
196 }
197
ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame & frame)198 SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) {
199 chaotic_good::FrameHeader h;
200 SliceBuffer suffix;
201 h.stream_id = frame.stream_id();
202 switch (frame.type()) {
203 case fuzzer_input::ChaoticGoodFrame::SETTINGS:
204 h.type = chaotic_good::FrameType::kSettings;
205 break;
206 case fuzzer_input::ChaoticGoodFrame::FRAGMENT:
207 h.type = chaotic_good::FrameType::kFragment;
208 break;
209 case fuzzer_input::ChaoticGoodFrame::CANCEL:
210 h.type = chaotic_good::FrameType::kCancel;
211 break;
212 default:
213 break;
214 }
215 switch (frame.headers_case()) {
216 case fuzzer_input::ChaoticGoodFrame::kHeadersNone:
217 case fuzzer_input::ChaoticGoodFrame::HEADERS_NOT_SET:
218 break;
219 case fuzzer_input::ChaoticGoodFrame::kHeadersRawBytes:
220 if (frame.headers_raw_bytes().empty()) break;
221 h.header_length = frame.headers_raw_bytes().size();
222 h.flags.Set(0, true);
223 suffix.Append(Slice::FromCopiedString(frame.headers_raw_bytes()));
224 break;
225 case fuzzer_input::ChaoticGoodFrame::kHeadersSimpleHeader: {
226 SliceBuffer append =
227 SliceBufferFromSimpleHeaders(frame.headers_simple_header());
228 if (append.Length() == 0) break;
229 h.header_length = append.Length();
230 h.flags.Set(0, true);
231 suffix.Append(append.JoinIntoSlice());
232 } break;
233 }
234 switch (frame.data_case()) {
235 case fuzzer_input::ChaoticGoodFrame::kDataNone:
236 case fuzzer_input::ChaoticGoodFrame::DATA_NOT_SET:
237 break;
238 case fuzzer_input::ChaoticGoodFrame::kDataSized:
239 h.flags.Set(1, true);
240 h.message_length = frame.data_sized().length();
241 h.message_padding = frame.data_sized().padding();
242 break;
243 }
244 switch (frame.trailers_case()) {
245 case fuzzer_input::ChaoticGoodFrame::kTrailersNone:
246 case fuzzer_input::ChaoticGoodFrame::TRAILERS_NOT_SET:
247 break;
248 case fuzzer_input::ChaoticGoodFrame::kTrailersRawBytes:
249 h.trailer_length = frame.trailers_raw_bytes().size();
250 h.flags.Set(2, true);
251 suffix.Append(Slice::FromCopiedString(frame.trailers_raw_bytes()));
252 break;
253 case fuzzer_input::ChaoticGoodFrame::kTrailersSimpleHeader: {
254 SliceBuffer append =
255 SliceBufferFromSimpleHeaders(frame.trailers_simple_header());
256 h.trailer_length = append.Length();
257 h.flags.Set(2, true);
258 suffix.Append(append.JoinIntoSlice());
259 } break;
260 }
261 uint8_t bytes[24];
262 h.Serialize(bytes);
263 SliceBuffer out;
264 out.Append(Slice::FromCopiedBuffer(bytes, 24));
265 out.Append(suffix);
266 return out;
267 }
268
SliceFromSegment(const fuzzer_input::InputSegment & segment)269 grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) {
270 switch (segment.payload_case()) {
271 case fuzzer_input::InputSegment::kRawBytes:
272 return grpc_slice_from_copied_buffer(segment.raw_bytes().data(),
273 segment.raw_bytes().size());
274 case fuzzer_input::InputSegment::kData:
275 return SliceFromH2Frame(Http2DataFrame{
276 segment.data().stream_id(), segment.data().end_of_stream(),
277 SliceBufferFromBytes(segment.data().payload())});
278 case fuzzer_input::InputSegment::kHeader:
279 return SliceFromH2Frame(Http2HeaderFrame{
280 segment.header().stream_id(),
281 segment.header().end_headers(),
282 segment.header().end_stream(),
283 SliceBufferFromHeaderPayload(segment.header()),
284 });
285 case fuzzer_input::InputSegment::kContinuation:
286 return SliceFromH2Frame(Http2ContinuationFrame{
287 segment.continuation().stream_id(),
288 segment.continuation().end_headers(),
289 SliceBufferFromHeaderPayload(segment.header()),
290 });
291 case fuzzer_input::InputSegment::kRstStream:
292 return SliceFromH2Frame(Http2RstStreamFrame{
293 segment.rst_stream().stream_id(),
294 segment.rst_stream().error_code(),
295 });
296 case fuzzer_input::InputSegment::kSettings: {
297 std::vector<Http2SettingsFrame::Setting> settings;
298 for (const auto& setting : segment.settings().settings()) {
299 settings.push_back(Http2SettingsFrame::Setting{
300 static_cast<uint16_t>(setting.id()),
301 setting.value(),
302 });
303 }
304 return SliceFromH2Frame(Http2SettingsFrame{
305 segment.settings().ack(),
306 std::move(settings),
307 });
308 }
309 case fuzzer_input::InputSegment::kPing:
310 return SliceFromH2Frame(Http2PingFrame{
311 segment.ping().ack(),
312 segment.ping().opaque(),
313 });
314 case fuzzer_input::InputSegment::kGoaway:
315 return SliceFromH2Frame(Http2GoawayFrame{
316 segment.goaway().last_stream_id(), segment.goaway().error_code(),
317 Slice::FromCopiedString(segment.goaway().debug_data())});
318 case fuzzer_input::InputSegment::kWindowUpdate:
319 return SliceFromH2Frame(Http2WindowUpdateFrame{
320 segment.window_update().stream_id(),
321 segment.window_update().increment(),
322 });
323 case fuzzer_input::InputSegment::kClientPrefix:
324 return grpc_slice_from_static_string("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
325 case fuzzer_input::InputSegment::kRepeatedZeros: {
326 std::vector<char> zeros;
327 zeros.resize(std::min<size_t>(segment.repeated_zeros(), 128 * 1024), 0);
328 return grpc_slice_from_copied_buffer(zeros.data(), zeros.size());
329 }
330 case fuzzer_input::InputSegment::kChaoticGood: {
331 return ChaoticGoodFrame(segment.chaotic_good())
332 .JoinIntoSlice()
333 .TakeCSlice();
334 } break;
335 case fuzzer_input::InputSegment::PAYLOAD_NOT_SET:
336 break;
337 }
338 return grpc_empty_slice();
339 }
340
341 struct QueuedRead {
QueuedReadgrpc_core::__anon46917a860111::QueuedRead342 QueuedRead(int delay_ms, SliceBuffer slices)
343 : delay_ms(delay_ms), slices(std::move(slices)) {}
344 int delay_ms;
345 SliceBuffer slices;
346 };
347
MakeSchedule(const fuzzer_input::NetworkInput & network_input)348 std::vector<QueuedRead> MakeSchedule(
349 const fuzzer_input::NetworkInput& network_input) {
350 std::vector<QueuedRead> schedule;
351 switch (network_input.value_case()) {
352 case fuzzer_input::NetworkInput::kSingleReadBytes: {
353 schedule.emplace_back(0, SliceBuffer(Slice::FromCopiedBuffer(
354 network_input.single_read_bytes().data(),
355 network_input.single_read_bytes().size())));
356 } break;
357 case fuzzer_input::NetworkInput::kInputSegments: {
358 int delay_ms = 0;
359 SliceBuffer building;
360 for (const auto& segment : network_input.input_segments().segments()) {
361 const int segment_delay = Clamp(segment.delay_ms(), 0, 1000);
362 if (segment_delay != 0) {
363 delay_ms += segment_delay;
364 if (building.Length() != 0) {
365 schedule.emplace_back(delay_ms, std::move(building));
366 }
367 building.Clear();
368 }
369 building.Append(Slice(SliceFromSegment(segment)));
370 }
371 if (building.Length() != 0) {
372 ++delay_ms;
373 schedule.emplace_back(delay_ms, std::move(building));
374 }
375 } break;
376 case fuzzer_input::NetworkInput::VALUE_NOT_SET:
377 break;
378 }
379 return schedule;
380 }
381
382 } // namespace
383
ScheduleReads(const fuzzer_input::NetworkInput & network_input,grpc_endpoint * mock_endpoint,grpc_event_engine::experimental::FuzzingEventEngine * event_engine)384 Duration ScheduleReads(
385 const fuzzer_input::NetworkInput& network_input,
386 grpc_endpoint* mock_endpoint,
387 grpc_event_engine::experimental::FuzzingEventEngine* event_engine) {
388 int delay = 0;
389 for (const auto& q : MakeSchedule(network_input)) {
390 event_engine->RunAfterExactly(
391 std::chrono::milliseconds(q.delay_ms),
392 [mock_endpoint, slices = q.slices.JoinIntoSlice()]() mutable {
393 ExecCtx exec_ctx;
394 grpc_mock_endpoint_put_read(mock_endpoint, slices.TakeCSlice());
395 });
396 delay = std::max(delay, q.delay_ms);
397 }
398 event_engine->RunAfterExactly(
399 std::chrono::milliseconds(delay + 1), [mock_endpoint] {
400 ExecCtx exec_ctx;
401 grpc_mock_endpoint_finish_put_reads(mock_endpoint);
402 });
403 return Duration::Milliseconds(delay + 2);
404 }
405
406 namespace {
407
ReadForever(std::shared_ptr<EventEngine::Endpoint> ep)408 void ReadForever(std::shared_ptr<EventEngine::Endpoint> ep) {
409 bool finished;
410 do {
411 auto buffer =
412 std::make_unique<grpc_event_engine::experimental::SliceBuffer>();
413 auto buffer_ptr = buffer.get();
414 finished = ep->Read(
415 [ep, buffer = std::move(buffer)](absl::Status status) mutable {
416 ExecCtx exec_ctx;
417 if (!status.ok()) return;
418 ReadForever(std::move(ep));
419 },
420 buffer_ptr, nullptr);
421 } while (finished);
422 }
423
ScheduleWritesForReads(std::shared_ptr<EventEngine::Endpoint> ep,grpc_event_engine::experimental::FuzzingEventEngine * event_engine,std::vector<QueuedRead> schedule)424 void ScheduleWritesForReads(
425 std::shared_ptr<EventEngine::Endpoint> ep,
426 grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
427 std::vector<QueuedRead> schedule) {
428 class Scheduler {
429 public:
430 Scheduler(std::shared_ptr<EventEngine::Endpoint> ep,
431 grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
432 std::vector<QueuedRead> schedule)
433 : ep_(std::move(ep)),
434 event_engine_(event_engine),
435 schedule_(std::move(schedule)),
436 it_(schedule_.begin()) {
437 ScheduleNext();
438 }
439
440 private:
441 void ScheduleNext() {
442 if (it_ == schedule_.end()) {
443 delete this;
444 return;
445 }
446 event_engine_->RunAfterExactly(
447 Duration::Milliseconds(it_->delay_ms - delay_consumed_),
448 [this]() mutable {
449 ExecCtx exec_ctx;
450 delay_consumed_ = it_->delay_ms;
451 writing_.Clear();
452 writing_.Append(
453 grpc_event_engine::experimental::internal::SliceCast<
454 grpc_event_engine::experimental::Slice>(
455 it_->slices.JoinIntoSlice()));
456 if (ep_->Write(
457 [this](absl::Status status) {
458 ExecCtx exec_ctx;
459 FinishWrite(std::move(status));
460 },
461 &writing_, nullptr)) {
462 FinishWrite(absl::OkStatus());
463 }
464 });
465 }
466
467 void FinishWrite(absl::Status status) {
468 if (!status.ok()) {
469 it_ = schedule_.end();
470 } else {
471 ++it_;
472 }
473 ScheduleNext();
474 }
475
476 std::shared_ptr<EventEngine::Endpoint> ep_;
477 grpc_event_engine::experimental::FuzzingEventEngine* event_engine_;
478 std::vector<QueuedRead> schedule_;
479 std::vector<QueuedRead>::iterator it_;
480 grpc_event_engine::experimental::SliceBuffer writing_;
481 int delay_consumed_ = 0;
482 };
483 new Scheduler(std::move(ep), event_engine, std::move(schedule));
484 }
485
486 } // namespace
487
ScheduleConnection(const fuzzer_input::NetworkInput & network_input,grpc_event_engine::experimental::FuzzingEventEngine * event_engine,testing::FuzzingEnvironment environment,int port)488 Duration ScheduleConnection(
489 const fuzzer_input::NetworkInput& network_input,
490 grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
491 testing::FuzzingEnvironment environment, int port) {
492 ChannelArgs channel_args =
493 CoreConfiguration::Get()
494 .channel_args_preconditioning()
495 .PreconditionChannelArgs(
496 CreateChannelArgsFromFuzzingConfiguration(
497 network_input.endpoint_config(), environment)
498 .ToC()
499 .get());
500 auto schedule = MakeSchedule(network_input);
501 Duration delay = Duration::Zero();
502 for (const auto& q : schedule) {
503 delay = std::max(
504 delay,
505 Duration::Milliseconds(q.delay_ms) +
506 Duration::NanosecondsRoundUp(
507 (q.slices.Length() * event_engine->max_delay_write()).count()));
508 }
509 delay += Duration::Milliseconds(network_input.connect_delay_ms() +
510 network_input.connect_timeout_ms());
511 event_engine->RunAfterExactly(
512 Duration::Milliseconds(network_input.connect_delay_ms()),
513 [event_engine, channel_args,
514 connect_timeout_ms = network_input.connect_timeout_ms(),
515 schedule = std::move(schedule), port]() mutable {
516 ExecCtx exec_ctx;
517 event_engine->Connect(
518 [event_engine, schedule = std::move(schedule)](
519 absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
520 endpoint) mutable {
521 ExecCtx exec_ctx;
522 if (!endpoint.ok()) {
523 gpr_log(GPR_ERROR, "Failed to connect: %s",
524 endpoint.status().ToString().c_str());
525 return;
526 }
527 std::shared_ptr<EventEngine::Endpoint> ep =
528 std::move(endpoint.value());
529 ReadForever(ep);
530 ScheduleWritesForReads(std::move(ep), event_engine,
531 std::move(schedule));
532 },
533 grpc_event_engine::experimental::ResolvedAddressMakeWild4(port),
534 grpc_event_engine::experimental::ChannelArgsEndpointConfig(
535 channel_args),
536 channel_args.GetObject<ResourceQuota>()
537 ->memory_quota()
538 ->CreateMemoryAllocator("fuzzer"),
539 Duration::Milliseconds(connect_timeout_ms));
540 });
541 return delay;
542 }
543
544 } // namespace grpc_core
545