//
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

17 #include "src/core/xds/grpc/xds_transport_grpc.h"
18
19 #include <grpc/byte_buffer.h>
20 #include <grpc/byte_buffer_reader.h>
21 #include <grpc/event_engine/event_engine.h>
22 #include <grpc/grpc.h>
23 #include <grpc/impl/channel_arg_names.h>
24 #include <grpc/impl/connectivity_state.h>
25 #include <grpc/impl/propagation_bits.h>
26 #include <grpc/slice.h>
27 #include <grpc/support/port_platform.h>
28 #include <string.h>
29
30 #include <functional>
31 #include <memory>
32 #include <string_view>
33 #include <utility>
34
35 #include "absl/log/check.h"
36 #include "absl/strings/str_cat.h"
37 #include "src/core/client_channel/client_channel_filter.h"
38 #include "src/core/config/core_configuration.h"
39 #include "src/core/lib/channel/channel_args.h"
40 #include "src/core/lib/channel/channel_fwd.h"
41 #include "src/core/lib/channel/channel_stack.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/event_engine/default_event_engine.h"
44 #include "src/core/lib/iomgr/closure.h"
45 #include "src/core/lib/iomgr/exec_ctx.h"
46 #include "src/core/lib/iomgr/pollset_set.h"
47 #include "src/core/lib/security/credentials/channel_creds_registry.h"
48 #include "src/core/lib/security/credentials/credentials.h"
49 #include "src/core/lib/slice/slice.h"
50 #include "src/core/lib/slice/slice_internal.h"
51 #include "src/core/lib/surface/call.h"
52 #include "src/core/lib/surface/channel.h"
53 #include "src/core/lib/surface/init_internally.h"
54 #include "src/core/lib/surface/lame_client.h"
55 #include "src/core/lib/transport/connectivity_state.h"
56 #include "src/core/util/debug_location.h"
57 #include "src/core/util/orphanable.h"
58 #include "src/core/util/ref_counted_ptr.h"
59 #include "src/core/util/time.h"
60 #include "src/core/xds/grpc/xds_bootstrap_grpc.h"
61 #include "src/core/xds/xds_client/xds_bootstrap.h"
62
63 namespace grpc_core {
64
65 //
66 // GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall
67 //
68
GrpcStreamingCall(WeakRefCountedPtr<GrpcXdsTransportFactory> factory,Channel * channel,const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)69 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
70 WeakRefCountedPtr<GrpcXdsTransportFactory> factory, Channel* channel,
71 const char* method,
72 std::unique_ptr<StreamingCall::EventHandler> event_handler)
73 : factory_(std::move(factory)), event_handler_(std::move(event_handler)) {
74 // Create call.
75 call_ = channel->CreateCall(
76 /*parent_call=*/nullptr, GRPC_PROPAGATE_DEFAULTS, /*cq=*/nullptr,
77 factory_->interested_parties(), Slice::FromStaticString(method),
78 /*authority=*/absl::nullopt, Timestamp::InfFuture(),
79 /*registered_method=*/true);
80 CHECK_NE(call_, nullptr);
81 // Init data associated with the call.
82 grpc_metadata_array_init(&initial_metadata_recv_);
83 grpc_metadata_array_init(&trailing_metadata_recv_);
84 // Initialize closure to be used for sending messages.
85 GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr);
86 // Start ops on the call.
87 grpc_call_error call_error;
88 grpc_op ops[2];
89 memset(ops, 0, sizeof(ops));
90 // Send initial metadata.
91 grpc_op* op = ops;
92 op->op = GRPC_OP_SEND_INITIAL_METADATA;
93 op->data.send_initial_metadata.count = 0;
94 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
95 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
96 op->reserved = nullptr;
97 ++op;
98 op->op = GRPC_OP_RECV_INITIAL_METADATA;
99 op->data.recv_initial_metadata.recv_initial_metadata =
100 &initial_metadata_recv_;
101 op->flags = 0;
102 op->reserved = nullptr;
103 ++op;
104 // Ref will be released in the callback
105 GRPC_CLOSURE_INIT(
106 &on_recv_initial_metadata_, OnRecvInitialMetadata,
107 this->Ref(DEBUG_LOCATION, "OnRecvInitialMetadata").release(), nullptr);
108 call_error = grpc_call_start_batch_and_execute(
109 call_, ops, static_cast<size_t>(op - ops), &on_recv_initial_metadata_);
110 CHECK_EQ(call_error, GRPC_CALL_OK);
111 // Start a batch for recv_trailing_metadata.
112 memset(ops, 0, sizeof(ops));
113 op = ops;
114 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
115 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
116 op->data.recv_status_on_client.status = &status_code_;
117 op->data.recv_status_on_client.status_details = &status_details_;
118 op->flags = 0;
119 op->reserved = nullptr;
120 ++op;
121 // This callback signals the end of the call, so it relies on the initial
122 // ref instead of a new ref. When it's invoked, it's the initial ref that is
123 // unreffed.
124 GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr);
125 call_error = grpc_call_start_batch_and_execute(
126 call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
127 CHECK_EQ(call_error, GRPC_CALL_OK);
128 GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr);
129 }
130
131 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
~GrpcStreamingCall()132 ~GrpcStreamingCall() {
133 grpc_metadata_array_destroy(&trailing_metadata_recv_);
134 grpc_byte_buffer_destroy(send_message_payload_);
135 grpc_byte_buffer_destroy(recv_message_payload_);
136 CSliceUnref(status_details_);
137 CHECK_NE(call_, nullptr);
138 grpc_call_unref(call_);
139 }
140
Orphan()141 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::Orphan() {
142 CHECK_NE(call_, nullptr);
143 // If we are here because xds_client wants to cancel the call,
144 // OnStatusReceived() will complete the cancellation and clean up.
145 // Otherwise, we are here because xds_client has to orphan a failed call,
146 // in which case the following cancellation will be a no-op.
147 grpc_call_cancel_internal(call_);
148 // Note that the initial ref is held by OnStatusReceived(), so the
149 // corresponding unref happens there instead of here.
150 }
151
SendMessage(std::string payload)152 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage(
153 std::string payload) {
154 // Create payload.
155 grpc_slice slice = grpc_slice_from_cpp_string(std::move(payload));
156 send_message_payload_ = grpc_raw_byte_buffer_create(&slice, 1);
157 CSliceUnref(slice);
158 // Send the message.
159 grpc_op op;
160 memset(&op, 0, sizeof(op));
161 op.op = GRPC_OP_SEND_MESSAGE;
162 op.data.send_message.send_message = send_message_payload_;
163 Ref(DEBUG_LOCATION, "OnRequestSent").release();
164 grpc_call_error call_error =
165 grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
166 CHECK_EQ(call_error, GRPC_CALL_OK);
167 }
168
169 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
StartRecvMessage()170 StartRecvMessage() {
171 Ref(DEBUG_LOCATION, "StartRecvMessage").release();
172 grpc_op op;
173 memset(&op, 0, sizeof(op));
174 op.op = GRPC_OP_RECV_MESSAGE;
175 op.data.recv_message.recv_message = &recv_message_payload_;
176 CHECK_NE(call_, nullptr);
177 const grpc_call_error call_error =
178 grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
179 CHECK_EQ(call_error, GRPC_CALL_OK);
180 }
181
182 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRecvInitialMetadata(void * arg,grpc_error_handle)183 OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/) {
184 RefCountedPtr<GrpcStreamingCall> self(static_cast<GrpcStreamingCall*>(arg));
185 grpc_metadata_array_destroy(&self->initial_metadata_recv_);
186 }
187
188 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRequestSent(void * arg,grpc_error_handle error)189 OnRequestSent(void* arg, grpc_error_handle error) {
190 RefCountedPtr<GrpcStreamingCall> self(static_cast<GrpcStreamingCall*>(arg));
191 // Clean up the sent message.
192 grpc_byte_buffer_destroy(self->send_message_payload_);
193 self->send_message_payload_ = nullptr;
194 // Invoke request handler.
195 self->event_handler_->OnRequestSent(error.ok());
196 }
197
198 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnResponseReceived(void * arg,grpc_error_handle)199 OnResponseReceived(void* arg, grpc_error_handle /*error*/) {
200 RefCountedPtr<GrpcStreamingCall> self(static_cast<GrpcStreamingCall*>(arg));
201 // If there was no payload, then we received status before we received
202 // another message, so we stop reading.
203 if (self->recv_message_payload_ != nullptr) {
204 // Process the response.
205 grpc_byte_buffer_reader bbr;
206 grpc_byte_buffer_reader_init(&bbr, self->recv_message_payload_);
207 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
208 grpc_byte_buffer_reader_destroy(&bbr);
209 grpc_byte_buffer_destroy(self->recv_message_payload_);
210 self->recv_message_payload_ = nullptr;
211 self->event_handler_->OnRecvMessage(StringViewFromSlice(response_slice));
212 CSliceUnref(response_slice);
213 }
214 }
215
216 void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnStatusReceived(void * arg,grpc_error_handle)217 OnStatusReceived(void* arg, grpc_error_handle /*error*/) {
218 RefCountedPtr<GrpcStreamingCall> self(static_cast<GrpcStreamingCall*>(arg));
219 self->event_handler_->OnStatusReceived(
220 absl::Status(static_cast<absl::StatusCode>(self->status_code_),
221 StringViewFromSlice(self->status_details_)));
222 }
223
224 //
225 // GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher
226 //
227
228 class GrpcXdsTransportFactory::GrpcXdsTransport::StateWatcher final
229 : public AsyncConnectivityStateWatcherInterface {
230 public:
StateWatcher(RefCountedPtr<ConnectivityFailureWatcher> watcher)231 explicit StateWatcher(RefCountedPtr<ConnectivityFailureWatcher> watcher)
232 : watcher_(std::move(watcher)) {}
233
234 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)235 void OnConnectivityStateChange(grpc_connectivity_state new_state,
236 const absl::Status& status) override {
237 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
238 watcher_->OnConnectivityFailure(absl::Status(
239 status.code(),
240 absl::StrCat("channel in TRANSIENT_FAILURE: ", status.message())));
241 }
242 }
243
244 RefCountedPtr<ConnectivityFailureWatcher> watcher_;
245 };
246
247 //
248 // GrpcXdsTransportFactory::GrpcXdsTransport
249 //
250
251 namespace {
252
CreateXdsChannel(const ChannelArgs & args,const GrpcXdsServer & server)253 RefCountedPtr<Channel> CreateXdsChannel(const ChannelArgs& args,
254 const GrpcXdsServer& server) {
255 RefCountedPtr<grpc_channel_credentials> channel_creds =
256 CoreConfiguration::Get().channel_creds_registry().CreateChannelCreds(
257 server.channel_creds_config());
258 return RefCountedPtr<Channel>(Channel::FromC(grpc_channel_create(
259 server.server_uri().c_str(), channel_creds.get(), args.ToC().get())));
260 }
261
262 } // namespace
263
GrpcXdsTransport(WeakRefCountedPtr<GrpcXdsTransportFactory> factory,const XdsBootstrap::XdsServer & server,absl::Status * status)264 GrpcXdsTransportFactory::GrpcXdsTransport::GrpcXdsTransport(
265 WeakRefCountedPtr<GrpcXdsTransportFactory> factory,
266 const XdsBootstrap::XdsServer& server, absl::Status* status)
267 : XdsTransport(GRPC_TRACE_FLAG_ENABLED(xds_client_refcount)
268 ? "GrpcXdsTransport"
269 : nullptr),
270 factory_(std::move(factory)),
271 key_(server.Key()) {
272 GRPC_TRACE_LOG(xds_client, INFO)
273 << "[GrpcXdsTransport " << this << "] created";
274 channel_ = CreateXdsChannel(factory_->args_,
275 static_cast<const GrpcXdsServer&>(server));
276 CHECK(channel_ != nullptr);
277 if (channel_->IsLame()) {
278 *status = absl::UnavailableError("xds client has a lame channel");
279 }
280 }
281
~GrpcXdsTransport()282 GrpcXdsTransportFactory::GrpcXdsTransport::~GrpcXdsTransport() {
283 GRPC_TRACE_LOG(xds_client, INFO)
284 << "[GrpcXdsTransport " << this << "] destroying";
285 }
286
Orphaned()287 void GrpcXdsTransportFactory::GrpcXdsTransport::Orphaned() {
288 GRPC_TRACE_LOG(xds_client, INFO)
289 << "[GrpcXdsTransport " << this << "] orphaned";
290 {
291 MutexLock lock(&factory_->mu_);
292 auto it = factory_->transports_.find(key_);
293 if (it != factory_->transports_.end() && it->second == this) {
294 factory_->transports_.erase(it);
295 }
296 }
297 // Do an async hop before unreffing. This avoids a deadlock upon
298 // shutdown in the case where the xDS channel is itself an xDS channel
299 // (e.g., when using one control plane to find another control plane).
300 grpc_event_engine::experimental::GetDefaultEventEngine()->Run(
301 [self = WeakRefAsSubclass<GrpcXdsTransport>()]() mutable {
302 ApplicationCallbackExecCtx application_exec_ctx;
303 ExecCtx exec_ctx;
304 self.reset();
305 });
306 }
307
StartConnectivityFailureWatch(RefCountedPtr<ConnectivityFailureWatcher> watcher)308 void GrpcXdsTransportFactory::GrpcXdsTransport::StartConnectivityFailureWatch(
309 RefCountedPtr<ConnectivityFailureWatcher> watcher) {
310 if (channel_->IsLame()) return;
311 auto* state_watcher = new StateWatcher(watcher);
312 {
313 MutexLock lock(&mu_);
314 watchers_.emplace(watcher, state_watcher);
315 }
316 channel_->AddConnectivityWatcher(
317 GRPC_CHANNEL_IDLE,
318 OrphanablePtr<AsyncConnectivityStateWatcherInterface>(state_watcher));
319 }
320
StopConnectivityFailureWatch(const RefCountedPtr<ConnectivityFailureWatcher> & watcher)321 void GrpcXdsTransportFactory::GrpcXdsTransport::StopConnectivityFailureWatch(
322 const RefCountedPtr<ConnectivityFailureWatcher>& watcher) {
323 if (channel_->IsLame()) return;
324 StateWatcher* state_watcher = nullptr;
325 {
326 MutexLock lock(&mu_);
327 auto it = watchers_.find(watcher);
328 if (it == watchers_.end()) return;
329 state_watcher = it->second;
330 watchers_.erase(it);
331 }
332 channel_->RemoveConnectivityWatcher(state_watcher);
333 }
334
335 OrphanablePtr<XdsTransportFactory::XdsTransport::StreamingCall>
CreateStreamingCall(const char * method,std::unique_ptr<StreamingCall::EventHandler> event_handler)336 GrpcXdsTransportFactory::GrpcXdsTransport::CreateStreamingCall(
337 const char* method,
338 std::unique_ptr<StreamingCall::EventHandler> event_handler) {
339 return MakeOrphanable<GrpcStreamingCall>(
340 factory_.WeakRef(DEBUG_LOCATION, "StreamingCall"), channel_.get(), method,
341 std::move(event_handler));
342 }
343
ResetBackoff()344 void GrpcXdsTransportFactory::GrpcXdsTransport::ResetBackoff() {
345 channel_->ResetConnectionBackoff();
346 }
347
348 //
349 // GrpcXdsTransportFactory
350 //
351
352 namespace {
353
ModifyChannelArgs(const ChannelArgs & args)354 ChannelArgs ModifyChannelArgs(const ChannelArgs& args) {
355 return args.Set(GRPC_ARG_KEEPALIVE_TIME_MS, Duration::Minutes(5).millis());
356 }
357
358 } // namespace
359
GrpcXdsTransportFactory(const ChannelArgs & args)360 GrpcXdsTransportFactory::GrpcXdsTransportFactory(const ChannelArgs& args)
361 : args_(ModifyChannelArgs(args)),
362 interested_parties_(grpc_pollset_set_create()) {
363 // Calling grpc_init to ensure gRPC does not shut down until the XdsClient is
364 // destroyed.
365 InitInternally();
366 }
367
~GrpcXdsTransportFactory()368 GrpcXdsTransportFactory::~GrpcXdsTransportFactory() {
369 grpc_pollset_set_destroy(interested_parties_);
370 // Calling grpc_shutdown to ensure gRPC does not shut down until the XdsClient
371 // is destroyed.
372 ShutdownInternally();
373 }
374
375 RefCountedPtr<XdsTransportFactory::XdsTransport>
GetTransport(const XdsBootstrap::XdsServer & server,absl::Status * status)376 GrpcXdsTransportFactory::GetTransport(const XdsBootstrap::XdsServer& server,
377 absl::Status* status) {
378 std::string key = server.Key();
379 RefCountedPtr<GrpcXdsTransport> transport;
380 MutexLock lock(&mu_);
381 auto it = transports_.find(key);
382 if (it != transports_.end()) {
383 transport = it->second->RefIfNonZero().TakeAsSubclass<GrpcXdsTransport>();
384 }
385 if (transport == nullptr) {
386 transport = MakeRefCounted<GrpcXdsTransport>(
387 WeakRefAsSubclass<GrpcXdsTransportFactory>(), server, status);
388 transports_.emplace(std::move(key), transport.get());
389 }
390 return transport;
391 }
392
393 } // namespace grpc_core
394