//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
16 
17 #ifndef GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H
18 #define GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H
19 
#include <grpc/support/port_platform.h>
#include <stddef.h>

#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "src/core/util/orphanable.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/sync.h"
#include "src/core/xds/xds_client/xds_bootstrap.h"
#include "src/core/xds/xds_client/xds_transport.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
42 
43 namespace grpc_core {
44 
45 class FakeXdsTransportFactory : public XdsTransportFactory {
46  private:
47   class FakeXdsTransport;
48 
49  public:
50   static constexpr char kAdsMethod[] =
51       "/envoy.service.discovery.v3.AggregatedDiscoveryService/"
52       "StreamAggregatedResources";
53   static constexpr char kLrsMethod[] =
54       "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats";
55 
56   class FakeStreamingCall : public XdsTransport::StreamingCall {
57    public:
FakeStreamingCall(WeakRefCountedPtr<FakeXdsTransport> transport,const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)58     FakeStreamingCall(
59         WeakRefCountedPtr<FakeXdsTransport> transport, const char* method,
60         std::unique_ptr<StreamingCall::EventHandler> event_handler)
61         : transport_(std::move(transport)),
62           method_(method),
63           event_engine_(transport_->factory()->event_engine_),
64           event_handler_(MakeRefCounted<RefCountedEventHandler>(
65               std::move(event_handler))) {}
66 
67     ~FakeStreamingCall() override;
68 
69     void Orphan() override;
70 
71     bool IsOrphaned();
72 
73     void StartRecvMessage() override;
74 
75     using StreamingCall::Ref;  // Make it public.
76 
77     bool HaveMessageFromClient();
78     absl::optional<std::string> WaitForMessageFromClient();
79 
80     // If FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient()
81     // was called to set the value to false before the creation of the
82     // transport that underlies this stream, then this must be called
83     // to invoke EventHandler::OnRequestSent() for every message read
84     // via WaitForMessageFromClient().
85     void CompleteSendMessageFromClient(bool ok = true);
86 
87     void SendMessageToClient(absl::string_view payload);
88     void MaybeSendStatusToClient(absl::Status status);
89 
90     bool WaitForReadsStarted(size_t expected);
91 
92    private:
93     class RefCountedEventHandler : public RefCounted<RefCountedEventHandler> {
94      public:
RefCountedEventHandler(std::unique_ptr<StreamingCall::EventHandler> event_handler)95       explicit RefCountedEventHandler(
96           std::unique_ptr<StreamingCall::EventHandler> event_handler)
97           : event_handler_(std::move(event_handler)) {}
98 
OnRequestSent(bool ok)99       void OnRequestSent(bool ok) { event_handler_->OnRequestSent(ok); }
OnRecvMessage(absl::string_view payload)100       void OnRecvMessage(absl::string_view payload) {
101         event_handler_->OnRecvMessage(payload);
102       }
OnStatusReceived(absl::Status status)103       void OnStatusReceived(absl::Status status) {
104         event_handler_->OnStatusReceived(std::move(status));
105       }
106 
107      private:
108       std::unique_ptr<StreamingCall::EventHandler> event_handler_;
109     };
110 
111     void SendMessage(std::string payload) override;
112 
113     void CompleteSendMessageFromClientLocked(bool ok)
114         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
115     void MaybeDeliverMessageToClient();
116 
117     WeakRefCountedPtr<FakeXdsTransport> transport_;
118     const char* method_;
119     std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
120         event_engine_;
121 
122     Mutex mu_;
123     RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_);
124     std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_);
125     bool status_sent_ ABSL_GUARDED_BY(&mu_) = false;
126     bool orphaned_ ABSL_GUARDED_BY(&mu_) = false;
127     size_t reads_started_ ABSL_GUARDED_BY(&mu_) = 0;
128     size_t num_pending_reads_ ABSL_GUARDED_BY(&mu_) = 0;
129     std::deque<std::string> to_client_messages_ ABSL_GUARDED_BY(&mu_);
130   };
131 
FakeXdsTransportFactory(std::function<void ()> too_many_pending_reads_callback,std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> event_engine)132   explicit FakeXdsTransportFactory(
133       std::function<void()> too_many_pending_reads_callback,
134       std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
135           event_engine)
136       : event_engine_(std::move(event_engine)),
137         too_many_pending_reads_callback_(
138             std::move(too_many_pending_reads_callback)) {}
139 
140   void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server,
141                                 absl::Status status);
142 
143   // By default, FakeStreamingCall will automatically invoke
144   // EventHandler::OnRequestSent() upon reading a request from the client.
145   // If this is set to false, that behavior will be inhibited, and
146   // EventHandler::OnRequestSent() will not be called until the test
147   // explicitly calls FakeStreamingCall::CompleteSendMessageFromClient().
148   //
149   // This value affects all transports created after this call is
150   // complete.  Any transport that already exists prior to this call
151   // will not be affected.
152   void SetAutoCompleteMessagesFromClient(bool value);
153 
154   // By default, FakeStreamingCall will automatically crash on
155   // destruction if there are messages from the client that have not
156   // been drained from the queue.  If this is set to false, that
157   // behavior will be inhibited.
158   //
159   // This value affects all transports created after this call is
160   // complete.  Any transport that already exists prior to this call
161   // will not be affected.
162   void SetAbortOnUndrainedMessages(bool value);
163 
164   RefCountedPtr<FakeStreamingCall> WaitForStream(
165       const XdsBootstrap::XdsServer& server, const char* method);
166 
167   void Orphaned() override;
168 
169  private:
170   class FakeXdsTransport : public XdsTransport {
171    public:
FakeXdsTransport(WeakRefCountedPtr<FakeXdsTransportFactory> factory,const XdsBootstrap::XdsServer & server,bool auto_complete_messages_from_client,bool abort_on_undrained_messages)172     FakeXdsTransport(WeakRefCountedPtr<FakeXdsTransportFactory> factory,
173                      const XdsBootstrap::XdsServer& server,
174                      bool auto_complete_messages_from_client,
175                      bool abort_on_undrained_messages)
176         : factory_(std::move(factory)),
177           server_(server),
178           auto_complete_messages_from_client_(
179               auto_complete_messages_from_client),
180           abort_on_undrained_messages_(abort_on_undrained_messages),
181           event_engine_(factory_->event_engine_) {}
182 
183     void Orphaned() override;
184 
auto_complete_messages_from_client()185     bool auto_complete_messages_from_client() const {
186       return auto_complete_messages_from_client_;
187     }
188 
abort_on_undrained_messages()189     bool abort_on_undrained_messages() const {
190       return abort_on_undrained_messages_;
191     }
192 
193     void TriggerConnectionFailure(absl::Status status);
194 
195     RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method);
196 
197     void RemoveStream(const char* method, FakeStreamingCall* call);
198 
factory()199     FakeXdsTransportFactory* factory() const { return factory_.get(); }
200 
server()201     const XdsBootstrap::XdsServer* server() const { return &server_; }
202 
203    private:
204     void StartConnectivityFailureWatch(
205         RefCountedPtr<ConnectivityFailureWatcher> watcher) override;
206     void StopConnectivityFailureWatch(
207         const RefCountedPtr<ConnectivityFailureWatcher>& watcher) override;
208 
209     OrphanablePtr<StreamingCall> CreateStreamingCall(
210         const char* method,
211         std::unique_ptr<StreamingCall::EventHandler> event_handler) override;
212 
ResetBackoff()213     void ResetBackoff() override {}
214 
215     WeakRefCountedPtr<FakeXdsTransportFactory> factory_;
216     const XdsBootstrap::XdsServer& server_;
217     const bool auto_complete_messages_from_client_;
218     const bool abort_on_undrained_messages_;
219     std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
220         event_engine_;
221 
222     Mutex mu_;
223     std::set<RefCountedPtr<ConnectivityFailureWatcher>> watchers_
224         ABSL_GUARDED_BY(&mu_);
225     std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>>
226         active_calls_ ABSL_GUARDED_BY(&mu_);
227   };
228 
229   // Returns an existing transport or creates a new one.
230   RefCountedPtr<XdsTransport> GetTransport(
231       const XdsBootstrap::XdsServer& server, absl::Status* /*status*/) override;
232 
233   // Returns an existing transport, if any, or nullptr.
234   RefCountedPtr<FakeXdsTransport> GetTransport(
235       const XdsBootstrap::XdsServer& server);
236 
237   RefCountedPtr<FakeXdsTransport> GetTransportLocked(const std::string& key)
238       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
239 
240   std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine>
241       event_engine_;
242 
243   Mutex mu_;
244   std::map<std::string /*XdsServer key*/, FakeXdsTransport*> transport_map_
245       ABSL_GUARDED_BY(&mu_);
246   bool auto_complete_messages_from_client_ ABSL_GUARDED_BY(&mu_) = true;
247   bool abort_on_undrained_messages_ ABSL_GUARDED_BY(&mu_) = true;
248   std::function<void()> too_many_pending_reads_callback_;
249 };
250 
251 }  // namespace grpc_core
252 
253 #endif  // GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H
254