1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/end2end/fuzzers/connector_fuzzer.h"
16
17 #include "src/core/lib/address_utils/parse_address.h"
18 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
19 #include "src/core/lib/event_engine/default_event_engine.h"
20 #include "src/core/lib/event_engine/tcp_socket_utils.h"
21 #include "src/core/lib/iomgr/executor.h"
22 #include "src/core/lib/iomgr/timer_manager.h"
23 #include "src/core/util/env.h"
24 #include "test/core/end2end/fuzzers/fuzzer_input.pb.h"
25 #include "test/core/end2end/fuzzers/network_input.h"
26 #include "test/core/test_util/fuzz_config_vars.h"
27 #include "test/core/test_util/test_config.h"
28
29 bool squelch = true;
30 bool leak_check = true;
31
32 using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig;
33 using ::grpc_event_engine::experimental::EventEngine;
34 using ::grpc_event_engine::experimental::FuzzingEventEngine;
35 using ::grpc_event_engine::experimental::GetDefaultEventEngine;
36 using ::grpc_event_engine::experimental::MockEndpointController;
37 using ::grpc_event_engine::experimental::SetEventEngineFactory;
38 using ::grpc_event_engine::experimental::URIToResolvedAddress;
39
40 namespace grpc_core {
41 namespace {
42
43 class ConnectorFuzzer {
44 public:
ConnectorFuzzer(const fuzzer_input::Msg & msg,absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector> ()> make_security_connector,absl::FunctionRef<OrphanablePtr<SubchannelConnector> ()> make_connector)45 ConnectorFuzzer(
46 const fuzzer_input::Msg& msg,
47 absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
48 make_security_connector,
49 absl::FunctionRef<OrphanablePtr<SubchannelConnector>()> make_connector)
50 : make_security_connector_(make_security_connector),
51 engine_([actions = msg.event_engine_actions()]() {
52 SetEventEngineFactory([actions]() -> std::unique_ptr<EventEngine> {
53 return std::make_unique<FuzzingEventEngine>(
54 FuzzingEventEngine::Options(), actions);
55 });
56 return std::dynamic_pointer_cast<FuzzingEventEngine>(
57 GetDefaultEventEngine());
58 }()),
59 mock_endpoint_controller_(MockEndpointController::Create(engine_)),
60 connector_(make_connector()) {
61 CHECK(engine_);
62 for (const auto& input : msg.network_input()) {
63 network_inputs_.push(input);
64 }
65 grpc_timer_manager_set_start_threaded(false);
66 grpc_init();
67 ExecCtx exec_ctx;
68 Executor::SetThreadingAll(false);
69 listener_ =
70 engine_
71 ->CreateListener(
72 [this](std::unique_ptr<EventEngine::Endpoint> endpoint,
__anon29e65f610402(std::unique_ptr<EventEngine::Endpoint> endpoint, MemoryAllocator) 73 MemoryAllocator) {
74 if (network_inputs_.empty()) return;
75 ScheduleWrites(network_inputs_.front(), std::move(endpoint),
76 engine_.get());
77 network_inputs_.pop();
78 },
__anon29e65f610502(absl::Status) 79 [](absl::Status) {}, ChannelArgsEndpointConfig(ChannelArgs{}),
80 std::make_unique<MemoryQuota>("foo"))
81 .value();
82 if (msg.has_shutdown_connector() &&
83 msg.shutdown_connector().delay_ms() > 0) {
84 auto shutdown_connector = msg.shutdown_connector();
85 const auto delay = Duration::Milliseconds(shutdown_connector.delay_ms());
86 engine_->RunAfterExactly(delay, [this, shutdown_connector = std::move(
__anon29e65f610602() 87 shutdown_connector)]() {
88 if (connector_ == nullptr) return;
89 connector_->Shutdown(absl::Status(
90 static_cast<absl::StatusCode>(shutdown_connector.shutdown_status()),
91 shutdown_connector.shutdown_message()));
92 });
93 }
94 // Abbreviated runtime for interpreting API actions, since we simply don't
95 // support many here.
96 uint64_t when_ms = 0;
97 for (const auto& action : msg.api_actions()) {
98 switch (action.type_case()) {
99 default:
100 break;
101 case api_fuzzer::Action::kSleepMs:
102 when_ms += action.sleep_ms();
103 break;
104 case api_fuzzer::Action::kResizeResourceQuota:
105 engine_->RunAfterExactly(
106 Duration::Milliseconds(when_ms),
__anon29e65f610702() 107 [this, new_size = action.resize_resource_quota()]() {
108 ExecCtx exec_ctx;
109 resource_quota_->memory_quota()->SetSize(new_size);
110 });
111 when_ms += 1;
112 break;
113 }
114 }
115 }
116
~ConnectorFuzzer()117 ~ConnectorFuzzer() {
118 listener_.reset();
119 connector_.reset();
120 mock_endpoint_controller_.reset();
121 engine_->TickUntilIdle();
122 grpc_shutdown_blocking();
123 engine_->UnsetGlobalHooks();
124 }
125
Run()126 void Run() {
127 grpc_resolved_address addr;
128 CHECK(grpc_parse_uri(URI::Parse("ipv4:127.0.0.1:1234").value(), &addr));
129 CHECK_OK(
130 listener_->Bind(URIToResolvedAddress("ipv4:127.0.0.1:1234").value()));
131 CHECK_OK(listener_->Start());
132 OrphanablePtr<grpc_endpoint> endpoint(
133 mock_endpoint_controller_->TakeCEndpoint());
134 SubchannelConnector::Result result;
135 bool done = false;
136 auto channel_args = ChannelArgs{}.SetObject<EventEngine>(engine_).SetObject(
137 resource_quota_);
138 auto security_connector = make_security_connector_();
139 if (security_connector != nullptr) {
140 channel_args = channel_args.SetObject(std::move(security_connector));
141 }
142 connector_->Connect(
143 SubchannelConnector::Args{&addr, nullptr,
144 Timestamp::Now() + Duration::Seconds(20),
145 channel_args},
146 &result, NewClosure([&done, &result](grpc_error_handle status) {
147 done = true;
148 if (status.ok()) result.transport->Orphan();
149 }));
150
151 while (!done) {
152 engine_->Tick();
153 grpc_timer_manager_tick();
154 }
155 }
156
157 private:
158 RefCountedPtr<ResourceQuota> resource_quota_ =
159 MakeRefCounted<ResourceQuota>("fuzzer");
160 absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
161 make_security_connector_;
162 std::shared_ptr<FuzzingEventEngine> engine_;
163 std::queue<fuzzer_input::NetworkInput> network_inputs_;
164 std::shared_ptr<MockEndpointController> mock_endpoint_controller_;
165 std::unique_ptr<EventEngine::Listener> listener_;
166 OrphanablePtr<SubchannelConnector> connector_;
167 };
168
169 } // namespace
170
RunConnectorFuzzer(const fuzzer_input::Msg & msg,absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector> ()> make_security_connector,absl::FunctionRef<OrphanablePtr<SubchannelConnector> ()> make_connector)171 void RunConnectorFuzzer(
172 const fuzzer_input::Msg& msg,
173 absl::FunctionRef<RefCountedPtr<grpc_channel_security_connector>()>
174 make_security_connector,
175 absl::FunctionRef<OrphanablePtr<SubchannelConnector>()> make_connector) {
176 if (squelch && !GetEnv("GRPC_TRACE_FUZZER").has_value()) {
177 grpc_disable_all_absl_logs();
178 }
179 static const int once = []() {
180 ForceEnableExperiment("event_engine_client", true);
181 ForceEnableExperiment("event_engine_listener", true);
182 return 42;
183 }();
184 CHECK_EQ(once, 42); // avoid unused variable warning
185 ApplyFuzzConfigVars(msg.config_vars());
186 TestOnlyReloadExperimentsFromConfigVariables();
187 ConnectorFuzzer(msg, make_security_connector, make_connector).Run();
188 }
189
190 } // namespace grpc_core
191