• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "test/core/xds/xds_transport_fake.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/support/port_platform.h>
21 
22 #include <functional>
23 #include <memory>
24 #include <string>
25 #include <thread>
26 #include <type_traits>
27 #include <utility>
28 
29 #include "absl/log/check.h"
30 #include "absl/log/log.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/util/orphanable.h"
33 #include "src/core/util/ref_counted_ptr.h"
34 #include "src/core/xds/xds_client/xds_bootstrap.h"
35 #include "test/core/test_util/test_config.h"
36 
37 namespace grpc_core {
38 
39 //
40 // FakeXdsTransportFactory::FakeStreamingCall
41 //
42 
~FakeStreamingCall()43 FakeXdsTransportFactory::FakeStreamingCall::~FakeStreamingCall() {
44   // Tests should not fail to read any messages from the client.
45   {
46     MutexLock lock(&mu_);
47     if (transport_->abort_on_undrained_messages()) {
48       for (const auto& message : from_client_messages_) {
49         LOG(ERROR) << "[" << transport_->server()->server_uri() << "] " << this
50                    << " From client message left in queue: " << message;
51       }
52       CHECK(from_client_messages_.empty());
53     }
54   }
55   // Can't call event_handler_->OnStatusReceived() or unref event_handler_
56   // synchronously, since those operations will trigger code in
57   // XdsClient that acquires its mutex, but it was already holding its
58   // mutex when it called us, so it would deadlock.
59   event_engine_->Run([event_handler = std::move(event_handler_),
60                       status_sent = status_sent_]() mutable {
61     ExecCtx exec_ctx;
62     if (!status_sent) event_handler->OnStatusReceived(absl::OkStatus());
63     event_handler.reset();
64   });
65 }
66 
Orphan()67 void FakeXdsTransportFactory::FakeStreamingCall::Orphan() {
68   {
69     MutexLock lock(&mu_);
70     orphaned_ = true;
71   }
72   transport_->RemoveStream(method_, this);
73   Unref();
74 }
75 
SendMessage(std::string payload)76 void FakeXdsTransportFactory::FakeStreamingCall::SendMessage(
77     std::string payload) {
78   MutexLock lock(&mu_);
79   CHECK(!orphaned_);
80   from_client_messages_.push_back(std::move(payload));
81   if (transport_->auto_complete_messages_from_client()) {
82     CompleteSendMessageFromClientLocked(/*ok=*/true);
83   }
84 }
85 
HaveMessageFromClient()86 bool FakeXdsTransportFactory::FakeStreamingCall::HaveMessageFromClient() {
87   MutexLock lock(&mu_);
88   return !from_client_messages_.empty();
89 }
90 
91 absl::optional<std::string>
WaitForMessageFromClient()92 FakeXdsTransportFactory::FakeStreamingCall::WaitForMessageFromClient() {
93   while (true) {
94     {
95       MutexLock lock(&mu_);
96       if (!from_client_messages_.empty()) {
97         std::string payload = std::move(from_client_messages_.front());
98         from_client_messages_.pop_front();
99         return payload;
100       }
101       if (event_engine_->IsIdle()) return absl::nullopt;
102     }
103     event_engine_->Tick();
104   }
105 }
106 
107 void FakeXdsTransportFactory::FakeStreamingCall::
CompleteSendMessageFromClientLocked(bool ok)108     CompleteSendMessageFromClientLocked(bool ok) {
109   // Can't call event_handler_->OnRequestSent() synchronously, since that
110   // operation will trigger code in XdsClient that acquires its mutex, but it
111   // was already holding its mutex when it called us, so it would deadlock.
112   event_engine_->Run([event_handler = event_handler_->Ref(), ok]() mutable {
113     ExecCtx exec_ctx;
114     event_handler->OnRequestSent(ok);
115     event_handler.reset();
116   });
117 }
118 
CompleteSendMessageFromClient(bool ok)119 void FakeXdsTransportFactory::FakeStreamingCall::CompleteSendMessageFromClient(
120     bool ok) {
121   CHECK(!transport_->auto_complete_messages_from_client());
122   MutexLock lock(&mu_);
123   CompleteSendMessageFromClientLocked(ok);
124 }
125 
StartRecvMessage()126 void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() {
127   MutexLock lock(&mu_);
128   if (num_pending_reads_ > 0) {
129     transport_->factory()->too_many_pending_reads_callback_();
130   }
131   ++reads_started_;
132   ++num_pending_reads_;
133   if (!to_client_messages_.empty()) {
134     // Dispatch pending message (if there's one) on a separate thread to avoid
135     // recursion
136     event_engine_->Run([call = RefAsSubclass<FakeStreamingCall>()]() {
137       call->MaybeDeliverMessageToClient();
138     });
139   }
140 }
141 
SendMessageToClient(absl::string_view payload)142 void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient(
143     absl::string_view payload) {
144   {
145     MutexLock lock(&mu_);
146     to_client_messages_.emplace_back(payload);
147   }
148   MaybeDeliverMessageToClient();
149 }
150 
MaybeDeliverMessageToClient()151 void FakeXdsTransportFactory::FakeStreamingCall::MaybeDeliverMessageToClient() {
152   RefCountedPtr<RefCountedEventHandler> event_handler;
153   std::string message;
154   // Loop terminates with a break inside
155   while (true) {
156     {
157       MutexLock lock(&mu_);
158       if (num_pending_reads_ == 0 || to_client_messages_.empty()) {
159         break;
160       }
161       --num_pending_reads_;
162       message = std::move(to_client_messages_.front());
163       to_client_messages_.pop_front();
164       event_handler = event_handler_;
165     }
166     ExecCtx exec_ctx;
167     event_handler->OnRecvMessage(message);
168   }
169 }
170 
MaybeSendStatusToClient(absl::Status status)171 void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
172     absl::Status status) {
173   ExecCtx exec_ctx;
174   RefCountedPtr<RefCountedEventHandler> event_handler;
175   {
176     MutexLock lock(&mu_);
177     if (status_sent_) return;
178     status_sent_ = true;
179     event_handler = event_handler_->Ref();
180   }
181   event_handler->OnStatusReceived(std::move(status));
182 }
183 
WaitForReadsStarted(size_t expected)184 bool FakeXdsTransportFactory::FakeStreamingCall::WaitForReadsStarted(
185     size_t expected) {
186   while (true) {
187     {
188       MutexLock lock(&mu_);
189       if (reads_started_ == expected) return true;
190       if (event_engine_->IsIdle()) return false;
191     }
192     event_engine_->Tick();
193   }
194 }
195 
IsOrphaned()196 bool FakeXdsTransportFactory::FakeStreamingCall::IsOrphaned() {
197   MutexLock lock(&mu_);
198   return orphaned_;
199 }
200 
201 //
202 // FakeXdsTransportFactory::FakeXdsTransport
203 //
204 
TriggerConnectionFailure(absl::Status status)205 void FakeXdsTransportFactory::FakeXdsTransport::TriggerConnectionFailure(
206     absl::Status status) {
207   std::set<RefCountedPtr<ConnectivityFailureWatcher>> watchers;
208   {
209     MutexLock lock(&mu_);
210     watchers = watchers_;
211   }
212   ExecCtx exec_ctx;
213   for (const auto& watcher : watchers) {
214     watcher->OnConnectivityFailure(status);
215   }
216 }
217 
Orphaned()218 void FakeXdsTransportFactory::FakeXdsTransport::Orphaned() {
219   {
220     MutexLock lock(&factory_->mu_);
221     auto it = factory_->transport_map_.find(server_.Key());
222     if (it != factory_->transport_map_.end() && it->second == this) {
223       factory_->transport_map_.erase(it);
224     }
225   }
226   factory_.reset();
227   {
228     MutexLock lock(&mu_);
229     // Can't destroy watchers synchronously, since that operation will trigger
230     // code in XdsClient that acquires its mutex, but it was already holding
231     // its mutex when it called us, so it would deadlock.
232     event_engine_->Run([watchers = std::move(watchers_)]() mutable {
233       ExecCtx exec_ctx;
234       watchers.clear();
235     });
236   }
237 }
238 
239 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
WaitForStream(const char * method)240 FakeXdsTransportFactory::FakeXdsTransport::WaitForStream(const char* method) {
241   while (true) {
242     {
243       MutexLock lock(&mu_);
244       auto it = active_calls_.find(method);
245       if (it != active_calls_.end() && it->second != nullptr) return it->second;
246       if (event_engine_->IsIdle()) return nullptr;
247     }
248     event_engine_->Tick();
249   }
250 }
251 
RemoveStream(const char * method,FakeStreamingCall * call)252 void FakeXdsTransportFactory::FakeXdsTransport::RemoveStream(
253     const char* method, FakeStreamingCall* call) {
254   MutexLock lock(&mu_);
255   auto it = active_calls_.find(method);
256   if (it != active_calls_.end() && it->second.get() == call) {
257     active_calls_.erase(it);
258   }
259 }
260 
StartConnectivityFailureWatch(RefCountedPtr<ConnectivityFailureWatcher> watcher)261 void FakeXdsTransportFactory::FakeXdsTransport::StartConnectivityFailureWatch(
262     RefCountedPtr<ConnectivityFailureWatcher> watcher) {
263   MutexLock lock(&mu_);
264   watchers_.insert(std::move(watcher));
265 }
266 
StopConnectivityFailureWatch(const RefCountedPtr<ConnectivityFailureWatcher> & watcher)267 void FakeXdsTransportFactory::FakeXdsTransport::StopConnectivityFailureWatch(
268     const RefCountedPtr<ConnectivityFailureWatcher>& watcher) {
269   MutexLock lock(&mu_);
270   watchers_.erase(watcher);
271 }
272 
273 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
CreateStreamingCall(const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)274 FakeXdsTransportFactory::FakeXdsTransport::CreateStreamingCall(
275     const char* method,
276     std::unique_ptr<StreamingCall::EventHandler> event_handler) {
277   auto call = MakeOrphanable<FakeStreamingCall>(
278       WeakRefAsSubclass<FakeXdsTransport>(), method, std::move(event_handler));
279   MutexLock lock(&mu_);
280   active_calls_[method] = call->Ref().TakeAsSubclass<FakeStreamingCall>();
281   return call;
282 }
283 
284 //
285 // FakeXdsTransportFactory
286 //
287 
// Namespace-scope definitions of the static constexpr members kAdsMethod and
// kLrsMethod. No initializer appears here, so the values are presumably
// given at the in-class declarations (not visible in this file). Before
// C++17, such out-of-class definitions are required when the members are
// odr-used.
288 constexpr char FakeXdsTransportFactory::kAdsMethod[];
289 constexpr char FakeXdsTransportFactory::kLrsMethod[];
290 
291 RefCountedPtr<XdsTransportFactory::XdsTransport>
GetTransport(const XdsBootstrap::XdsServer & server,absl::Status *)292 FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server,
293                                       absl::Status* /*status*/) {
294   std::string key = server.Key();
295   MutexLock lock(&mu_);
296   auto transport = GetTransportLocked(key);
297   if (transport == nullptr) {
298     transport = MakeRefCounted<FakeXdsTransport>(
299         WeakRefAsSubclass<FakeXdsTransportFactory>(), server,
300         auto_complete_messages_from_client_, abort_on_undrained_messages_);
301     transport_map_.emplace(std::move(key), transport.get());
302   }
303   return transport;
304 }
305 
TriggerConnectionFailure(const XdsBootstrap::XdsServer & server,absl::Status status)306 void FakeXdsTransportFactory::TriggerConnectionFailure(
307     const XdsBootstrap::XdsServer& server, absl::Status status) {
308   auto transport = GetTransport(server);
309   if (transport == nullptr) return;
310   transport->TriggerConnectionFailure(std::move(status));
311 }
312 
SetAutoCompleteMessagesFromClient(bool value)313 void FakeXdsTransportFactory::SetAutoCompleteMessagesFromClient(bool value) {
314   MutexLock lock(&mu_);
315   auto_complete_messages_from_client_ = value;
316 }
317 
SetAbortOnUndrainedMessages(bool value)318 void FakeXdsTransportFactory::SetAbortOnUndrainedMessages(bool value) {
319   MutexLock lock(&mu_);
320   abort_on_undrained_messages_ = value;
321 }
322 
323 RefCountedPtr<FakeXdsTransportFactory::FakeStreamingCall>
WaitForStream(const XdsBootstrap::XdsServer & server,const char * method)324 FakeXdsTransportFactory::WaitForStream(const XdsBootstrap::XdsServer& server,
325                                        const char* method) {
326   auto transport = GetTransport(server);
327   if (transport == nullptr) return nullptr;
328   return transport->WaitForStream(method);
329 }
330 
Orphaned()331 void FakeXdsTransportFactory::Orphaned() { event_engine_.reset(); }
332 
333 RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
GetTransport(const XdsBootstrap::XdsServer & server)334 FakeXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server) {
335   std::string key = server.Key();
336   MutexLock lock(&mu_);
337   return GetTransportLocked(key);
338 }
339 
340 RefCountedPtr<FakeXdsTransportFactory::FakeXdsTransport>
GetTransportLocked(const std::string & key)341 FakeXdsTransportFactory::GetTransportLocked(const std::string& key) {
342   auto it = transport_map_.find(key);
343   if (it == transport_map_.end()) return nullptr;
344   return it->second->RefIfNonZero().TakeAsSubclass<FakeXdsTransport>();
345 }
346 
347 }  // namespace grpc_core
348