• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2022 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/xds/grpc/xds_transport_grpc.h"
18 
19 #include <grpc/byte_buffer.h>
20 #include <grpc/byte_buffer_reader.h>
21 #include <grpc/event_engine/event_engine.h>
22 #include <grpc/grpc.h>
23 #include <grpc/impl/channel_arg_names.h>
24 #include <grpc/impl/connectivity_state.h>
25 #include <grpc/impl/propagation_bits.h>
26 #include <grpc/slice.h>
27 #include <grpc/support/port_platform.h>
28 #include <string.h>
29 
30 #include <functional>
31 #include <memory>
32 #include <string_view>
33 #include <utility>
34 
35 #include "absl/log/check.h"
36 #include "absl/strings/str_cat.h"
37 #include "src/core/client_channel/client_channel_filter.h"
38 #include "src/core/config/core_configuration.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/channel/channel_fwd.h"
41 #include "src/core/lib/channel/channel_stack.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/event_engine/default_event_engine.h"
44 #include "src/core/lib/iomgr/closure.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "src/core/lib/iomgr/pollset_set.h"
47 #include "src/core/lib/security/credentials/channel_creds_registry.h"
48 #include "src/core/lib/security/credentials/credentials.h"
49 #include "src/core/lib/slice/slice.h"
50 #include "src/core/lib/slice/slice_internal.h"
51 #include "src/core/lib/surface/call.h"
52 #include "src/core/lib/surface/channel.h"
53 #include "src/core/lib/surface/init_internally.h"
54 #include "src/core/lib/surface/lame_client.h"
55 #include "src/core/lib/transport/connectivity_state.h"
56 #include "src/core/util/debug_location.h"
57 #include "src/core/util/orphanable.h"
58 #include "src/core/util/ref_counted_ptr.h"
59 #include "src/core/util/time.h"
60 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
61 #include "src/core/xds/xds_client/xds_bootstrap.h"
62 
63 namespace grpc_core {
64 
65 //
66 // GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall
67 //
68 
GrpcStreamingCall(WeakRefCountedPtr<GrpcXdsTransportFactory> factory,Channel * channel,const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)69 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
70     WeakRefCountedPtr<GrpcXdsTransportFactory> factory, Channel* channel,
71     const char* method,
72     std::unique_ptr<StreamingCall::EventHandler> event_handler)
73     : factory_(std::move(factory)), event_handler_(std::move(event_handler)) {
74   // Create call.
75   call_ = channel->CreateCall(
76       /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS, /*cq=*/nullptr,
77       factory_->interested_parties(), Slice::FromStaticString(method),
78       /*authority=*/absl::nullopt, Timestamp::InfFuture(),
79       /*registered_method=*/true);
80   CHECK_NE(call_, nullptr);
81   // Init data associated with the call.
82   grpc_metadata_array_init(&initial_metadata_recv_);
83   grpc_metadata_array_init(&trailing_metadata_recv_);
84   // Initialize closure to be used for sending messages.
85   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr);
86   // Start ops on the call.
87   grpc_call_error call_error;
88   grpc_op ops[2];
89   memset(ops, 0, sizeof(ops));
90   // Send initial metadata.
91   grpc_op* op = ops;
92   op->op = GRPC_OP_SEND_INITIAL_METADATA;
93   op->data.send_initial_metadata.count = 0;
94   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
95               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
96   op->reserved = nullptr;
97   ++op;
98   op->op = GRPC_OP_RECV_INITIAL_METADATA;
99   op->data.recv_initial_metadata.recv_initial_metadata =
100       &initial_metadata_recv_;
101   op->flags = 0;
102   op->reserved = nullptr;
103   ++op;
104   // Ref will be released in the callback
105   GRPC_CLOSURE_INIT(
106       &on_recv_initial_metadata_, OnRecvInitialMetadata,
107       this->Ref(DEBUG_LOCATION, "OnRecvInitialMetadata").release(), nullptr);
108   call_error = grpc_call_start_batch_and_execute(
109       call_, ops, static_cast<size_t>(op - ops), &on_recv_initial_metadata_);
110   CHECK_EQ(call_error, GRPC_CALL_OK);
111   // Start a batch for recv_trailing_metadata.
112   memset(ops, 0, sizeof(ops));
113   op = ops;
114   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
115   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
116   op->data.recv_status_on_client.status = &status_code_;
117   op->data.recv_status_on_client.status_details = &status_details_;
118   op->flags = 0;
119   op->reserved = nullptr;
120   ++op;
121   // This callback signals the end of the call, so it relies on the initial
122   // ref instead of a new ref. When it's invoked, it's the initial ref that is
123   // unreffed.
124   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr);
125   call_error = grpc_call_start_batch_and_execute(
126       call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
127   CHECK_EQ(call_error, GRPC_CALL_OK);
128   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr);
129 }
130 
131 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
~GrpcStreamingCall()132     ~GrpcStreamingCall() {
133   grpc_metadata_array_destroy(&trailing_metadata_recv_);
134   grpc_byte_buffer_destroy(send_message_payload_);
135   grpc_byte_buffer_destroy(recv_message_payload_);
136   CSliceUnref(status_details_);
137   CHECK_NE(call_, nullptr);
138   grpc_call_unref(call_);
139 }
140 
Orphan()141 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::Orphan() {
142   CHECK_NE(call_, nullptr);
143   // If we are here because xds_client wants to cancel the call,
144   // OnStatusReceived() will complete the cancellation and clean up.
145   // Otherwise, we are here because xds_client has to orphan a failed call,
146   // in which case the following cancellation will be a no-op.
147   grpc_call_cancel_internal(call_);
148   // Note that the initial ref is held by OnStatusReceived(), so the
149   // corresponding unref happens there instead of here.
150 }
151 
SendMessage(std::string payload)152 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage(
153     std::string payload) {
154   // Create payload.
155   grpc_slice slice = grpc_slice_from_cpp_string(std::move(payload));
156   send_message_payload_ = grpc_raw_byte_buffer_create(&slice, 1);
157   CSliceUnref(slice);
158   // Send the message.
159   grpc_op op;
160   memset(&op, 0, sizeof(op));
161   op.op = GRPC_OP_SEND_MESSAGE;
162   op.data.send_message.send_message = send_message_payload_;
163   Ref(DEBUG_LOCATION, "OnRequestSent").release();
164   grpc_call_error call_error =
165       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
166   CHECK_EQ(call_error, GRPC_CALL_OK);
167 }
168 
169 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
StartRecvMessage()170     StartRecvMessage() {
171   Ref(DEBUG_LOCATION, "StartRecvMessage").release();
172   grpc_op op;
173   memset(&op, 0, sizeof(op));
174   op.op = GRPC_OP_RECV_MESSAGE;
175   op.data.recv_message.recv_message = &recv_message_payload_;
176   CHECK_NE(call_, nullptr);
177   const grpc_call_error call_error =
178       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
179   CHECK_EQ(call_error, GRPC_CALL_OK);
180 }
181 
182 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRecvInitialMetadata(void * arg,grpc_error_handle)183     OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/) {
184   RefCountedPtr<GrpcStreamingCall> self(static_cast<GrpcStreamingCall*>(arg));
185   grpc_metadata_array_destroy(&self->initial_metadata_recv_);
186 }
187 
188 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRequestSent(void * arg,grpc_error_handle error)189     OnRequestSent(void* arg, grpc_error_handle error) {
190   RefCountedPtr<GrpcStreamingCall> self(static_cast<GrpcStreamingCall*>(arg));
191   // Clean up the sent message.
192   grpc_byte_buffer_destroy(self->send_message_payload_);
193   self->send_message_payload_ = nullptr;
194   // Invoke request handler.
195   self->event_handler_->OnRequestSent(error.ok());
196 }
197 
198 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnResponseReceived(void * arg,grpc_error_handle)199     OnResponseReceived(void* arg, grpc_error_handle /*error*/) {
200   RefCountedPtr<GrpcStreamingCall> self(static_cast<GrpcStreamingCall*>(arg));
201   // If there was no payload, then we received status before we received
202   // another message, so we stop reading.
203   if (self->recv_message_payload_ != nullptr) {
204     // Process the response.
205     grpc_byte_buffer_reader bbr;
206     grpc_byte_buffer_reader_init(&bbr, self->recv_message_payload_);
207     grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
208     grpc_byte_buffer_reader_destroy(&bbr);
209     grpc_byte_buffer_destroy(self->recv_message_payload_);
210     self->recv_message_payload_ = nullptr;
211     self->event_handler_->OnRecvMessage(StringViewFromSlice(response_slice));
212     CSliceUnref(response_slice);
213   }
214 }
215 
216 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnStatusReceived(void * arg,grpc_error_handle)217     OnStatusReceived(void* arg, grpc_error_handle /*error*/) {
218   RefCountedPtr<GrpcStreamingCall> self(static_cast<GrpcStreamingCall*>(arg));
219   self->event_handler_->OnStatusReceived(
220       absl::Status(static_cast<absl::StatusCode>(self->status_code_),
221                    StringViewFromSlice(self->status_details_)));
222 }
223 
224 //
225 // GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
226 //
227 
228 class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher final
229     : public AsyncConnectivityStateWatcherInterface {
230  public:
StateWatcher(RefCountedPtr<ConnectivityFailureWatcher> watcher)231   explicit StateWatcher(RefCountedPtr<ConnectivityFailureWatcher> watcher)
232       : watcher_(std::move(watcher)) {}
233 
234  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)235   void OnConnectivityStateChange(grpc_connectivity_state new_state,
236                                  const absl::Status& status) override {
237     if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
238       watcher_->OnConnectivityFailure(absl::Status(
239           status.code(),
240           absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message())));
241     }
242   }
243 
244   RefCountedPtr<ConnectivityFailureWatcher> watcher_;
245 };
246 
247 //
248 // GrpcXdsTransportFactory::GrpcXdsTransport
249 //
250 
251 namespace {
252 
CreateXdsChannel(const ChannelArgs & args,const GrpcXdsServer & server)253 RefCountedPtr<Channel> CreateXdsChannel(const ChannelArgs& args,
254                                         const GrpcXdsServer& server) {
255   RefCountedPtr<grpc_channel_credentials> channel_creds =
256       CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds(
257           server.channel_creds_config());
258   return RefCountedPtr<Channel>(Channel::FromC(grpc_channel_create(
259       server.server_uri().c_str(), channel_creds.get(), args.ToC().get())));
260 }
261 
262 }  // namespace
263 
GrpcXdsTransport(WeakRefCountedPtr<GrpcXdsTransportFactory> factory,const XdsBootstrap::XdsServer & server,absl::Status * status)264 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
265     WeakRefCountedPtr<GrpcXdsTransportFactory> factory,
266     const XdsBootstrap::XdsServer& server, absl::Status* status)
267     : XdsTransport(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
268                        ? "GrpcXdsTransport"
269                        : nullptr),
270       factory_(std::move(factory)),
271       key_(server.Key()) {
272   GRPC_TRACE_LOG(xds_client, INFO)
273       << "[GrpcXdsTransport " << this << "] created";
274   channel_ = CreateXdsChannel(factory_->args_,
275                               static_cast<const GrpcXdsServer&>(server));
276   CHECK(channel_ != nullptr);
277   if (channel_->IsLame()) {
278     *status = absl::UnavailableError("xds client has a lame channel");
279   }
280 }
281 
~GrpcXdsTransport()282 GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() {
283   GRPC_TRACE_LOG(xds_client, INFO)
284       << "[GrpcXdsTransport " << this << "] destroying";
285 }
286 
Orphaned()287 void GrpcXdsTransportFactory::GrpcXdsTransport::Orphaned() {
288   GRPC_TRACE_LOG(xds_client, INFO)
289       << "[GrpcXdsTransport " << this << "] orphaned";
290   {
291     MutexLock lock(&factory_->mu_);
292     auto it = factory_->transports_.find(key_);
293     if (it != factory_->transports_.end() && it->second == this) {
294       factory_->transports_.erase(it);
295     }
296   }
297   // Do an async hop before unreffing.  This avoids a deadlock upon
298   // shutdown in the case where the xDS channel is itself an xDS channel
299   // (e.g., when using one control plane to find another control plane).
300   grpc_event_engine::experimental::GetDefaultEventEngine()->Run(
301       [self = WeakRefAsSubclass<GrpcXdsTransport>()]() mutable {
302         ApplicationCallbackExecCtx application_exec_ctx;
303         ExecCtx exec_ctx;
304         self.reset();
305       });
306 }
307 
StartConnectivityFailureWatch(RefCountedPtr<ConnectivityFailureWatcher> watcher)308 void GrpcXdsTransportFactory::GrpcXdsTransport::StartConnectivityFailureWatch(
309     RefCountedPtr<ConnectivityFailureWatcher> watcher) {
310   if (channel_->IsLame()) return;
311   auto* state_watcher = new StateWatcher(watcher);
312   {
313     MutexLock lock(&mu_);
314     watchers_.emplace(watcher, state_watcher);
315   }
316   channel_->AddConnectivityWatcher(
317       GRPC_CHANNEL_IDLE,
318       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(state_watcher));
319 }
320 
StopConnectivityFailureWatch(const RefCountedPtr<ConnectivityFailureWatcher> & watcher)321 void GrpcXdsTransportFactory::GrpcXdsTransport::StopConnectivityFailureWatch(
322     const RefCountedPtr<ConnectivityFailureWatcher>& watcher) {
323   if (channel_->IsLame()) return;
324   StateWatcher* state_watcher = nullptr;
325   {
326     MutexLock lock(&mu_);
327     auto it = watchers_.find(watcher);
328     if (it == watchers_.end()) return;
329     state_watcher = it->second;
330     watchers_.erase(it);
331   }
332   channel_->RemoveConnectivityWatcher(state_watcher);
333 }
334 
335 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
CreateStreamingCall(const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)336 GrpcXdsTransportFactory::GrpcXdsTransport::CreateStreamingCall(
337     const char* method,
338     std::unique_ptr<StreamingCall::EventHandler> event_handler) {
339   return MakeOrphanable<GrpcStreamingCall>(
340       factory_.WeakRef(DEBUG_LOCATION, "StreamingCall"), channel_.get(), method,
341       std::move(event_handler));
342 }
343 
ResetBackoff()344 void GrpcXdsTransportFactory::GrpcXdsTransport::ResetBackoff() {
345   channel_->ResetConnectionBackoff();
346 }
347 
348 //
349 // GrpcXdsTransportFactory
350 //
351 
352 namespace {
353 
// Returns a copy of `args` with the keepalive time set to five minutes.
ChannelArgs ModifyChannelArgs(const ChannelArgs& args) {
  const auto keepalive_time_ms = Duration::Minutes(5).millis();
  return args.Set(GRPC_ARG_KEEPALIVE_TIME_MS, keepalive_time_ms);
}
357 
358 }  // namespace
359 
GrpcXdsTransportFactory(const ChannelArgs & args)360 GrpcXdsTransportFactory::GrpcXdsTransportFactory(const ChannelArgs& args)
361     : args_(ModifyChannelArgs(args)),
362       interested_parties_(grpc_pollset_set_create()) {
363   // Calling grpc_init to ensure gRPC does not shut down until the XdsClient is
364   // destroyed.
365   InitInternally();
366 }
367 
~GrpcXdsTransportFactory()368 GrpcXdsTransportFactory::~GrpcXdsTransportFactory() {
369   grpc_pollset_set_destroy(interested_parties_);
370   // Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient
371   // is destroyed.
372   ShutdownInternally();
373 }
374 
375 RefCountedPtr<XdsTransportFactory::XdsTransport>
GetTransport(const XdsBootstrap::XdsServer & server,absl::Status * status)376 GrpcXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server,
377                                       absl::Status* status) {
378   std::string key = server.Key();
379   RefCountedPtr<GrpcXdsTransport> transport;
380   MutexLock lock(&mu_);
381   auto it = transports_.find(key);
382   if (it != transports_.end()) {
383     transport = it->second->RefIfNonZero().TakeAsSubclass<GrpcXdsTransport>();
384   }
385   if (transport == nullptr) {
386     transport = MakeRefCounted<GrpcXdsTransport>(
387         WeakRefAsSubclass<GrpcXdsTransportFactory>(), server, status);
388     transports_.emplace(std::move(key), transport.get());
389   }
390   return transport;
391 }
392 
393 }  // namespace grpc_core
394