• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/core/end2end/fuzzers/network_input.h"
16 
17 #include <stddef.h>
18 #include <stdint.h>
19 
20 #include <algorithm>
21 #include <chrono>
22 #include <string>
23 #include <utility>
24 #include <vector>
25 
26 #include "absl/strings/string_view.h"
27 #include "absl/types/span.h"
28 
29 #include <grpc/slice.h>
30 
31 #include "src/core/ext/transport/chaotic_good/frame_header.h"
32 #include "src/core/ext/transport/chttp2/transport/frame.h"
33 #include "src/core/ext/transport/chttp2/transport/varint.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/channel/channel_args_preconditioning.h"
36 #include "src/core/lib/config/core_configuration.h"
37 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
38 #include "src/core/lib/event_engine/tcp_socket_utils.h"
39 #include "src/core/lib/gpr/useful.h"
40 #include "src/core/lib/iomgr/exec_ctx.h"
41 #include "src/core/lib/slice/slice.h"
42 #include "src/core/lib/slice/slice_buffer.h"
43 #include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
44 #include "test/core/util/mock_endpoint.h"
45 
46 using grpc_event_engine::experimental::EventEngine;
47 
48 namespace grpc_core {
49 
50 namespace {
SliceFromH2Frame(Http2Frame frame)51 grpc_slice SliceFromH2Frame(Http2Frame frame) {
52   SliceBuffer buffer;
53   Serialize(absl::Span<Http2Frame>(&frame, 1), buffer);
54   return buffer.JoinIntoSlice().TakeCSlice();
55 }
56 
SliceBufferFromBytes(const std::string & bytes)57 SliceBuffer SliceBufferFromBytes(const std::string& bytes) {
58   SliceBuffer buffer;
59   buffer.Append(Slice::FromCopiedString(bytes));
60   return buffer;
61 }
62 
AppendLength(size_t length,std::vector<uint8_t> * bytes)63 void AppendLength(size_t length, std::vector<uint8_t>* bytes) {
64   VarintWriter<1> writer(length);
65   uint8_t buffer[8];
66   writer.Write(0, buffer);
67   bytes->insert(bytes->end(), buffer, buffer + writer.length());
68 }
69 
SliceBufferFromSimpleHeaders(const fuzzer_input::SimpleHeaders & headers)70 SliceBuffer SliceBufferFromSimpleHeaders(
71     const fuzzer_input::SimpleHeaders& headers) {
72   std::vector<uint8_t> temp;
73   auto add_header = [&temp](absl::string_view key, absl::string_view value) {
74     temp.push_back(0);
75     AppendLength(key.length(), &temp);
76     temp.insert(temp.end(), key.begin(), key.end());
77     AppendLength(value.length(), &temp);
78     temp.insert(temp.end(), value.begin(), value.end());
79   };
80   if (headers.has_status()) {
81     add_header(":status", headers.status());
82   }
83   if (headers.has_scheme()) {
84     add_header(":scheme", headers.scheme());
85   }
86   if (headers.has_method()) {
87     add_header(":method", headers.method());
88   }
89   if (headers.has_authority()) {
90     add_header(":authority", headers.authority());
91   }
92   if (headers.has_path()) {
93     add_header(":path", headers.path());
94   }
95   for (const auto& header : headers.headers()) {
96     if (header.has_key() && header.has_value()) {
97       add_header(header.key(), header.value());
98       ;
99     }
100     if (header.has_raw_bytes()) {
101       for (auto c : header.raw_bytes()) {
102         temp.push_back(static_cast<uint8_t>(c));
103       }
104     }
105   }
106   if (headers.has_grpc_timeout()) {
107     add_header("grpc-timeout", headers.grpc_timeout());
108   }
109   if (headers.has_te()) {
110     add_header("te", headers.te());
111   }
112   if (headers.has_content_type()) {
113     add_header("content-type", headers.content_type());
114   }
115   if (headers.has_grpc_encoding()) {
116     add_header("grpc-encoding", headers.grpc_encoding());
117   }
118   if (headers.has_grpc_internal_encoding_request()) {
119     add_header("grpc-internal-encoding-request",
120                headers.grpc_internal_encoding_request());
121   }
122   if (headers.has_grpc_accept_encoding()) {
123     add_header("grpc-accept-encoding", headers.grpc_accept_encoding());
124   }
125   if (headers.has_user_agent()) {
126     add_header("user-agent", headers.user_agent());
127   }
128   if (headers.has_grpc_message()) {
129     add_header("grpc-message", headers.grpc_message());
130   }
131   if (headers.has_host()) {
132     add_header("host", headers.host());
133   }
134   if (headers.has_endpoint_load_metrics_bin()) {
135     add_header("endpoint-load-metrics-bin",
136                headers.endpoint_load_metrics_bin());
137   }
138   if (headers.has_grpc_server_stats_bin()) {
139     add_header("grpc-server-stats-bin", headers.grpc_server_stats_bin());
140   }
141   if (headers.has_grpc_trace_bin()) {
142     add_header("grpc-trace-bin", headers.grpc_trace_bin());
143   }
144   if (headers.has_grpc_tags_bin()) {
145     add_header("grpc-tags-bin", headers.grpc_tags_bin());
146   }
147   if (headers.has_x_envoy_peer_metadata()) {
148     add_header("x-envoy-peer-metadata", headers.x_envoy_peer_metadata());
149   }
150   if (headers.has_grpc_status()) {
151     add_header("grpc-status", headers.grpc_status());
152   }
153   if (headers.has_grpc_previous_rpc_attempts()) {
154     add_header("grpc-previous-rpc-attempts",
155                headers.grpc_previous_rpc_attempts());
156   }
157   if (headers.has_grpc_retry_pushback_ms()) {
158     add_header("grpc-retry-pushback-ms", headers.grpc_retry_pushback_ms());
159   }
160   if (headers.has_grpclb_client_stats()) {
161     add_header("grpclb_client_stats", headers.grpclb_client_stats());
162   }
163   if (headers.has_lb_token()) {
164     add_header("lb-token", headers.lb_token());
165   }
166   if (headers.has_lb_cost_bin()) {
167     add_header("lb-cost-bin", headers.lb_cost_bin());
168   }
169   if (headers.has_chaotic_good_connection_type()) {
170     add_header("chaotic-good-connection-type",
171                headers.chaotic_good_connection_type());
172   }
173   if (headers.has_chaotic_good_connection_id()) {
174     add_header("chaotic-good-connection-id",
175                headers.chaotic_good_connection_id());
176   }
177   if (headers.has_chaotic_good_alignment()) {
178     add_header("chaotic-good-alignment", headers.chaotic_good_alignment());
179   }
180   SliceBuffer buffer;
181   buffer.Append(Slice::FromCopiedBuffer(temp.data(), temp.size()));
182   return buffer;
183 }
184 
185 template <typename T>
SliceBufferFromHeaderPayload(const T & payload)186 SliceBuffer SliceBufferFromHeaderPayload(const T& payload) {
187   switch (payload.payload_case()) {
188     case T::kRawBytes:
189       return SliceBufferFromBytes(payload.raw_bytes());
190     case T::kSimpleHeader:
191       return SliceBufferFromSimpleHeaders(payload.simple_header());
192     case T::PAYLOAD_NOT_SET:
193       break;
194   }
195   return SliceBuffer();
196 }
197 
ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame & frame)198 SliceBuffer ChaoticGoodFrame(const fuzzer_input::ChaoticGoodFrame& frame) {
199   chaotic_good::FrameHeader h;
200   SliceBuffer suffix;
201   h.stream_id = frame.stream_id();
202   switch (frame.type()) {
203     case fuzzer_input::ChaoticGoodFrame::SETTINGS:
204       h.type = chaotic_good::FrameType::kSettings;
205       break;
206     case fuzzer_input::ChaoticGoodFrame::FRAGMENT:
207       h.type = chaotic_good::FrameType::kFragment;
208       break;
209     case fuzzer_input::ChaoticGoodFrame::CANCEL:
210       h.type = chaotic_good::FrameType::kCancel;
211       break;
212     default:
213       break;
214   }
215   switch (frame.headers_case()) {
216     case fuzzer_input::ChaoticGoodFrame::kHeadersNone:
217     case fuzzer_input::ChaoticGoodFrame::HEADERS_NOT_SET:
218       break;
219     case fuzzer_input::ChaoticGoodFrame::kHeadersRawBytes:
220       if (frame.headers_raw_bytes().empty()) break;
221       h.header_length = frame.headers_raw_bytes().size();
222       h.flags.Set(0, true);
223       suffix.Append(Slice::FromCopiedString(frame.headers_raw_bytes()));
224       break;
225     case fuzzer_input::ChaoticGoodFrame::kHeadersSimpleHeader: {
226       SliceBuffer append =
227           SliceBufferFromSimpleHeaders(frame.headers_simple_header());
228       if (append.Length() == 0) break;
229       h.header_length = append.Length();
230       h.flags.Set(0, true);
231       suffix.Append(append.JoinIntoSlice());
232     } break;
233   }
234   switch (frame.data_case()) {
235     case fuzzer_input::ChaoticGoodFrame::kDataNone:
236     case fuzzer_input::ChaoticGoodFrame::DATA_NOT_SET:
237       break;
238     case fuzzer_input::ChaoticGoodFrame::kDataSized:
239       h.flags.Set(1, true);
240       h.message_length = frame.data_sized().length();
241       h.message_padding = frame.data_sized().padding();
242       break;
243   }
244   switch (frame.trailers_case()) {
245     case fuzzer_input::ChaoticGoodFrame::kTrailersNone:
246     case fuzzer_input::ChaoticGoodFrame::TRAILERS_NOT_SET:
247       break;
248     case fuzzer_input::ChaoticGoodFrame::kTrailersRawBytes:
249       h.trailer_length = frame.trailers_raw_bytes().size();
250       h.flags.Set(2, true);
251       suffix.Append(Slice::FromCopiedString(frame.trailers_raw_bytes()));
252       break;
253     case fuzzer_input::ChaoticGoodFrame::kTrailersSimpleHeader: {
254       SliceBuffer append =
255           SliceBufferFromSimpleHeaders(frame.trailers_simple_header());
256       h.trailer_length = append.Length();
257       h.flags.Set(2, true);
258       suffix.Append(append.JoinIntoSlice());
259     } break;
260   }
261   uint8_t bytes[24];
262   h.Serialize(bytes);
263   SliceBuffer out;
264   out.Append(Slice::FromCopiedBuffer(bytes, 24));
265   out.Append(suffix);
266   return out;
267 }
268 
SliceFromSegment(const fuzzer_input::InputSegment & segment)269 grpc_slice SliceFromSegment(const fuzzer_input::InputSegment& segment) {
270   switch (segment.payload_case()) {
271     case fuzzer_input::InputSegment::kRawBytes:
272       return grpc_slice_from_copied_buffer(segment.raw_bytes().data(),
273                                            segment.raw_bytes().size());
274     case fuzzer_input::InputSegment::kData:
275       return SliceFromH2Frame(Http2DataFrame{
276           segment.data().stream_id(), segment.data().end_of_stream(),
277           SliceBufferFromBytes(segment.data().payload())});
278     case fuzzer_input::InputSegment::kHeader:
279       return SliceFromH2Frame(Http2HeaderFrame{
280           segment.header().stream_id(),
281           segment.header().end_headers(),
282           segment.header().end_stream(),
283           SliceBufferFromHeaderPayload(segment.header()),
284       });
285     case fuzzer_input::InputSegment::kContinuation:
286       return SliceFromH2Frame(Http2ContinuationFrame{
287           segment.continuation().stream_id(),
288           segment.continuation().end_headers(),
289           SliceBufferFromHeaderPayload(segment.header()),
290       });
291     case fuzzer_input::InputSegment::kRstStream:
292       return SliceFromH2Frame(Http2RstStreamFrame{
293           segment.rst_stream().stream_id(),
294           segment.rst_stream().error_code(),
295       });
296     case fuzzer_input::InputSegment::kSettings: {
297       std::vector<Http2SettingsFrame::Setting> settings;
298       for (const auto& setting : segment.settings().settings()) {
299         settings.push_back(Http2SettingsFrame::Setting{
300             static_cast<uint16_t>(setting.id()),
301             setting.value(),
302         });
303       }
304       return SliceFromH2Frame(Http2SettingsFrame{
305           segment.settings().ack(),
306           std::move(settings),
307       });
308     }
309     case fuzzer_input::InputSegment::kPing:
310       return SliceFromH2Frame(Http2PingFrame{
311           segment.ping().ack(),
312           segment.ping().opaque(),
313       });
314     case fuzzer_input::InputSegment::kGoaway:
315       return SliceFromH2Frame(Http2GoawayFrame{
316           segment.goaway().last_stream_id(), segment.goaway().error_code(),
317           Slice::FromCopiedString(segment.goaway().debug_data())});
318     case fuzzer_input::InputSegment::kWindowUpdate:
319       return SliceFromH2Frame(Http2WindowUpdateFrame{
320           segment.window_update().stream_id(),
321           segment.window_update().increment(),
322       });
323     case fuzzer_input::InputSegment::kClientPrefix:
324       return grpc_slice_from_static_string("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n");
325     case fuzzer_input::InputSegment::kRepeatedZeros: {
326       std::vector<char> zeros;
327       zeros.resize(std::min<size_t>(segment.repeated_zeros(), 128 * 1024), 0);
328       return grpc_slice_from_copied_buffer(zeros.data(), zeros.size());
329     }
330     case fuzzer_input::InputSegment::kChaoticGood: {
331       return ChaoticGoodFrame(segment.chaotic_good())
332           .JoinIntoSlice()
333           .TakeCSlice();
334     } break;
335     case fuzzer_input::InputSegment::PAYLOAD_NOT_SET:
336       break;
337   }
338   return grpc_empty_slice();
339 }
340 
341 struct QueuedRead {
QueuedReadgrpc_core::__anon46917a860111::QueuedRead342   QueuedRead(int delay_ms, SliceBuffer slices)
343       : delay_ms(delay_ms), slices(std::move(slices)) {}
344   int delay_ms;
345   SliceBuffer slices;
346 };
347 
MakeSchedule(const fuzzer_input::NetworkInput & network_input)348 std::vector<QueuedRead> MakeSchedule(
349     const fuzzer_input::NetworkInput& network_input) {
350   std::vector<QueuedRead> schedule;
351   switch (network_input.value_case()) {
352     case fuzzer_input::NetworkInput::kSingleReadBytes: {
353       schedule.emplace_back(0, SliceBuffer(Slice::FromCopiedBuffer(
354                                    network_input.single_read_bytes().data(),
355                                    network_input.single_read_bytes().size())));
356     } break;
357     case fuzzer_input::NetworkInput::kInputSegments: {
358       int delay_ms = 0;
359       SliceBuffer building;
360       for (const auto& segment : network_input.input_segments().segments()) {
361         const int segment_delay = Clamp(segment.delay_ms(), 0, 1000);
362         if (segment_delay != 0) {
363           delay_ms += segment_delay;
364           if (building.Length() != 0) {
365             schedule.emplace_back(delay_ms, std::move(building));
366           }
367           building.Clear();
368         }
369         building.Append(Slice(SliceFromSegment(segment)));
370       }
371       if (building.Length() != 0) {
372         ++delay_ms;
373         schedule.emplace_back(delay_ms, std::move(building));
374       }
375     } break;
376     case fuzzer_input::NetworkInput::VALUE_NOT_SET:
377       break;
378   }
379   return schedule;
380 }
381 
382 }  // namespace
383 
ScheduleReads(const fuzzer_input::NetworkInput & network_input,grpc_endpoint * mock_endpoint,grpc_event_engine::experimental::FuzzingEventEngine * event_engine)384 Duration ScheduleReads(
385     const fuzzer_input::NetworkInput& network_input,
386     grpc_endpoint* mock_endpoint,
387     grpc_event_engine::experimental::FuzzingEventEngine* event_engine) {
388   int delay = 0;
389   for (const auto& q : MakeSchedule(network_input)) {
390     event_engine->RunAfterExactly(
391         std::chrono::milliseconds(q.delay_ms),
392         [mock_endpoint, slices = q.slices.JoinIntoSlice()]() mutable {
393           ExecCtx exec_ctx;
394           grpc_mock_endpoint_put_read(mock_endpoint, slices.TakeCSlice());
395         });
396     delay = std::max(delay, q.delay_ms);
397   }
398   event_engine->RunAfterExactly(
399       std::chrono::milliseconds(delay + 1), [mock_endpoint] {
400         ExecCtx exec_ctx;
401         grpc_mock_endpoint_finish_put_reads(mock_endpoint);
402       });
403   return Duration::Milliseconds(delay + 2);
404 }
405 
406 namespace {
407 
ReadForever(std::shared_ptr<EventEngine::Endpoint> ep)408 void ReadForever(std::shared_ptr<EventEngine::Endpoint> ep) {
409   bool finished;
410   do {
411     auto buffer =
412         std::make_unique<grpc_event_engine::experimental::SliceBuffer>();
413     auto buffer_ptr = buffer.get();
414     finished = ep->Read(
415         [ep, buffer = std::move(buffer)](absl::Status status) mutable {
416           ExecCtx exec_ctx;
417           if (!status.ok()) return;
418           ReadForever(std::move(ep));
419         },
420         buffer_ptr, nullptr);
421   } while (finished);
422 }
423 
ScheduleWritesForReads(std::shared_ptr<EventEngine::Endpoint> ep,grpc_event_engine::experimental::FuzzingEventEngine * event_engine,std::vector<QueuedRead> schedule)424 void ScheduleWritesForReads(
425     std::shared_ptr<EventEngine::Endpoint> ep,
426     grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
427     std::vector<QueuedRead> schedule) {
428   class Scheduler {
429    public:
430     Scheduler(std::shared_ptr<EventEngine::Endpoint> ep,
431               grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
432               std::vector<QueuedRead> schedule)
433         : ep_(std::move(ep)),
434           event_engine_(event_engine),
435           schedule_(std::move(schedule)),
436           it_(schedule_.begin()) {
437       ScheduleNext();
438     }
439 
440    private:
441     void ScheduleNext() {
442       if (it_ == schedule_.end()) {
443         delete this;
444         return;
445       }
446       event_engine_->RunAfterExactly(
447           Duration::Milliseconds(it_->delay_ms - delay_consumed_),
448           [this]() mutable {
449             ExecCtx exec_ctx;
450             delay_consumed_ = it_->delay_ms;
451             writing_.Clear();
452             writing_.Append(
453                 grpc_event_engine::experimental::internal::SliceCast<
454                     grpc_event_engine::experimental::Slice>(
455                     it_->slices.JoinIntoSlice()));
456             if (ep_->Write(
457                     [this](absl::Status status) {
458                       ExecCtx exec_ctx;
459                       FinishWrite(std::move(status));
460                     },
461                     &writing_, nullptr)) {
462               FinishWrite(absl::OkStatus());
463             }
464           });
465     }
466 
467     void FinishWrite(absl::Status status) {
468       if (!status.ok()) {
469         it_ = schedule_.end();
470       } else {
471         ++it_;
472       }
473       ScheduleNext();
474     }
475 
476     std::shared_ptr<EventEngine::Endpoint> ep_;
477     grpc_event_engine::experimental::FuzzingEventEngine* event_engine_;
478     std::vector<QueuedRead> schedule_;
479     std::vector<QueuedRead>::iterator it_;
480     grpc_event_engine::experimental::SliceBuffer writing_;
481     int delay_consumed_ = 0;
482   };
483   new Scheduler(std::move(ep), event_engine, std::move(schedule));
484 }
485 
486 }  // namespace
487 
ScheduleConnection(const fuzzer_input::NetworkInput & network_input,grpc_event_engine::experimental::FuzzingEventEngine * event_engine,testing::FuzzingEnvironment environment,int port)488 Duration ScheduleConnection(
489     const fuzzer_input::NetworkInput& network_input,
490     grpc_event_engine::experimental::FuzzingEventEngine* event_engine,
491     testing::FuzzingEnvironment environment, int port) {
492   ChannelArgs channel_args =
493       CoreConfiguration::Get()
494           .channel_args_preconditioning()
495           .PreconditionChannelArgs(
496               CreateChannelArgsFromFuzzingConfiguration(
497                   network_input.endpoint_config(), environment)
498                   .ToC()
499                   .get());
500   auto schedule = MakeSchedule(network_input);
501   Duration delay = Duration::Zero();
502   for (const auto& q : schedule) {
503     delay = std::max(
504         delay,
505         Duration::Milliseconds(q.delay_ms) +
506             Duration::NanosecondsRoundUp(
507                 (q.slices.Length() * event_engine->max_delay_write()).count()));
508   }
509   delay += Duration::Milliseconds(network_input.connect_delay_ms() +
510                                   network_input.connect_timeout_ms());
511   event_engine->RunAfterExactly(
512       Duration::Milliseconds(network_input.connect_delay_ms()),
513       [event_engine, channel_args,
514        connect_timeout_ms = network_input.connect_timeout_ms(),
515        schedule = std::move(schedule), port]() mutable {
516         ExecCtx exec_ctx;
517         event_engine->Connect(
518             [event_engine, schedule = std::move(schedule)](
519                 absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>>
520                     endpoint) mutable {
521               ExecCtx exec_ctx;
522               if (!endpoint.ok()) {
523                 gpr_log(GPR_ERROR, "Failed to connect: %s",
524                         endpoint.status().ToString().c_str());
525                 return;
526               }
527               std::shared_ptr<EventEngine::Endpoint> ep =
528                   std::move(endpoint.value());
529               ReadForever(ep);
530               ScheduleWritesForReads(std::move(ep), event_engine,
531                                      std::move(schedule));
532             },
533             grpc_event_engine::experimental::ResolvedAddressMakeWild4(port),
534             grpc_event_engine::experimental::ChannelArgsEndpointConfig(
535                 channel_args),
536             channel_args.GetObject<ResourceQuota>()
537                 ->memory_quota()
538                 ->CreateMemoryAllocator("fuzzer"),
539             Duration::Milliseconds(connect_timeout_ms));
540       });
541   return delay;
542 }
543 
544 }  // namespace grpc_core
545