• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/core/end2end/fuzzers/connector_fuzzer.h"
16 
17 #include "src/core/lib/address_utils/parse_address.h"
18 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
19 #include "src/core/lib/event_engine/default_event_engine.h"
20 #include "src/core/lib/event_engine/tcp_socket_utils.h"
21 #include "src/core/lib/iomgr/executor.h"
22 #include "src/core/lib/iomgr/timer_manager.h"
23 #include "src/core/util/env.h"
24 #include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
25 #include "test/core/end2end/fuzzers/network_input.h"
26 #include "test/core/test_util/fuzz_config_vars.h"
27 #include "test/core/test_util/test_config.h"
28 
// Fuzzer harness flags.
//
// `squelch`: when true, RunConnectorFuzzer() disables all absl logging unless
// the GRPC_TRACE_FUZZER environment variable is set.
bool squelch = true;
// `leak_check`: not read anywhere in this file; presumably consumed by the
// shared fuzzing harness that links against it -- TODO confirm.
bool leak_check = true;
31 
32 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
33 using ::grpc_event_engine::experimental::EventEngine;
34 using ::grpc_event_engine::experimental::FuzzingEventEngine;
35 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
36 using ::grpc_event_engine::experimental::MockEndpointController;
37 using ::grpc_event_engine::experimental::SetEventEngineFactory;
38 using ::grpc_event_engine::experimental::URIToResolvedAddress;
39 
40 namespace grpc_core {
41 namespace {
42 
43 class ConnectorFuzzer {
44  public:
ConnectorFuzzer(const fuzzer_input::Msg & msg,absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector> ()> make_security_connector,absl::FunctionRef<OrphanablePtr<SubchannelConnector> ()> make_connector)45   ConnectorFuzzer(
46       const fuzzer_input::Msg& msg,
47       absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
48           make_security_connector,
49       absl::FunctionRef<OrphanablePtr<SubchannelConnector>()> make_connector)
50       : make_security_connector_(make_security_connector),
51         engine_([actions = msg.event_engine_actions()]() {
52           SetEventEngineFactory([actions]() -> std::unique_ptr<EventEngine> {
53             return std::make_unique<FuzzingEventEngine>(
54                 FuzzingEventEngine::Options(), actions);
55           });
56           return std::dynamic_pointer_cast<FuzzingEventEngine>(
57               GetDefaultEventEngine());
58         }()),
59         mock_endpoint_controller_(MockEndpointController::Create(engine_)),
60         connector_(make_connector()) {
61     CHECK(engine_);
62     for (const auto& input : msg.network_input()) {
63       network_inputs_.push(input);
64     }
65     grpc_timer_manager_set_start_threaded(false);
66     grpc_init();
67     ExecCtx exec_ctx;
68     Executor::SetThreadingAll(false);
69     listener_ =
70         engine_
71             ->CreateListener(
72                 [this](std::unique_ptr<EventEngine::Endpoint> endpoint,
__anon29e65f610402(std::unique_ptr<EventEngine::Endpoint> endpoint, MemoryAllocator) 73                        MemoryAllocator) {
74                   if (network_inputs_.empty()) return;
75                   ScheduleWrites(network_inputs_.front(), std::move(endpoint),
76                                  engine_.get());
77                   network_inputs_.pop();
78                 },
__anon29e65f610502(absl::Status) 79                 [](absl::Status) {}, ChannelArgsEndpointConfig(ChannelArgs{}),
80                 std::make_unique<MemoryQuota>("foo"))
81             .value();
82     if (msg.has_shutdown_connector() &&
83         msg.shutdown_connector().delay_ms() > 0) {
84       auto shutdown_connector = msg.shutdown_connector();
85       const auto delay = Duration::Milliseconds(shutdown_connector.delay_ms());
86       engine_->RunAfterExactly(delay, [this, shutdown_connector = std::move(
__anon29e65f610602() 87                                                  shutdown_connector)]() {
88         if (connector_ == nullptr) return;
89         connector_->Shutdown(absl::Status(
90             static_cast<absl::StatusCode>(shutdown_connector.shutdown_status()),
91             shutdown_connector.shutdown_message()));
92       });
93     }
94     // Abbreviated runtime for interpreting API actions, since we simply don't
95     // support many here.
96     uint64_t when_ms = 0;
97     for (const auto& action : msg.api_actions()) {
98       switch (action.type_case()) {
99         default:
100           break;
101         case api_fuzzer::Action::kSleepMs:
102           when_ms += action.sleep_ms();
103           break;
104         case api_fuzzer::Action::kResizeResourceQuota:
105           engine_->RunAfterExactly(
106               Duration::Milliseconds(when_ms),
__anon29e65f610702() 107               [this, new_size = action.resize_resource_quota()]() {
108                 ExecCtx exec_ctx;
109                 resource_quota_->memory_quota()->SetSize(new_size);
110               });
111           when_ms += 1;
112           break;
113       }
114     }
115   }
116 
~ConnectorFuzzer()117   ~ConnectorFuzzer() {
118     listener_.reset();
119     connector_.reset();
120     mock_endpoint_controller_.reset();
121     engine_->TickUntilIdle();
122     grpc_shutdown_blocking();
123     engine_->UnsetGlobalHooks();
124   }
125 
Run()126   void Run() {
127     grpc_resolved_address addr;
128     CHECK(grpc_parse_uri(URI::Parse("ipv4:127.0.0.1:1234").value(), &addr));
129     CHECK_OK(
130         listener_->Bind(URIToResolvedAddress("ipv4:127.0.0.1:1234").value()));
131     CHECK_OK(listener_->Start());
132     OrphanablePtr<grpc_endpoint> endpoint(
133         mock_endpoint_controller_->TakeCEndpoint());
134     SubchannelConnector::Result result;
135     bool done = false;
136     auto channel_args = ChannelArgs{}.SetObject<EventEngine>(engine_).SetObject(
137         resource_quota_);
138     auto security_connector = make_security_connector_();
139     if (security_connector != nullptr) {
140       channel_args = channel_args.SetObject(std::move(security_connector));
141     }
142     connector_->Connect(
143         SubchannelConnector::Args{&addr, nullptr,
144                                   Timestamp::Now() + Duration::Seconds(20),
145                                   channel_args},
146         &result, NewClosure([&done, &result](grpc_error_handle status) {
147           done = true;
148           if (status.ok()) result.transport->Orphan();
149         }));
150 
151     while (!done) {
152       engine_->Tick();
153       grpc_timer_manager_tick();
154     }
155   }
156 
157  private:
158   RefCountedPtr<ResourceQuota> resource_quota_ =
159       MakeRefCounted<ResourceQuota>("fuzzer");
160   absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
161       make_security_connector_;
162   std::shared_ptr<FuzzingEventEngine> engine_;
163   std::queue<fuzzer_input::NetworkInput> network_inputs_;
164   std::shared_ptr<MockEndpointController> mock_endpoint_controller_;
165   std::unique_ptr<EventEngine::Listener> listener_;
166   OrphanablePtr<SubchannelConnector> connector_;
167 };
168 
169 }  // namespace
170 
RunConnectorFuzzer(const fuzzer_input::Msg & msg,absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector> ()> make_security_connector,absl::FunctionRef<OrphanablePtr<SubchannelConnector> ()> make_connector)171 void RunConnectorFuzzer(
172     const fuzzer_input::Msg& msg,
173     absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
174         make_security_connector,
175     absl::FunctionRef<OrphanablePtr<SubchannelConnector>()> make_connector) {
176   if (squelch && !GetEnv("GRPC_TRACE_FUZZER").has_value()) {
177     grpc_disable_all_absl_logs();
178   }
179   static const int once = []() {
180     ForceEnableExperiment("event_engine_client", true);
181     ForceEnableExperiment("event_engine_listener", true);
182     return 42;
183   }();
184   CHECK_EQ(once, 42);  // avoid unused variable warning
185   ApplyFuzzConfigVars(msg.config_vars());
186   TestOnlyReloadExperimentsFromConfigVariables();
187   ConnectorFuzzer(msg, make_security_connector, make_connector).Run();
188 }
189 
190 }  // namespace grpc_core
191