• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/grpc.h>
18 
19 #include <map>
20 #include <memory>
21 #include <set>
22 #include <string>
23 #include <utility>
24 
25 #include "absl/log/log.h"
26 #include "absl/status/status.h"
27 #include "absl/status/statusor.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/strings/string_view.h"
30 #include "absl/time/time.h"
31 #include "absl/types/optional.h"
32 #include "envoy/service/discovery/v3/discovery.pb.h"
33 #include "src/core/lib/iomgr/timer_manager.h"
34 #include "src/core/util/orphanable.h"
35 #include "src/core/util/ref_counted_ptr.h"
36 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
37 #include "src/core/xds/grpc/xds_cluster.h"
38 #include "src/core/xds/grpc/xds_cluster_parser.h"
39 #include "src/core/xds/grpc/xds_endpoint.h"
40 #include "src/core/xds/grpc/xds_endpoint_parser.h"
41 #include "src/core/xds/grpc/xds_listener.h"
42 #include "src/core/xds/grpc/xds_listener_parser.h"
43 #include "src/core/xds/grpc/xds_route_config.h"
44 #include "src/core/xds/grpc/xds_route_config_parser.h"
45 #include "src/core/xds/xds_client/xds_bootstrap.h"
46 #include "src/core/xds/xds_client/xds_client.h"
47 #include "src/libfuzzer/libfuzzer_macro.h"
48 #include "test/core/event_engine/event_engine_test_utils.h"
49 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
50 #include "test/core/xds/xds_client_fuzzer.pb.h"
51 #include "test/core/xds/xds_client_test_peer.h"
52 #include "test/core/xds/xds_transport_fake.h"
53 
54 using grpc_event_engine::experimental::FuzzingEventEngine;
55 
56 namespace grpc_core {
57 
58 class Fuzzer {
59  public:
Fuzzer(absl::string_view bootstrap_json,const fuzzing_event_engine::Actions & fuzzing_ee_actions)60   Fuzzer(absl::string_view bootstrap_json,
61          const fuzzing_event_engine::Actions& fuzzing_ee_actions) {
62     event_engine_ = std::make_shared<FuzzingEventEngine>(
63         FuzzingEventEngine::Options(), fuzzing_ee_actions);
64     grpc_timer_manager_set_start_threaded(false);
65     grpc_init();
66     auto bootstrap = GrpcXdsBootstrap::Create(bootstrap_json);
67     if (!bootstrap.ok()) {
68       LOG(ERROR) << "error creating bootstrap: " << bootstrap.status();
69       // Leave xds_client_ unset, so Act() will be a no-op.
70       return;
71     }
72     transport_factory_ = MakeRefCounted<FakeXdsTransportFactory>(
73         []() { Crash("Multiple concurrent reads"); }, event_engine_);
74     transport_factory_->SetAutoCompleteMessagesFromClient(false);
75     transport_factory_->SetAbortOnUndrainedMessages(false);
76     xds_client_ = MakeRefCounted<XdsClient>(
77         std::move(*bootstrap), transport_factory_, event_engine_,
78         /*metrics_reporter=*/nullptr, "foo agent", "foo version");
79   }
80 
~Fuzzer()81   ~Fuzzer() {
82     transport_factory_.reset();
83     xds_client_.reset();
84     event_engine_->FuzzingDone();
85     event_engine_->TickUntilIdle();
86     event_engine_->UnsetGlobalHooks();
87     grpc_event_engine::experimental::WaitForSingleOwner(
88         std::move(event_engine_));
89     grpc_shutdown_blocking();
90   }
91 
Act(const xds_client_fuzzer::Action & action)92   void Act(const xds_client_fuzzer::Action& action) {
93     if (xds_client_ == nullptr) return;
94     switch (action.action_type_case()) {
95       case xds_client_fuzzer::Action::kStartWatch:
96         switch (action.start_watch().resource_type().resource_type_case()) {
97           case xds_client_fuzzer::ResourceType::kListener:
98             StartWatch(&listener_watchers_,
99                        action.start_watch().resource_name());
100             break;
101           case xds_client_fuzzer::ResourceType::kRouteConfig:
102             StartWatch(&route_config_watchers_,
103                        action.start_watch().resource_name());
104             break;
105           case xds_client_fuzzer::ResourceType::kCluster:
106             StartWatch(&cluster_watchers_,
107                        action.start_watch().resource_name());
108             break;
109           case xds_client_fuzzer::ResourceType::kEndpoint:
110             StartWatch(&endpoint_watchers_,
111                        action.start_watch().resource_name());
112             break;
113           case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
114             break;
115         }
116         break;
117       case xds_client_fuzzer::Action::kStopWatch:
118         switch (action.stop_watch().resource_type().resource_type_case()) {
119           case xds_client_fuzzer::ResourceType::kListener:
120             StopWatch(&listener_watchers_, action.stop_watch().resource_name());
121             break;
122           case xds_client_fuzzer::ResourceType::kRouteConfig:
123             StopWatch(&route_config_watchers_,
124                       action.stop_watch().resource_name());
125             break;
126           case xds_client_fuzzer::ResourceType::kCluster:
127             StopWatch(&cluster_watchers_, action.stop_watch().resource_name());
128             break;
129           case xds_client_fuzzer::ResourceType::kEndpoint:
130             StopWatch(&endpoint_watchers_, action.stop_watch().resource_name());
131             break;
132           case xds_client_fuzzer::ResourceType::RESOURCE_TYPE_NOT_SET:
133             break;
134         }
135         break;
136       case xds_client_fuzzer::Action::kDumpCsdsData:
137         testing::XdsClientTestPeer(xds_client_.get()).TestDumpClientConfig();
138         break;
139       case xds_client_fuzzer::Action::kReportResourceCounts:
140         testing::XdsClientTestPeer(xds_client_.get())
141             .TestReportResourceCounts(
142                 [](const testing::XdsClientTestPeer::ResourceCountLabels&
143                        labels,
144                    uint64_t count) {
145                   LOG(INFO) << "xds_authority=\"" << labels.xds_authority
146                             << "\", resource_type=\"" << labels.resource_type
147                             << "\", cache_state=\"" << labels.cache_state
148                             << "\" count=" << count;
149                 });
150         break;
151       case xds_client_fuzzer::Action::kReportServerConnections:
152         testing::XdsClientTestPeer(xds_client_.get())
153             .TestReportServerConnections(
154                 [](absl::string_view xds_server, bool connected) {
155                   LOG(INFO) << "xds_server=\"" << xds_server
156                             << "\" connected=" << connected;
157                 });
158         break;
159       case xds_client_fuzzer::Action::kTriggerConnectionFailure:
160         TriggerConnectionFailure(
161             action.trigger_connection_failure().authority(),
162             ToAbslStatus(action.trigger_connection_failure().status()));
163         break;
164       case xds_client_fuzzer::Action::kReadMessageFromClient:
165         ReadMessageFromClient(action.read_message_from_client().stream_id(),
166                               action.read_message_from_client().ok());
167         break;
168       case xds_client_fuzzer::Action::kSendMessageToClient:
169         SendMessageToClient(action.send_message_to_client().stream_id(),
170                             action.send_message_to_client().response());
171         break;
172       case xds_client_fuzzer::Action::kSendStatusToClient:
173         SendStatusToClient(
174             action.send_status_to_client().stream_id(),
175             ToAbslStatus(action.send_status_to_client().status()));
176         break;
177       case xds_client_fuzzer::Action::ACTION_TYPE_NOT_SET:
178         break;
179     }
180   }
181 
182  private:
183   template <typename ResourceTypeType>
184   class Watcher : public ResourceTypeType::WatcherInterface {
185    public:
186     using ResourceType = ResourceTypeType;
187 
Watcher(std::string resource_name)188     explicit Watcher(std::string resource_name)
189         : resource_name_(std::move(resource_name)) {}
190 
OnResourceChanged(absl::StatusOr<std::shared_ptr<const typename ResourceType::ResourceType>> resource,RefCountedPtr<XdsClient::ReadDelayHandle>)191     void OnResourceChanged(
192         absl::StatusOr<
193             std::shared_ptr<const typename ResourceType::ResourceType>>
194             resource,
195         RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
196         override {
197       LOG(INFO) << "==> OnResourceChanged(" << ResourceType::Get()->type_url()
198                 << " " << resource_name_ << "): "
199                 << (resource.ok() ? (*resource)->ToString()
200                                   : resource.status().ToString());
201     }
202 
OnAmbientError(absl::Status status,RefCountedPtr<XdsClient::ReadDelayHandle>)203     void OnAmbientError(
204         absl::Status status,
205         RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
206         override {
207       LOG(INFO) << "==> OnAmbientError(" << ResourceType::Get()->type_url()
208                 << " " << resource_name_ << "): " << status;
209     }
210 
211    private:
212     std::string resource_name_;
213   };
214 
215   using ListenerWatcher = Watcher<XdsListenerResourceType>;
216   using RouteConfigWatcher = Watcher<XdsRouteConfigResourceType>;
217   using ClusterWatcher = Watcher<XdsClusterResourceType>;
218   using EndpointWatcher = Watcher<XdsEndpointResourceType>;
219 
220   template <typename WatcherType>
StartWatch(std::map<std::string,std::set<WatcherType * >> * watchers,std::string resource_name)221   void StartWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
222                   std::string resource_name) {
223     LOG(INFO) << "### StartWatch("
224               << WatcherType::ResourceType::Get()->type_url() << " "
225               << resource_name << ")";
226     auto watcher = MakeRefCounted<WatcherType>(resource_name);
227     (*watchers)[resource_name].insert(watcher.get());
228     WatcherType::ResourceType::Get()->StartWatch(
229         xds_client_.get(), resource_name, std::move(watcher));
230   }
231 
232   template <typename WatcherType>
StopWatch(std::map<std::string,std::set<WatcherType * >> * watchers,std::string resource_name)233   void StopWatch(std::map<std::string, std::set<WatcherType*>>* watchers,
234                  std::string resource_name) {
235     LOG(INFO) << "### StopWatch("
236               << WatcherType::ResourceType::Get()->type_url() << " "
237               << resource_name << ")";
238     auto& watchers_set = (*watchers)[resource_name];
239     auto it = watchers_set.begin();
240     if (it == watchers_set.end()) return;
241     WatcherType::ResourceType::Get()->CancelWatch(xds_client_.get(),
242                                                   resource_name, *it);
243     watchers_set.erase(it);
244   }
245 
ToAbslStatus(const xds_client_fuzzer::Status & status)246   static absl::Status ToAbslStatus(const xds_client_fuzzer::Status& status) {
247     return absl::Status(static_cast<absl::StatusCode>(status.code()),
248                         status.message());
249   }
250 
GetServer(const std::string & authority)251   const XdsBootstrap::XdsServer* GetServer(const std::string& authority) {
252     const GrpcXdsBootstrap& bootstrap =
253         static_cast<const GrpcXdsBootstrap&>(xds_client_->bootstrap());
254     if (authority.empty()) return bootstrap.servers().front();
255     const auto* authority_entry =
256         static_cast<const GrpcXdsBootstrap::GrpcAuthority*>(
257             bootstrap.LookupAuthority(authority));
258     if (authority_entry == nullptr) return nullptr;
259     if (!authority_entry->servers().empty()) {
260       return authority_entry->servers().front();
261     }
262     return bootstrap.servers().front();
263   }
264 
TriggerConnectionFailure(const std::string & authority,absl::Status status)265   void TriggerConnectionFailure(const std::string& authority,
266                                 absl::Status status) {
267     LOG(INFO) << "### TriggerConnectionFailure(" << authority
268               << "): " << status;
269     const auto* xds_server = GetServer(authority);
270     if (xds_server == nullptr) return;
271     transport_factory_->TriggerConnectionFailure(*xds_server,
272                                                  std::move(status));
273   }
274 
StreamIdMethod(const xds_client_fuzzer::StreamId & stream_id)275   static const char* StreamIdMethod(
276       const xds_client_fuzzer::StreamId& stream_id) {
277     switch (stream_id.method_case()) {
278       case xds_client_fuzzer::StreamId::kAds:
279         return FakeXdsTransportFactory::kAdsMethod;
280       case xds_client_fuzzer::StreamId::kLrs:
281         return FakeXdsTransportFactory::kLrsMethod;
282       case xds_client_fuzzer::StreamId::METHOD_NOT_SET:
283         return nullptr;
284     }
285   }
286 
GetStream(const xds_client_fuzzer::StreamId & stream_id)287   RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall> GetStream(
288       const xds_client_fuzzer::StreamId& stream_id) {
289     const auto* xds_server = GetServer(stream_id.authority());
290     if (xds_server == nullptr) return nullptr;
291     const char* method = StreamIdMethod(stream_id);
292     if (method == nullptr) return nullptr;
293     return transport_factory_->WaitForStream(*xds_server, method);
294   }
295 
StreamIdString(const xds_client_fuzzer::StreamId & stream_id)296   static std::string StreamIdString(
297       const xds_client_fuzzer::StreamId& stream_id) {
298     return absl::StrCat("{authority=\"", stream_id.authority(),
299                         "\", method=", StreamIdMethod(stream_id), "}");
300   }
301 
ReadMessageFromClient(const xds_client_fuzzer::StreamId & stream_id,bool ok)302   void ReadMessageFromClient(const xds_client_fuzzer::StreamId& stream_id,
303                              bool ok) {
304     LOG(INFO) << "### ReadMessageFromClient(" << StreamIdString(stream_id)
305               << "): " << (ok ? "true" : "false");
306     auto stream = GetStream(stream_id);
307     if (stream == nullptr) return;
308     LOG(INFO) << "    stream=" << stream.get();
309     auto message = stream->WaitForMessageFromClient();
310     if (message.has_value()) {
311       LOG(INFO) << "    completing send_message";
312       stream->CompleteSendMessageFromClient(ok);
313     }
314   }
315 
SendMessageToClient(const xds_client_fuzzer::StreamId & stream_id,const envoy::service::discovery::v3::DiscoveryResponse & response)316   void SendMessageToClient(
317       const xds_client_fuzzer::StreamId& stream_id,
318       const envoy::service::discovery::v3::DiscoveryResponse& response) {
319     LOG(INFO) << "### SendMessageToClient(" << StreamIdString(stream_id) << ")";
320     auto stream = GetStream(stream_id);
321     if (stream == nullptr) return;
322     LOG(INFO) << "    stream=" << stream.get();
323     stream->SendMessageToClient(response.SerializeAsString());
324   }
325 
SendStatusToClient(const xds_client_fuzzer::StreamId & stream_id,absl::Status status)326   void SendStatusToClient(const xds_client_fuzzer::StreamId& stream_id,
327                           absl::Status status) {
328     LOG(INFO) << "### SendStatusToClient(" << StreamIdString(stream_id)
329               << "): " << status;
330     auto stream = GetStream(stream_id);
331     if (stream == nullptr) return;
332     LOG(INFO) << "    stream=" << stream.get();
333     stream->MaybeSendStatusToClient(std::move(status));
334   }
335 
336   std::shared_ptr<FuzzingEventEngine> event_engine_;
337   RefCountedPtr<XdsClient> xds_client_;
338   RefCountedPtr<FakeXdsTransportFactory> transport_factory_;
339 
340   // Maps of currently active watchers for each resource type, keyed by
341   // resource name.
342   std::map<std::string, std::set<ListenerWatcher*>> listener_watchers_;
343   std::map<std::string, std::set<RouteConfigWatcher*>> route_config_watchers_;
344   std::map<std::string, std::set<ClusterWatcher*>> cluster_watchers_;
345   std::map<std::string, std::set<EndpointWatcher*>> endpoint_watchers_;
346 };
347 
348 }  // namespace grpc_core
349 
350 bool squelch = true;
351 
DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Msg & message)352 DEFINE_PROTO_FUZZER(const xds_client_fuzzer::Msg& message) {
353   grpc_core::Fuzzer fuzzer(message.bootstrap(),
354                            message.fuzzing_event_engine_actions());
355   for (int i = 0; i < message.actions_size(); i++) {
356     fuzzer.Act(message.actions(i));
357   }
358 }
359