1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "test/core/xds/xds_transport_fake.h"
18
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/support/port_platform.h>
21
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <thread>
26 #include <type_traits>
27 #include <utility>
28
29 #include "absl/log/check.h"
30 #include "absl/log/log.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/util/orphanable.h"
33 #include "src/core/util/ref_counted_ptr.h"
34 #include "src/core/xds/xds_client/xds_bootstrap.h"
35 #include "test/core/test_util/test_config.h"
36
37 namespace grpc_core {
38
39 //
40 // FakeXdsTransportFactory::FakeStreamingCall
41 //
42
~FakeStreamingCall()43 FakeXdsTransportFactory::FakeStreamingCall::~FakeStreamingCall() {
44 // Tests should not fail to read any messages from the client.
45 {
46 MutexLock lock(&mu_);
47 if (transport_->abort_on_undrained_messages()) {
48 for (const auto& message : from_client_messages_) {
49 LOG(ERROR) << "[" << transport_->server()->server_uri() << "] " << this
50 << " From client message left in queue: " << message;
51 }
52 CHECK(from_client_messages_.empty());
53 }
54 }
55 // Can't call event_handler_->OnStatusReceived() or unref event_handler_
56 // synchronously, since those operations will trigger code in
57 // XdsClient that acquires its mutex, but it was already holding its
58 // mutex when it called us, so it would deadlock.
59 event_engine_->Run([event_handler = std::move(event_handler_),
60 status_sent = status_sent_]() mutable {
61 ExecCtx exec_ctx;
62 if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus());
63 event_handler.reset();
64 });
65 }
66
Orphan()67 void FakeXdsTransportFactory::FakeStreamingCall::Orphan() {
68 {
69 MutexLock lock(&mu_);
70 orphaned_ = true;
71 }
72 transport_->RemoveStream(method_, this);
73 Unref();
74 }
75
SendMessage(std::string payload)76 void FakeXdsTransportFactory::FakeStreamingCall::SendMessage(
77 std::string payload) {
78 MutexLock lock(&mu_);
79 CHECK(!orphaned_);
80 from_client_messages_.push_back(std::move(payload));
81 if (transport_->auto_complete_messages_from_client()) {
82 CompleteSendMessageFromClientLocked(/*ok=*/true);
83 }
84 }
85
HaveMessageFromClient()86 bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() {
87 MutexLock lock(&mu_);
88 return !from_client_messages_.empty();
89 }
90
91 absl::optional<std::string>
WaitForMessageFromClient()92 FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient() {
93 while (true) {
94 {
95 MutexLock lock(&mu_);
96 if (!from_client_messages_.empty()) {
97 std::string payload = std::move(from_client_messages_.front());
98 from_client_messages_.pop_front();
99 return payload;
100 }
101 if (event_engine_->IsIdle()) return absl::nullopt;
102 }
103 event_engine_->Tick();
104 }
105 }
106
107 void FakeXdsTransportFactory::FakeStreamingCall::
CompleteSendMessageFromClientLocked(bool ok)108 CompleteSendMessageFromClientLocked(bool ok) {
109 // Can't call event_handler_->OnRequestSent() synchronously, since that
110 // operation will trigger code in XdsClient that acquires its mutex, but it
111 // was already holding its mutex when it called us, so it would deadlock.
112 event_engine_->Run([event_handler = event_handler_->Ref(), ok]() mutable {
113 ExecCtx exec_ctx;
114 event_handler->OnRequestSent(ok);
115 event_handler.reset();
116 });
117 }
118
CompleteSendMessageFromClient(bool ok)119 void FakeXdsTransportFactory::FakeStreamingCall::CompleteSendMessageFromClient(
120 bool ok) {
121 CHECK(!transport_->auto_complete_messages_from_client());
122 MutexLock lock(&mu_);
123 CompleteSendMessageFromClientLocked(ok);
124 }
125
StartRecvMessage()126 void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() {
127 MutexLock lock(&mu_);
128 if (num_pending_reads_ > 0) {
129 transport_->factory()->too_many_pending_reads_callback_();
130 }
131 ++reads_started_;
132 ++num_pending_reads_;
133 if (!to_client_messages_.empty()) {
134 // Dispatch pending message (if there's one) on a separate thread to avoid
135 // recursion
136 event_engine_->Run([call = RefAsSubclass<FakeStreamingCall>()]() {
137 call->MaybeDeliverMessageToClient();
138 });
139 }
140 }
141
SendMessageToClient(absl::string_view payload)142 void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient(
143 absl::string_view payload) {
144 {
145 MutexLock lock(&mu_);
146 to_client_messages_.emplace_back(payload);
147 }
148 MaybeDeliverMessageToClient();
149 }
150
MaybeDeliverMessageToClient()151 void FakeXdsTransportFactory::FakeStreamingCall::MaybeDeliverMessageToClient() {
152 RefCountedPtr<RefCountedEventHandler> event_handler;
153 std::string message;
154 // Loop terminates with a break inside
155 while (true) {
156 {
157 MutexLock lock(&mu_);
158 if (num_pending_reads_ == 0 || to_client_messages_.empty()) {
159 break;
160 }
161 --num_pending_reads_;
162 message = std::move(to_client_messages_.front());
163 to_client_messages_.pop_front();
164 event_handler = event_handler_;
165 }
166 ExecCtx exec_ctx;
167 event_handler->OnRecvMessage(message);
168 }
169 }
170
MaybeSendStatusToClient(absl::Status status)171 void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
172 absl::Status status) {
173 ExecCtx exec_ctx;
174 RefCountedPtr<RefCountedEventHandler> event_handler;
175 {
176 MutexLock lock(&mu_);
177 if (status_sent_) return;
178 status_sent_ = true;
179 event_handler = event_handler_->Ref();
180 }
181 event_handler->OnStatusReceived(std::move(status));
182 }
183
WaitForReadsStarted(size_t expected)184 bool FakeXdsTransportFactory::FakeStreamingCall::WaitForReadsStarted(
185 size_t expected) {
186 while (true) {
187 {
188 MutexLock lock(&mu_);
189 if (reads_started_ == expected) return true;
190 if (event_engine_->IsIdle()) return false;
191 }
192 event_engine_->Tick();
193 }
194 }
195
IsOrphaned()196 bool FakeXdsTransportFactory::FakeStreamingCall::IsOrphaned() {
197 MutexLock lock(&mu_);
198 return orphaned_;
199 }
200
201 //
202 // FakeXdsTransportFactory::FakeXdsTransport
203 //
204
TriggerConnectionFailure(absl::Status status)205 void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure(
206 absl::Status status) {
207 std::set<RefCountedPtr<ConnectivityFailureWatcher>> watchers;
208 {
209 MutexLock lock(&mu_);
210 watchers = watchers_;
211 }
212 ExecCtx exec_ctx;
213 for (const auto& watcher : watchers) {
214 watcher->OnConnectivityFailure(status);
215 }
216 }
217
Orphaned()218 void FakeXdsTransportFactory::FakeXdsTransport::Orphaned() {
219 {
220 MutexLock lock(&factory_->mu_);
221 auto it = factory_->transport_map_.find(server_.Key());
222 if (it != factory_->transport_map_.end() && it->second == this) {
223 factory_->transport_map_.erase(it);
224 }
225 }
226 factory_.reset();
227 {
228 MutexLock lock(&mu_);
229 // Can't destroy watchers synchronously, since that operation will trigger
230 // code in XdsClient that acquires its mutex, but it was already holding
231 // its mutex when it called us, so it would deadlock.
232 event_engine_->Run([watchers = std::move(watchers_)]() mutable {
233 ExecCtx exec_ctx;
234 watchers.clear();
235 });
236 }
237 }
238
239 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
WaitForStream(const char * method)240 FakeXdsTransportFactory::FakeXdsTransport::WaitForStream(const char* method) {
241 while (true) {
242 {
243 MutexLock lock(&mu_);
244 auto it = active_calls_.find(method);
245 if (it != active_calls_.end() && it->second != nullptr) return it->second;
246 if (event_engine_->IsIdle()) return nullptr;
247 }
248 event_engine_->Tick();
249 }
250 }
251
RemoveStream(const char * method,FakeStreamingCall * call)252 void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream(
253 const char* method, FakeStreamingCall* call) {
254 MutexLock lock(&mu_);
255 auto it = active_calls_.find(method);
256 if (it != active_calls_.end() && it->second.get() == call) {
257 active_calls_.erase(it);
258 }
259 }
260
StartConnectivityFailureWatch(RefCountedPtr<ConnectivityFailureWatcher> watcher)261 void FakeXdsTransportFactory::FakeXdsTransport::StartConnectivityFailureWatch(
262 RefCountedPtr<ConnectivityFailureWatcher> watcher) {
263 MutexLock lock(&mu_);
264 watchers_.insert(std::move(watcher));
265 }
266
StopConnectivityFailureWatch(const RefCountedPtr<ConnectivityFailureWatcher> & watcher)267 void FakeXdsTransportFactory::FakeXdsTransport::StopConnectivityFailureWatch(
268 const RefCountedPtr<ConnectivityFailureWatcher>& watcher) {
269 MutexLock lock(&mu_);
270 watchers_.erase(watcher);
271 }
272
273 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
CreateStreamingCall(const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)274 FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
275 const char* method,
276 std::unique_ptr<StreamingCall::EventHandler> event_handler) {
277 auto call = MakeOrphanable<FakeStreamingCall>(
278 WeakRefAsSubclass<FakeXdsTransport>(), method, std::move(event_handler));
279 MutexLock lock(&mu_);
280 active_calls_[method] = call->Ref().TakeAsSubclass<FakeStreamingCall>();
281 return call;
282 }
283
284 //
285 // FakeXdsTransportFactory
286 //
287
288 constexpr char FakeXdsTransportFactory::kAdsMethod[];
289 constexpr char FakeXdsTransportFactory::kLrsMethod[];
290
291 RefCountedPtr<XdsTransportFactory::XdsTransport>
GetTransport(const XdsBootstrap::XdsServer & server,absl::Status *)292 FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server,
293 absl::Status* /*status*/) {
294 std::string key = server.Key();
295 MutexLock lock(&mu_);
296 auto transport = GetTransportLocked(key);
297 if (transport == nullptr) {
298 transport = MakeRefCounted<FakeXdsTransport>(
299 WeakRefAsSubclass<FakeXdsTransportFactory>(), server,
300 auto_complete_messages_from_client_, abort_on_undrained_messages_);
301 transport_map_.emplace(std::move(key), transport.get());
302 }
303 return transport;
304 }
305
TriggerConnectionFailure(const XdsBootstrap::XdsServer & server,absl::Status status)306 void FakeXdsTransportFactory::TriggerConnectionFailure(
307 const XdsBootstrap::XdsServer& server, absl::Status status) {
308 auto transport = GetTransport(server);
309 if (transport == nullptr) return;
310 transport->TriggerConnectionFailure(std::move(status));
311 }
312
SetAutoCompleteMessagesFromClient(bool value)313 void FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient(bool value) {
314 MutexLock lock(&mu_);
315 auto_complete_messages_from_client_ = value;
316 }
317
SetAbortOnUndrainedMessages(bool value)318 void FakeXdsTransportFactory::SetAbortOnUndrainedMessages(bool value) {
319 MutexLock lock(&mu_);
320 abort_on_undrained_messages_ = value;
321 }
322
323 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
WaitForStream(const XdsBootstrap::XdsServer & server,const char * method)324 FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
325 const char* method) {
326 auto transport = GetTransport(server);
327 if (transport == nullptr) return nullptr;
328 return transport->WaitForStream(method);
329 }
330
Orphaned()331 void FakeXdsTransportFactory::Orphaned() { event_engine_.reset(); }
332
333 RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
GetTransport(const XdsBootstrap::XdsServer & server)334 FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) {
335 std::string key = server.Key();
336 MutexLock lock(&mu_);
337 return GetTransportLocked(key);
338 }
339
340 RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
GetTransportLocked(const std::string & key)341 FakeXdsTransportFactory::GetTransportLocked(const std::string& key) {
342 auto it = transport_map_.find(key);
343 if (it == transport_map_.end()) return nullptr;
344 return it->second->RefIfNonZero().TakeAsSubclass<FakeXdsTransport>();
345 }
346
347 } // namespace grpc_core
348