1 // 2 // Copyright 2022 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H 18 #define GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H 19 20 #include <grpc/support/port_platform.h> 21 #include <stddef.h> 22 23 #include <deque> 24 #include <functional> 25 #include <map> 26 #include <memory> 27 #include <string> 28 #include <utility> 29 30 #include "absl/base/thread_annotations.h" 31 #include "absl/status/status.h" 32 #include "absl/strings/string_view.h" 33 #include "absl/time/time.h" 34 #include "absl/types/optional.h" 35 #include "src/core/util/orphanable.h" 36 #include "src/core/util/ref_counted.h" 37 #include "src/core/util/ref_counted_ptr.h" 38 #include "src/core/util/sync.h" 39 #include "src/core/xds/xds_client/xds_bootstrap.h" 40 #include "src/core/xds/xds_client/xds_transport.h" 41 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h" 42 43 namespace grpc_core { 44 45 class FakeXdsTransportFactory : public XdsTransportFactory { 46 private: 47 class FakeXdsTransport; 48 49 public: 50 static constexpr char kAdsMethod[] = 51 "/envoy.service.discovery.v3.AggregatedDiscoveryService/" 52 "StreamAggregatedResources"; 53 static constexpr char kLrsMethod[] = 54 "/envoy.service.load_stats.v3.LoadReportingService/StreamLoadStats"; 55 56 class FakeStreamingCall : public XdsTransport::StreamingCall { 57 public: FakeStreamingCall(WeakRefCountedPtr<FakeXdsTransport> transport,const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)58 FakeStreamingCall( 59 WeakRefCountedPtr<FakeXdsTransport> transport, const char* method, 60 std::unique_ptr<StreamingCall::EventHandler> event_handler) 61 : transport_(std::move(transport)), 62 method_(method), 63 event_engine_(transport_->factory()->event_engine_), 64 event_handler_(MakeRefCounted<RefCountedEventHandler>( 65 std::move(event_handler))) {} 66 67 ~FakeStreamingCall() override; 68 69 void Orphan() override; 70 71 bool IsOrphaned(); 72 73 void StartRecvMessage() override; 74 75 using StreamingCall::Ref; // Make it public. 76 77 bool HaveMessageFromClient(); 78 absl::optional<std::string> WaitForMessageFromClient(); 79 80 // If FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient() 81 // was called to set the value to false before the creation of the 82 // transport that underlies this stream, then this must be called 83 // to invoke EventHandler::OnRequestSent() for every message read 84 // via WaitForMessageFromClient(). 85 void CompleteSendMessageFromClient(bool ok = true); 86 87 void SendMessageToClient(absl::string_view payload); 88 void MaybeSendStatusToClient(absl::Status status); 89 90 bool WaitForReadsStarted(size_t expected); 91 92 private: 93 class RefCountedEventHandler : public RefCounted<RefCountedEventHandler> { 94 public: RefCountedEventHandler(std::unique_ptr<StreamingCall::EventHandler> event_handler)95 explicit RefCountedEventHandler( 96 std::unique_ptr<StreamingCall::EventHandler> event_handler) 97 : event_handler_(std::move(event_handler)) {} 98 OnRequestSent(bool ok)99 void OnRequestSent(bool ok) { event_handler_->OnRequestSent(ok); } OnRecvMessage(absl::string_view payload)100 void OnRecvMessage(absl::string_view payload) { 101 event_handler_->OnRecvMessage(payload); 102 } OnStatusReceived(absl::Status status)103 void OnStatusReceived(absl::Status status) { 104 event_handler_->OnStatusReceived(std::move(status)); 105 } 106 107 private: 108 std::unique_ptr<StreamingCall::EventHandler> event_handler_; 109 }; 110 111 void SendMessage(std::string payload) override; 112 113 void CompleteSendMessageFromClientLocked(bool ok) 114 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 115 void MaybeDeliverMessageToClient(); 116 117 WeakRefCountedPtr<FakeXdsTransport> transport_; 118 const char* method_; 119 std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> 120 event_engine_; 121 122 Mutex mu_; 123 RefCountedPtr<RefCountedEventHandler> event_handler_ ABSL_GUARDED_BY(&mu_); 124 std::deque<std::string> from_client_messages_ ABSL_GUARDED_BY(&mu_); 125 bool status_sent_ ABSL_GUARDED_BY(&mu_) = false; 126 bool orphaned_ ABSL_GUARDED_BY(&mu_) = false; 127 size_t reads_started_ ABSL_GUARDED_BY(&mu_) = 0; 128 size_t num_pending_reads_ ABSL_GUARDED_BY(&mu_) = 0; 129 std::deque<std::string> to_client_messages_ ABSL_GUARDED_BY(&mu_); 130 }; 131 FakeXdsTransportFactory(std::function<void ()> too_many_pending_reads_callback,std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> event_engine)132 explicit FakeXdsTransportFactory( 133 std::function<void()> too_many_pending_reads_callback, 134 std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> 135 event_engine) 136 : event_engine_(std::move(event_engine)), 137 too_many_pending_reads_callback_( 138 std::move(too_many_pending_reads_callback)) {} 139 140 void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, 141 absl::Status status); 142 143 // By default, FakeStreamingCall will automatically invoke 144 // EventHandler::OnRequestSent() upon reading a request from the client. 145 // If this is set to false, that behavior will be inhibited, and 146 // EventHandler::OnRequestSent() will not be called until the test 147 // explicitly calls FakeStreamingCall::CompleteSendMessageFromClient(). 148 // 149 // This value affects all transports created after this call is 150 // complete. Any transport that already exists prior to this call 151 // will not be affected. 152 void SetAutoCompleteMessagesFromClient(bool value); 153 154 // By default, FakeStreamingCall will automatically crash on 155 // destruction if there are messages from the client that have not 156 // been drained from the queue. If this is set to false, that 157 // behavior will be inhibited. 158 // 159 // This value affects all transports created after this call is 160 // complete. Any transport that already exists prior to this call 161 // will not be affected. 162 void SetAbortOnUndrainedMessages(bool value); 163 164 RefCountedPtr<FakeStreamingCall> WaitForStream( 165 const XdsBootstrap::XdsServer& server, const char* method); 166 167 void Orphaned() override; 168 169 private: 170 class FakeXdsTransport : public XdsTransport { 171 public: FakeXdsTransport(WeakRefCountedPtr<FakeXdsTransportFactory> factory,const XdsBootstrap::XdsServer & server,bool auto_complete_messages_from_client,bool abort_on_undrained_messages)172 FakeXdsTransport(WeakRefCountedPtr<FakeXdsTransportFactory> factory, 173 const XdsBootstrap::XdsServer& server, 174 bool auto_complete_messages_from_client, 175 bool abort_on_undrained_messages) 176 : factory_(std::move(factory)), 177 server_(server), 178 auto_complete_messages_from_client_( 179 auto_complete_messages_from_client), 180 abort_on_undrained_messages_(abort_on_undrained_messages), 181 event_engine_(factory_->event_engine_) {} 182 183 void Orphaned() override; 184 auto_complete_messages_from_client()185 bool auto_complete_messages_from_client() const { 186 return auto_complete_messages_from_client_; 187 } 188 abort_on_undrained_messages()189 bool abort_on_undrained_messages() const { 190 return abort_on_undrained_messages_; 191 } 192 193 void TriggerConnectionFailure(absl::Status status); 194 195 RefCountedPtr<FakeStreamingCall> WaitForStream(const char* method); 196 197 void RemoveStream(const char* method, FakeStreamingCall* call); 198 factory()199 FakeXdsTransportFactory* factory() const { return factory_.get(); } 200 server()201 const XdsBootstrap::XdsServer* server() const { return &server_; } 202 203 private: 204 void StartConnectivityFailureWatch( 205 RefCountedPtr<ConnectivityFailureWatcher> watcher) override; 206 void StopConnectivityFailureWatch( 207 const RefCountedPtr<ConnectivityFailureWatcher>& watcher) override; 208 209 OrphanablePtr<StreamingCall> CreateStreamingCall( 210 const char* method, 211 std::unique_ptr<StreamingCall::EventHandler> event_handler) override; 212 ResetBackoff()213 void ResetBackoff() override {} 214 215 WeakRefCountedPtr<FakeXdsTransportFactory> factory_; 216 const XdsBootstrap::XdsServer& server_; 217 const bool auto_complete_messages_from_client_; 218 const bool abort_on_undrained_messages_; 219 std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> 220 event_engine_; 221 222 Mutex mu_; 223 std::set<RefCountedPtr<ConnectivityFailureWatcher>> watchers_ 224 ABSL_GUARDED_BY(&mu_); 225 std::map<std::string /*method*/, RefCountedPtr<FakeStreamingCall>> 226 active_calls_ ABSL_GUARDED_BY(&mu_); 227 }; 228 229 // Returns an existing transport or creates a new one. 230 RefCountedPtr<XdsTransport> GetTransport( 231 const XdsBootstrap::XdsServer& server, absl::Status* /*status*/) override; 232 233 // Returns an existing transport, if any, or nullptr. 234 RefCountedPtr<FakeXdsTransport> GetTransport( 235 const XdsBootstrap::XdsServer& server); 236 237 RefCountedPtr<FakeXdsTransport> GetTransportLocked(const std::string& key) 238 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); 239 240 std::shared_ptr<grpc_event_engine::experimental::FuzzingEventEngine> 241 event_engine_; 242 243 Mutex mu_; 244 std::map<std::string /*XdsServer key*/, FakeXdsTransport*> transport_map_ 245 ABSL_GUARDED_BY(&mu_); 246 bool auto_complete_messages_from_client_ ABSL_GUARDED_BY(&mu_) = true; 247 bool abort_on_undrained_messages_ ABSL_GUARDED_BY(&mu_) = true; 248 std::function<void()> too_many_pending_reads_callback_; 249 }; 250 251 } // namespace grpc_core 252 253 #endif // GRPC_TEST_CORE_XDS_XDS_TRANSPORT_FAKE_H 254