1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/grpc.h>
18
19 #include <map>
20 #include <memory>
21 #include <set>
22 #include <string>
23 #include <utility>
24
25 #include "absl/log/log.h"
26 #include "absl/status/status.h"
27 #include "absl/status/statusor.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/time/time.h"
31 #include "absl/types/optional.h"
32 #include "envoy/service/discovery/v3/discovery.pb.h"
33 #include "src/core/lib/iomgr/timer_manager.h"
34 #include "src/core/util/orphanable.h"
35 #include "src/core/util/ref_counted_ptr.h"
36 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
37 #include "src/core/xds/grpc/xds_cluster.h"
38 #include "src/core/xds/grpc/xds_cluster_parser.h"
39 #include "src/core/xds/grpc/xds_endpoint.h"
40 #include "src/core/xds/grpc/xds_endpoint_parser.h"
41 #include "src/core/xds/grpc/xds_listener.h"
42 #include "src/core/xds/grpc/xds_listener_parser.h"
43 #include "src/core/xds/grpc/xds_route_config.h"
44 #include "src/core/xds/grpc/xds_route_config_parser.h"
45 #include "src/core/xds/xds_client/xds_bootstrap.h"
46 #include "src/core/xds/xds_client/xds_client.h"
47 #include "src/libfuzzer/libfuzzer_macro.h"
48 #include "test/core/event_engine/event_engine_test_utils.h"
49 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
50 #include "test/core/xds/xds_client_fuzzer.pb.h"
51 #include "test/core/xds/xds_client_test_peer.h"
52 #include "test/core/xds/xds_transport_fake.h"
53
54 using grpc_event_engine::experimental::FuzzingEventEngine;
55
56 namespace grpc_core {
57
58 class Fuzzer {
59 public:
Fuzzer(absl::string_view bootstrap_json,const fuzzing_event_engine::Actions & fuzzing_ee_actions)60 Fuzzer(absl::string_view bootstrap_json,
61 const fuzzing_event_engine::Actions& fuzzing_ee_actions) {
62 event_engine_ = std::make_shared<FuzzingEventEngine>(
63 FuzzingEventEngine::Options(), fuzzing_ee_actions);
64 grpc_timer_manager_set_start_threaded(false);
65 grpc_init();
66 auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json);
67 if (!bootstrap.ok()) {
68 LOG(ERROR) << "error creating bootstrap: " << bootstrap.status();
69 // Leave xds_client_ unset, so Act() will be a no-op.
70 return;
71 }
72 transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
73 []() { Crash("Multiple concurrent reads"); }, event_engine_);
74 transport_factory_->SetAutoCompleteMessagesFromClient(false);
75 transport_factory_->SetAbortOnUndrainedMessages(false);
76 xds_client_ = MakeRefCounted<XdsClient>(
77 std::move(*bootstrap), transport_factory_, event_engine_,
78 /*metrics_reporter=*/nullptr, "foo agent", "foo version");
79 }
80
~Fuzzer()81 ~Fuzzer() {
82 transport_factory_.reset();
83 xds_client_.reset();
84 event_engine_->FuzzingDone();
85 event_engine_->TickUntilIdle();
86 event_engine_->UnsetGlobalHooks();
87 grpc_event_engine::experimental::WaitForSingleOwner(
88 std::move(event_engine_));
89 grpc_shutdown_blocking();
90 }
91
Act(const xds_client_fuzzer::Action & action)92 void Act(const xds_client_fuzzer::Action& action) {
93 if (xds_client_ == nullptr) return;
94 switch (action.action_type_case()) {
95 case xds_client_fuzzer::Action::kStartWatch:
96 switch (action.start_watch().resource_type().resource_type_case()) {
97 case xds_client_fuzzer::ResourceType::kListener:
98 StartWatch(&listener_watchers_,
99 action.start_watch().resource_name());
100 break;
101 case xds_client_fuzzer::ResourceType::kRouteConfig:
102 StartWatch(&route_config_watchers_,
103 action.start_watch().resource_name());
104 break;
105 case xds_client_fuzzer::ResourceType::kCluster:
106 StartWatch(&cluster_watchers_,
107 action.start_watch().resource_name());
108 break;
109 case xds_client_fuzzer::ResourceType::kEndpoint:
110 StartWatch(&endpoint_watchers_,
111 action.start_watch().resource_name());
112 break;
113 case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
114 break;
115 }
116 break;
117 case xds_client_fuzzer::Action::kStopWatch:
118 switch (action.stop_watch().resource_type().resource_type_case()) {
119 case xds_client_fuzzer::ResourceType::kListener:
120 StopWatch(&listener_watchers_, action.stop_watch().resource_name());
121 break;
122 case xds_client_fuzzer::ResourceType::kRouteConfig:
123 StopWatch(&route_config_watchers_,
124 action.stop_watch().resource_name());
125 break;
126 case xds_client_fuzzer::ResourceType::kCluster:
127 StopWatch(&cluster_watchers_, action.stop_watch().resource_name());
128 break;
129 case xds_client_fuzzer::ResourceType::kEndpoint:
130 StopWatch(&endpoint_watchers_, action.stop_watch().resource_name());
131 break;
132 case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
133 break;
134 }
135 break;
136 case xds_client_fuzzer::Action::kDumpCsdsData:
137 testing::XdsClientTestPeer(xds_client_.get()).TestDumpClientConfig();
138 break;
139 case xds_client_fuzzer::Action::kReportResourceCounts:
140 testing::XdsClientTestPeer(xds_client_.get())
141 .TestReportResourceCounts(
142 [](const testing::XdsClientTestPeer::ResourceCountLabels&
143 labels,
144 uint64_t count) {
145 LOG(INFO) << "xds_authority=\"" << labels.xds_authority
146 << "\", resource_type=\"" << labels.resource_type
147 << "\", cache_state=\"" << labels.cache_state
148 << "\" count=" << count;
149 });
150 break;
151 case xds_client_fuzzer::Action::kReportServerConnections:
152 testing::XdsClientTestPeer(xds_client_.get())
153 .TestReportServerConnections(
154 [](absl::string_view xds_server, bool connected) {
155 LOG(INFO) << "xds_server=\"" << xds_server
156 << "\" connected=" << connected;
157 });
158 break;
159 case xds_client_fuzzer::Action::kTriggerConnectionFailure:
160 TriggerConnectionFailure(
161 action.trigger_connection_failure().authority(),
162 ToAbslStatus(action.trigger_connection_failure().status()));
163 break;
164 case xds_client_fuzzer::Action::kReadMessageFromClient:
165 ReadMessageFromClient(action.read_message_from_client().stream_id(),
166 action.read_message_from_client().ok());
167 break;
168 case xds_client_fuzzer::Action::kSendMessageToClient:
169 SendMessageToClient(action.send_message_to_client().stream_id(),
170 action.send_message_to_client().response());
171 break;
172 case xds_client_fuzzer::Action::kSendStatusToClient:
173 SendStatusToClient(
174 action.send_status_to_client().stream_id(),
175 ToAbslStatus(action.send_status_to_client().status()));
176 break;
177 case xds_client_fuzzer::Action::ACTION_TYPE_NOT_SET:
178 break;
179 }
180 }
181
182 private:
183 template <typename ResourceTypeType>
184 class Watcher : public ResourceTypeType::WatcherInterface {
185 public:
186 using ResourceType = ResourceTypeType;
187
Watcher(std::string resource_name)188 explicit Watcher(std::string resource_name)
189 : resource_name_(std::move(resource_name)) {}
190
OnResourceChanged(absl::StatusOr<std::shared_ptr<const typename ResourceType::ResourceType>> resource,RefCountedPtr<XdsClient::ReadDelayHandle>)191 void OnResourceChanged(
192 absl::StatusOr<
193 std::shared_ptr<const typename ResourceType::ResourceType>>
194 resource,
195 RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
196 override {
197 LOG(INFO) << "==> OnResourceChanged(" << ResourceType::Get()->type_url()
198 << " " << resource_name_ << "): "
199 << (resource.ok() ? (*resource)->ToString()
200 : resource.status().ToString());
201 }
202
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle>)203 void OnAmbientError(
204 absl::Status status,
205 RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
206 override {
207 LOG(INFO) << "==> OnAmbientError(" << ResourceType::Get()->type_url()
208 << " " << resource_name_ << "): " << status;
209 }
210
211 private:
212 std::string resource_name_;
213 };
214
215 using ListenerWatcher = Watcher<XdsListenerResourceType>;
216 using RouteConfigWatcher = Watcher<XdsRouteConfigResourceType>;
217 using ClusterWatcher = Watcher<XdsClusterResourceType>;
218 using EndpointWatcher = Watcher<XdsEndpointResourceType>;
219
220 template <typename WatcherType>
StartWatch(std::map<std::string,std::set<WatcherType * >> * watchers,std::string resource_name)221 void StartWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
222 std::string resource_name) {
223 LOG(INFO) << "### StartWatch("
224 << WatcherType::ResourceType::Get()->type_url() << " "
225 << resource_name << ")";
226 auto watcher = MakeRefCounted<WatcherType>(resource_name);
227 (*watchers)[resource_name].insert(watcher.get());
228 WatcherType::ResourceType::Get()->StartWatch(
229 xds_client_.get(), resource_name, std::move(watcher));
230 }
231
232 template <typename WatcherType>
StopWatch(std::map<std::string,std::set<WatcherType * >> * watchers,std::string resource_name)233 void StopWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
234 std::string resource_name) {
235 LOG(INFO) << "### StopWatch("
236 << WatcherType::ResourceType::Get()->type_url() << " "
237 << resource_name << ")";
238 auto& watchers_set = (*watchers)[resource_name];
239 auto it = watchers_set.begin();
240 if (it == watchers_set.end()) return;
241 WatcherType::ResourceType::Get()->CancelWatch(xds_client_.get(),
242 resource_name, *it);
243 watchers_set.erase(it);
244 }
245
ToAbslStatus(const xds_client_fuzzer::Status & status)246 static absl::Status ToAbslStatus(const xds_client_fuzzer::Status& status) {
247 return absl::Status(static_cast<absl::StatusCode>(status.code()),
248 status.message());
249 }
250
GetServer(const std::string & authority)251 const XdsBootstrap::XdsServer* GetServer(const std::string& authority) {
252 const GrpcXdsBootstrap& bootstrap =
253 static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap());
254 if (authority.empty()) return bootstrap.servers().front();
255 const auto* authority_entry =
256 static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
257 bootstrap.LookupAuthority(authority));
258 if (authority_entry == nullptr) return nullptr;
259 if (!authority_entry->servers().empty()) {
260 return authority_entry->servers().front();
261 }
262 return bootstrap.servers().front();
263 }
264
TriggerConnectionFailure(const std::string & authority,absl::Status status)265 void TriggerConnectionFailure(const std::string& authority,
266 absl::Status status) {
267 LOG(INFO) << "### TriggerConnectionFailure(" << authority
268 << "): " << status;
269 const auto* xds_server = GetServer(authority);
270 if (xds_server == nullptr) return;
271 transport_factory_->TriggerConnectionFailure(*xds_server,
272 std::move(status));
273 }
274
StreamIdMethod(const xds_client_fuzzer::StreamId & stream_id)275 static const char* StreamIdMethod(
276 const xds_client_fuzzer::StreamId& stream_id) {
277 switch (stream_id.method_case()) {
278 case xds_client_fuzzer::StreamId::kAds:
279 return FakeXdsTransportFactory::kAdsMethod;
280 case xds_client_fuzzer::StreamId::kLrs:
281 return FakeXdsTransportFactory::kLrsMethod;
282 case xds_client_fuzzer::StreamId::METHOD_NOT_SET:
283 return nullptr;
284 }
285 }
286
GetStream(const xds_client_fuzzer::StreamId & stream_id)287 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> GetStream(
288 const xds_client_fuzzer::StreamId& stream_id) {
289 const auto* xds_server = GetServer(stream_id.authority());
290 if (xds_server == nullptr) return nullptr;
291 const char* method = StreamIdMethod(stream_id);
292 if (method == nullptr) return nullptr;
293 return transport_factory_->WaitForStream(*xds_server, method);
294 }
295
StreamIdString(const xds_client_fuzzer::StreamId & stream_id)296 static std::string StreamIdString(
297 const xds_client_fuzzer::StreamId& stream_id) {
298 return absl::StrCat("{authority=\"", stream_id.authority(),
299 "\", method=", StreamIdMethod(stream_id), "}");
300 }
301
ReadMessageFromClient(const xds_client_fuzzer::StreamId & stream_id,bool ok)302 void ReadMessageFromClient(const xds_client_fuzzer::StreamId& stream_id,
303 bool ok) {
304 LOG(INFO) << "### ReadMessageFromClient(" << StreamIdString(stream_id)
305 << "): " << (ok ? "true" : "false");
306 auto stream = GetStream(stream_id);
307 if (stream == nullptr) return;
308 LOG(INFO) << " stream=" << stream.get();
309 auto message = stream->WaitForMessageFromClient();
310 if (message.has_value()) {
311 LOG(INFO) << " completing send_message";
312 stream->CompleteSendMessageFromClient(ok);
313 }
314 }
315
SendMessageToClient(const xds_client_fuzzer::StreamId & stream_id,const envoy::service::discovery::v3::DiscoveryResponse & response)316 void SendMessageToClient(
317 const xds_client_fuzzer::StreamId& stream_id,
318 const envoy::service::discovery::v3::DiscoveryResponse& response) {
319 LOG(INFO) << "### SendMessageToClient(" << StreamIdString(stream_id) << ")";
320 auto stream = GetStream(stream_id);
321 if (stream == nullptr) return;
322 LOG(INFO) << " stream=" << stream.get();
323 stream->SendMessageToClient(response.SerializeAsString());
324 }
325
SendStatusToClient(const xds_client_fuzzer::StreamId & stream_id,absl::Status status)326 void SendStatusToClient(const xds_client_fuzzer::StreamId& stream_id,
327 absl::Status status) {
328 LOG(INFO) << "### SendStatusToClient(" << StreamIdString(stream_id)
329 << "): " << status;
330 auto stream = GetStream(stream_id);
331 if (stream == nullptr) return;
332 LOG(INFO) << " stream=" << stream.get();
333 stream->MaybeSendStatusToClient(std::move(status));
334 }
335
336 std::shared_ptr<FuzzingEventEngine> event_engine_;
337 RefCountedPtr<XdsClient> xds_client_;
338 RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
339
340 // Maps of currently active watchers for each resource type, keyed by
341 // resource name.
342 std::map<std::string, std::set<ListenerWatcher*>> listener_watchers_;
343 std::map<std::string, std::set<RouteConfigWatcher*>> route_config_watchers_;
344 std::map<std::string, std::set<ClusterWatcher*>> cluster_watchers_;
345 std::map<std::string, std::set<EndpointWatcher*>> endpoint_watchers_;
346 };
347
348 } // namespace grpc_core
349
350 bool squelch = true;
351
DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Msg & message)352 DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Msg& message) {
353 grpc_core::Fuzzer fuzzer(message.bootstrap(),
354 message.fuzzing_event_engine_actions());
355 for (int i = 0; i < message.actions_size(); i++) {
356 fuzzer.Act(message.actions(i));
357 }
358 }
359