1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/client_channel/subchannel.h"
18
19 #include <grpc/impl/channel_arg_names.h>
20 #include <grpc/slice.h>
21 #include <grpc/status.h>
22 #include <grpc/support/port_platform.h>
23 #include <inttypes.h>
24 #include <limits.h>
25
26 #include <algorithm>
27 #include <memory>
28 #include <new>
29 #include <utility>
30
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/cord.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "src/core/channelz/channel_trace.h"
39 #include "src/core/channelz/channelz.h"
40 #include "src/core/client_channel/client_channel_internal.h"
41 #include "src/core/client_channel/subchannel_pool_interface.h"
42 #include "src/core/config/core_configuration.h"
43 #include "src/core/handshaker/proxy_mapper_registry.h"
44 #include "src/core/lib/address_utils/sockaddr_utils.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/channel_stack.h"
47 #include "src/core/lib/channel/channel_stack_builder_impl.h"
48 #include "src/core/lib/debug/trace.h"
49 #include "src/core/lib/experiments/experiments.h"
50 #include "src/core/lib/iomgr/exec_ctx.h"
51 #include "src/core/lib/iomgr/pollset_set.h"
52 #include "src/core/lib/promise/cancel_callback.h"
53 #include "src/core/lib/promise/seq.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/surface/channel_init.h"
56 #include "src/core/lib/surface/channel_stack_type.h"
57 #include "src/core/lib/surface/init_internally.h"
58 #include "src/core/lib/transport/connectivity_state.h"
59 #include "src/core/lib/transport/error_utils.h"
60 #include "src/core/lib/transport/interception_chain.h"
61 #include "src/core/lib/transport/transport.h"
62 #include "src/core/telemetry/stats.h"
63 #include "src/core/telemetry/stats_data.h"
64 #include "src/core/util/alloc.h"
65 #include "src/core/util/backoff.h"
66 #include "src/core/util/debug_location.h"
67 #include "src/core/util/orphanable.h"
68 #include "src/core/util/ref_counted.h"
69 #include "src/core/util/ref_counted_ptr.h"
70 #include "src/core/util/status_helper.h"
71 #include "src/core/util/sync.h"
72 #include "src/core/util/useful.h"
73
// Backoff parameters.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2

// Conversion between subchannel call and call stack.
//
// A SubchannelCall and its grpc_call_stack share one allocation. The call
// object comes first, and the call stack starts right after it, at an offset
// of sizeof(SubchannelCall) rounded up to the platform alignment. These
// macros convert between the two addresses.
//
// Each expansion is fully parenthesized, so a postfix operator applied to the
// result (e.g. `SUBCHANNEL_CALL_TO_CALL_STACK(p)->refcount`) applies to the
// converted pointer and not to the inner `char*` arithmetic.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  ((grpc_call_stack*)((char*)(call) +       \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
// The parameter name must match the name used in the expansion. Previously
// the parameter was `callstack` while the body referenced `call_stack`, so
// the argument was ignored in favor of whatever `call_stack` was in scope.
#define CALL_STACK_TO_SUBCHANNEL_CALL(call_stack) \
  ((SubchannelCall*)(((char*)(call_stack)) -      \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
88
89 namespace grpc_core {
90
91 using ::grpc_event_engine::experimental::EventEngine;
92
93 //
94 // ConnectedSubchannel
95 //
96
ConnectedSubchannel(const ChannelArgs & args)97 ConnectedSubchannel::ConnectedSubchannel(const ChannelArgs& args)
98 : RefCounted<ConnectedSubchannel>(
99 GRPC_TRACE_FLAG_ENABLED(subchannel_refcount) ? "ConnectedSubchannel"
100 : nullptr),
101 args_(args) {}
102
103 //
104 // LegacyConnectedSubchannel
105 //
106
107 class LegacyConnectedSubchannel : public ConnectedSubchannel {
108 public:
LegacyConnectedSubchannel(RefCountedPtr<grpc_channel_stack> channel_stack,const ChannelArgs & args,RefCountedPtr<channelz::SubchannelNode> channelz_node)109 LegacyConnectedSubchannel(
110 RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
111 RefCountedPtr<channelz::SubchannelNode> channelz_node)
112 : ConnectedSubchannel(args),
113 channelz_node_(std::move(channelz_node)),
114 channel_stack_(std::move(channel_stack)) {}
115
~LegacyConnectedSubchannel()116 ~LegacyConnectedSubchannel() override {
117 channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel");
118 }
119
channelz_node() const120 channelz::SubchannelNode* channelz_node() const {
121 return channelz_node_.get();
122 }
123
StartWatch(grpc_pollset_set * interested_parties,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)124 void StartWatch(
125 grpc_pollset_set* interested_parties,
126 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
127 grpc_transport_op* op = grpc_make_transport_op(nullptr);
128 op->start_connectivity_watch = std::move(watcher);
129 op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
130 op->bind_pollset_set = interested_parties;
131 grpc_channel_element* elem =
132 grpc_channel_stack_element(channel_stack_.get(), 0);
133 elem->filter->start_transport_op(elem, op);
134 }
135
Ping(absl::AnyInvocable<void (absl::Status)>)136 void Ping(absl::AnyInvocable<void(absl::Status)>) override {
137 Crash("call v3 ping method called in legacy impl");
138 }
139
unstarted_call_destination() const140 RefCountedPtr<UnstartedCallDestination> unstarted_call_destination()
141 const override {
142 Crash("call v3 unstarted_call_destination method called in legacy impl");
143 }
144
channel_stack() const145 grpc_channel_stack* channel_stack() const override {
146 return channel_stack_.get();
147 }
148
GetInitialCallSizeEstimate() const149 size_t GetInitialCallSizeEstimate() const override {
150 return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
151 channel_stack_->call_stack_size;
152 }
153
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)154 void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override {
155 grpc_transport_op* op = grpc_make_transport_op(nullptr);
156 op->send_ping.on_initiate = on_initiate;
157 op->send_ping.on_ack = on_ack;
158 grpc_channel_element* elem =
159 grpc_channel_stack_element(channel_stack_.get(), 0);
160 elem->filter->start_transport_op(elem, op);
161 }
162
163 private:
164 RefCountedPtr<channelz::SubchannelNode> channelz_node_;
165 RefCountedPtr<grpc_channel_stack> channel_stack_;
166 };
167
168 //
169 // NewConnectedSubchannel
170 //
171
172 class NewConnectedSubchannel : public ConnectedSubchannel {
173 public:
174 class TransportCallDestination final : public CallDestination {
175 public:
TransportCallDestination(OrphanablePtr<ClientTransport> transport)176 explicit TransportCallDestination(OrphanablePtr<ClientTransport> transport)
177 : transport_(std::move(transport)) {}
178
transport()179 ClientTransport* transport() { return transport_.get(); }
180
HandleCall(CallHandler handler)181 void HandleCall(CallHandler handler) override {
182 transport_->StartCall(std::move(handler));
183 }
184
Orphaned()185 void Orphaned() override { transport_.reset(); }
186
187 private:
188 OrphanablePtr<ClientTransport> transport_;
189 };
190
NewConnectedSubchannel(RefCountedPtr<UnstartedCallDestination> call_destination,RefCountedPtr<TransportCallDestination> transport,const ChannelArgs & args)191 NewConnectedSubchannel(
192 RefCountedPtr<UnstartedCallDestination> call_destination,
193 RefCountedPtr<TransportCallDestination> transport,
194 const ChannelArgs& args)
195 : ConnectedSubchannel(args),
196 call_destination_(std::move(call_destination)),
197 transport_(std::move(transport)) {}
198
StartWatch(grpc_pollset_set *,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)199 void StartWatch(
200 grpc_pollset_set*,
201 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
202 transport_->transport()->StartConnectivityWatch(std::move(watcher));
203 }
204
Ping(absl::AnyInvocable<void (absl::Status)>)205 void Ping(absl::AnyInvocable<void(absl::Status)>) override {
206 // TODO(ctiller): add new transport API for this in v3 stack
207 Crash("not implemented");
208 }
209
unstarted_call_destination() const210 RefCountedPtr<UnstartedCallDestination> unstarted_call_destination()
211 const override {
212 return call_destination_;
213 }
214
channel_stack() const215 grpc_channel_stack* channel_stack() const override { return nullptr; }
216
GetInitialCallSizeEstimate() const217 size_t GetInitialCallSizeEstimate() const override { return 0; }
218
Ping(grpc_closure *,grpc_closure *)219 void Ping(grpc_closure*, grpc_closure*) override {
220 Crash("legacy ping method called in call v3 impl");
221 }
222
223 private:
224 RefCountedPtr<UnstartedCallDestination> call_destination_;
225 RefCountedPtr<TransportCallDestination> transport_;
226 };
227
228 //
229 // SubchannelCall
230 //
231
Create(Args args,grpc_error_handle * error)232 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
233 grpc_error_handle* error) {
234 const size_t allocation_size =
235 args.connected_subchannel->GetInitialCallSizeEstimate();
236 Arena* arena = args.arena;
237 return RefCountedPtr<SubchannelCall>(new (
238 arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
239 }
240
SubchannelCall(Args args,grpc_error_handle * error)241 SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
242 : connected_subchannel_(args.connected_subchannel
243 .TakeAsSubclass<LegacyConnectedSubchannel>()),
244 deadline_(args.deadline) {
245 grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
246 const grpc_call_element_args call_args = {
247 callstk, // call_stack
248 nullptr, // server_transport_data
249 args.path.c_slice(), // path
250 args.start_time, // start_time
251 args.deadline, // deadline
252 args.arena, // arena
253 args.call_combiner // call_combiner
254 };
255 *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
256 SubchannelCall::Destroy, this, &call_args);
257 if (GPR_UNLIKELY(!error->ok())) {
258 LOG(ERROR) << "error: " << StatusToString(*error);
259 return;
260 }
261 grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
262 auto* channelz_node = connected_subchannel_->channelz_node();
263 if (channelz_node != nullptr) {
264 channelz_node->RecordCallStarted();
265 }
266 }
267
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)268 void SubchannelCall::StartTransportStreamOpBatch(
269 grpc_transport_stream_op_batch* batch) {
270 MaybeInterceptRecvTrailingMetadata(batch);
271 grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
272 grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
273 GRPC_TRACE_LOG(channel, INFO)
274 << "OP[" << top_elem->filter->name << ":" << top_elem
275 << "]: " << grpc_transport_stream_op_batch_string(batch, false);
276 top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
277 }
278
GetCallStack()279 grpc_call_stack* SubchannelCall::GetCallStack() {
280 return SUBCHANNEL_CALL_TO_CALL_STACK(this);
281 }
282
SetAfterCallStackDestroy(grpc_closure * closure)283 void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
284 CHECK_EQ(after_call_stack_destroy_, nullptr);
285 CHECK_NE(closure, nullptr);
286 after_call_stack_destroy_ = closure;
287 }
288
Ref()289 RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
290 IncrementRefCount();
291 return RefCountedPtr<SubchannelCall>(this);
292 }
293
Ref(const DebugLocation & location,const char * reason)294 RefCountedPtr<SubchannelCall> SubchannelCall::Ref(const DebugLocation& location,
295 const char* reason) {
296 IncrementRefCount(location, reason);
297 return RefCountedPtr<SubchannelCall>(this);
298 }
299
Unref()300 void SubchannelCall::Unref() {
301 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
302 }
303
Unref(const DebugLocation &,const char * reason)304 void SubchannelCall::Unref(const DebugLocation& /*location*/,
305 const char* reason) {
306 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
307 }
308
Destroy(void * arg,grpc_error_handle)309 void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
310 SubchannelCall* self = static_cast<SubchannelCall*>(arg);
311 // Keep some members before destroying the subchannel call.
312 grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
313 RefCountedPtr<ConnectedSubchannel> connected_subchannel =
314 std::move(self->connected_subchannel_);
315 // Destroy the subchannel call.
316 self->~SubchannelCall();
317 // Destroy the call stack. This should be after destroying the subchannel
318 // call, because call->after_call_stack_destroy(), if not null, will free
319 // the call arena.
320 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
321 after_call_stack_destroy);
322 // Automatically reset connected_subchannel. This should be after destroying
323 // the call stack, because destroying call stack needs access to the channel
324 // stack.
325 }
326
MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch * batch)327 void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
328 grpc_transport_stream_op_batch* batch) {
329 // only intercept payloads with recv trailing.
330 if (!batch->recv_trailing_metadata) return;
331 // only add interceptor is channelz is enabled.
332 if (connected_subchannel_->channelz_node() == nullptr) return;
333 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
334 this, grpc_schedule_on_exec_ctx);
335 // save some state needed for the interception callback.
336 CHECK_EQ(recv_trailing_metadata_, nullptr);
337 recv_trailing_metadata_ =
338 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
339 original_recv_trailing_metadata_ =
340 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
341 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
342 &recv_trailing_metadata_ready_;
343 }
344
345 namespace {
346
347 // Sets *status based on the rest of the parameters.
GetCallStatus(grpc_status_code * status,Timestamp deadline,grpc_metadata_batch * md_batch,grpc_error_handle error)348 void GetCallStatus(grpc_status_code* status, Timestamp deadline,
349 grpc_metadata_batch* md_batch, grpc_error_handle error) {
350 if (!error.ok()) {
351 grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
352 } else {
353 *status = md_batch->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
354 }
355 }
356
357 } // namespace
358
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)359 void SubchannelCall::RecvTrailingMetadataReady(void* arg,
360 grpc_error_handle error) {
361 SubchannelCall* call = static_cast<SubchannelCall*>(arg);
362 CHECK_NE(call->recv_trailing_metadata_, nullptr);
363 grpc_status_code status = GRPC_STATUS_OK;
364 GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error);
365 channelz::SubchannelNode* channelz_node =
366 call->connected_subchannel_->channelz_node();
367 CHECK_NE(channelz_node, nullptr);
368 if (status == GRPC_STATUS_OK) {
369 channelz_node->RecordCallSucceeded();
370 } else {
371 channelz_node->RecordCallFailed();
372 }
373 Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error);
374 }
375
IncrementRefCount()376 void SubchannelCall::IncrementRefCount() {
377 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
378 }
379
IncrementRefCount(const DebugLocation &,const char * reason)380 void SubchannelCall::IncrementRefCount(const DebugLocation& /*location*/,
381 const char* reason) {
382 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
383 }
384
385 //
386 // Subchannel::ConnectedSubchannelStateWatcher
387 //
388
389 class Subchannel::ConnectedSubchannelStateWatcher final
390 : public AsyncConnectivityStateWatcherInterface {
391 public:
392 // Must be instantiated while holding c->mu.
ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)393 explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)
394 : subchannel_(std::move(c)) {}
395
~ConnectedSubchannelStateWatcher()396 ~ConnectedSubchannelStateWatcher() override {
397 subchannel_.reset(DEBUG_LOCATION, "state_watcher");
398 }
399
400 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)401 void OnConnectivityStateChange(grpc_connectivity_state new_state,
402 const absl::Status& status) override {
403 Subchannel* c = subchannel_.get();
404 {
405 MutexLock lock(&c->mu_);
406 // If we're either shutting down or have already seen this connection
407 // failure (i.e., c->connected_subchannel_ is null), do nothing.
408 //
409 // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN
410 // upon connection close. So if the server gracefully shuts down,
411 // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we
412 // will see only SHUTDOWN. Either way, we react to the first one we
413 // see, ignoring anything that happens after that.
414 if (c->connected_subchannel_ == nullptr) return;
415 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
416 new_state == GRPC_CHANNEL_SHUTDOWN) {
417 GRPC_TRACE_LOG(subchannel, INFO)
418 << "subchannel " << c << " " << c->key_.ToString()
419 << ": Connected subchannel " << c->connected_subchannel_.get()
420 << " reports " << ConnectivityStateName(new_state) << ": "
421 << status;
422 c->connected_subchannel_.reset();
423 if (c->channelz_node() != nullptr) {
424 c->channelz_node()->SetChildSocket(nullptr);
425 }
426 // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here,
427 // pass along the status from the transport, since it may have
428 // keepalive info attached to it that the channel needs.
429 // TODO(roth): Consider whether there's a cleaner way to do this.
430 c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status);
431 c->backoff_.Reset();
432 }
433 }
434 // Drain any connectivity state notifications after releasing the mutex.
435 c->work_serializer_.DrainQueue();
436 }
437
438 WeakRefCountedPtr<Subchannel> subchannel_;
439 };
440
441 //
442 // Subchannel::ConnectivityStateWatcherList
443 //
444
AddWatcherLocked(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)445 void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
446 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
447 watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
448 }
449
RemoveWatcherLocked(ConnectivityStateWatcherInterface * watcher)450 void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
451 ConnectivityStateWatcherInterface* watcher) {
452 watchers_.erase(watcher);
453 }
454
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)455 void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
456 grpc_connectivity_state state, const absl::Status& status) {
457 for (const auto& p : watchers_) {
458 subchannel_->work_serializer_.Schedule(
459 [watcher = p.second->Ref(), state, status]() mutable {
460 auto* watcher_ptr = watcher.get();
461 watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
462 status);
463 },
464 DEBUG_LOCATION);
465 }
466 }
467
468 //
469 // Subchannel
470 //
471
472 namespace {
473
ParseArgsForBackoffValues(const ChannelArgs & args,Duration * min_connect_timeout)474 BackOff::Options ParseArgsForBackoffValues(const ChannelArgs& args,
475 Duration* min_connect_timeout) {
476 const absl::optional<Duration> fixed_reconnect_backoff =
477 args.GetDurationFromIntMillis("grpc.testing.fixed_reconnect_backoff_ms");
478 if (fixed_reconnect_backoff.has_value()) {
479 const Duration backoff =
480 std::max(Duration::Milliseconds(100), *fixed_reconnect_backoff);
481 *min_connect_timeout = backoff;
482 return BackOff::Options()
483 .set_initial_backoff(backoff)
484 .set_multiplier(1.0)
485 .set_jitter(0.0)
486 .set_max_backoff(backoff);
487 }
488 const Duration initial_backoff = std::max(
489 Duration::Milliseconds(100),
490 args.GetDurationFromIntMillis(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)
491 .value_or(Duration::Seconds(
492 GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS)));
493 *min_connect_timeout =
494 std::max(Duration::Milliseconds(100),
495 args.GetDurationFromIntMillis(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)
496 .value_or(Duration::Seconds(
497 GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS)));
498 const Duration max_backoff =
499 std::max(Duration::Milliseconds(100),
500 args.GetDurationFromIntMillis(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)
501 .value_or(Duration::Seconds(
502 GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS)));
503 return BackOff::Options()
504 .set_initial_backoff(initial_backoff)
505 .set_multiplier(GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
506 .set_jitter(GRPC_SUBCHANNEL_RECONNECT_JITTER)
507 .set_max_backoff(max_backoff);
508 }
509
510 } // namespace
511
Subchannel(SubchannelKey key,OrphanablePtr<SubchannelConnector> connector,const ChannelArgs & args)512 Subchannel::Subchannel(SubchannelKey key,
513 OrphanablePtr<SubchannelConnector> connector,
514 const ChannelArgs& args)
515 : DualRefCounted<Subchannel>(GRPC_TRACE_FLAG_ENABLED(subchannel_refcount)
516 ? "Subchannel"
517 : nullptr),
518 key_(std::move(key)),
519 args_(args),
520 pollset_set_(grpc_pollset_set_create()),
521 connector_(std::move(connector)),
522 watcher_list_(this),
523 work_serializer_(args_.GetObjectRef<EventEngine>()),
524 backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)),
525 event_engine_(args_.GetObjectRef<EventEngine>()) {
526 // A grpc_init is added here to ensure that grpc_shutdown does not happen
527 // until the subchannel is destroyed. Subchannels can persist longer than
528 // channels because they maybe reused/shared among multiple channels. As a
529 // result the subchannel destruction happens asynchronously to channel
530 // destruction. If the last channel destruction triggers a grpc_shutdown
531 // before the last subchannel destruction, then there maybe race conditions
532 // triggering segmentation faults. To prevent this issue, we call a
533 // grpc_init here and a grpc_shutdown in the subchannel destructor.
534 InitInternally();
535 global_stats().IncrementClientSubchannelsCreated();
536 GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
537 grpc_schedule_on_exec_ctx);
538 // Check proxy mapper to determine address to connect to and channel
539 // args to use.
540 address_for_connect_ = CoreConfiguration::Get()
541 .proxy_mapper_registry()
542 .MapAddress(key_.address(), &args_)
543 .value_or(key_.address());
544 // Initialize channelz.
545 const bool channelz_enabled = args_.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
546 .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT);
547 if (channelz_enabled) {
548 const size_t channel_tracer_max_memory = Clamp(
549 args_.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
550 .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT),
551 0, INT_MAX);
552 channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
553 grpc_sockaddr_to_uri(&key_.address())
554 .value_or("<unknown address type>"),
555 channel_tracer_max_memory);
556 channelz_node_->AddTraceEvent(
557 channelz::ChannelTrace::Severity::Info,
558 grpc_slice_from_static_string("subchannel created"));
559 }
560 }
561
~Subchannel()562 Subchannel::~Subchannel() {
563 if (channelz_node_ != nullptr) {
564 channelz_node_->AddTraceEvent(
565 channelz::ChannelTrace::Severity::Info,
566 grpc_slice_from_static_string("Subchannel destroyed"));
567 channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
568 }
569 connector_.reset();
570 grpc_pollset_set_destroy(pollset_set_);
571 // grpc_shutdown is called here because grpc_init is called in the ctor.
572 ShutdownInternally();
573 }
574
Create(OrphanablePtr<SubchannelConnector> connector,const grpc_resolved_address & address,const ChannelArgs & args)575 RefCountedPtr<Subchannel> Subchannel::Create(
576 OrphanablePtr<SubchannelConnector> connector,
577 const grpc_resolved_address& address, const ChannelArgs& args) {
578 SubchannelKey key(address, args);
579 auto* subchannel_pool = args.GetObject<SubchannelPoolInterface>();
580 CHECK_NE(subchannel_pool, nullptr);
581 RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key);
582 if (c != nullptr) {
583 return c;
584 }
585 c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args);
586 // Try to register the subchannel before setting the subchannel pool.
587 // Otherwise, in case of a registration race, unreffing c in
588 // RegisterSubchannel() will cause c to be tried to be unregistered, while
589 // its key maps to a different subchannel.
590 RefCountedPtr<Subchannel> registered =
591 subchannel_pool->RegisterSubchannel(c->key_, c);
592 if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
593 return registered;
594 }
595
ThrottleKeepaliveTime(int new_keepalive_time)596 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
597 MutexLock lock(&mu_);
598 // Only update the value if the new keepalive time is larger.
599 if (new_keepalive_time > keepalive_time_) {
600 keepalive_time_ = new_keepalive_time;
601 GRPC_TRACE_LOG(subchannel, INFO)
602 << "subchannel " << this << " " << key_.ToString()
603 << ": throttling keepalive time to " << new_keepalive_time;
604 args_ = args_.Set(GRPC_ARG_KEEPALIVE_TIME_MS, new_keepalive_time);
605 }
606 }
607
channelz_node()608 channelz::SubchannelNode* Subchannel::channelz_node() {
609 return channelz_node_.get();
610 }
611
WatchConnectivityState(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)612 void Subchannel::WatchConnectivityState(
613 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
614 {
615 MutexLock lock(&mu_);
616 grpc_pollset_set* interested_parties = watcher->interested_parties();
617 if (interested_parties != nullptr) {
618 grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
619 }
620 work_serializer_.Schedule(
621 [watcher = watcher->Ref(), state = state_, status = status_]() mutable {
622 auto* watcher_ptr = watcher.get();
623 watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
624 status);
625 },
626 DEBUG_LOCATION);
627 watcher_list_.AddWatcherLocked(std::move(watcher));
628 }
629 // Drain any connectivity state notifications after releasing the mutex.
630 work_serializer_.DrainQueue();
631 }
632
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)633 void Subchannel::CancelConnectivityStateWatch(
634 ConnectivityStateWatcherInterface* watcher) {
635 {
636 MutexLock lock(&mu_);
637 grpc_pollset_set* interested_parties = watcher->interested_parties();
638 if (interested_parties != nullptr) {
639 grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
640 }
641 watcher_list_.RemoveWatcherLocked(watcher);
642 }
643 // Drain any connectivity state notifications after releasing the mutex.
644 // (Shouldn't actually be necessary in this case, but better safe than
645 // sorry.)
646 work_serializer_.DrainQueue();
647 }
648
RequestConnection()649 void Subchannel::RequestConnection() {
650 {
651 MutexLock lock(&mu_);
652 if (state_ == GRPC_CHANNEL_IDLE) {
653 StartConnectingLocked();
654 }
655 }
656 // Drain any connectivity state notifications after releasing the mutex.
657 work_serializer_.DrainQueue();
658 }
659
ResetBackoff()660 void Subchannel::ResetBackoff() {
661 // Hold a ref to ensure cancellation and subsequent deletion of the closure
662 // does not eliminate the last ref and destroy the Subchannel before the
663 // method returns.
664 auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff");
665 {
666 MutexLock lock(&mu_);
667 backoff_.Reset();
668 if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
669 event_engine_->Cancel(retry_timer_handle_)) {
670 OnRetryTimerLocked();
671 } else if (state_ == GRPC_CHANNEL_CONNECTING) {
672 next_attempt_time_ = Timestamp::Now();
673 }
674 }
675 // Drain any connectivity state notifications after releasing the mutex.
676 work_serializer_.DrainQueue();
677 }
678
Orphaned()679 void Subchannel::Orphaned() {
680 // The subchannel_pool is only used once here in this subchannel, so the
681 // access can be outside of the lock.
682 if (subchannel_pool_ != nullptr) {
683 subchannel_pool_->UnregisterSubchannel(key_, this);
684 subchannel_pool_.reset();
685 }
686 {
687 MutexLock lock(&mu_);
688 CHECK(!shutdown_);
689 shutdown_ = true;
690 connector_.reset();
691 connected_subchannel_.reset();
692 }
693 // Drain any connectivity state notifications after releasing the mutex.
694 work_serializer_.DrainQueue();
695 }
696
GetOrAddDataProducer(UniqueTypeName type,std::function<void (DataProducerInterface **)> get_or_add)697 void Subchannel::GetOrAddDataProducer(
698 UniqueTypeName type,
699 std::function<void(DataProducerInterface**)> get_or_add) {
700 MutexLock lock(&mu_);
701 auto it = data_producer_map_.emplace(type, nullptr).first;
702 get_or_add(&it->second);
703 }
704
RemoveDataProducer(DataProducerInterface * data_producer)705 void Subchannel::RemoveDataProducer(DataProducerInterface* data_producer) {
706 MutexLock lock(&mu_);
707 auto it = data_producer_map_.find(data_producer->type());
708 if (it != data_producer_map_.end() && it->second == data_producer) {
709 data_producer_map_.erase(it);
710 }
711 }
712
713 // Note: Must be called with a state that is different from the current state.
SetConnectivityStateLocked(grpc_connectivity_state state,const absl::Status & status)714 void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
715 const absl::Status& status) {
716 state_ = state;
717 if (status.ok()) {
718 status_ = status;
719 } else {
720 // Augment status message to include IP address.
721 status_ = absl::Status(status.code(),
722 absl::StrCat(grpc_sockaddr_to_uri(&key_.address())
723 .value_or("<unknown address type>"),
724 ": ", status.message()));
725 status.ForEachPayload(
726 [this](absl::string_view key, const absl::Cord& value)
727 // Want to use ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) here,
728 // but that won't work, because we can't pass the lock
729 // annotation through absl::Status::ForEachPayload().
730 ABSL_NO_THREAD_SAFETY_ANALYSIS { status_.SetPayload(key, value); });
731 }
732 if (channelz_node_ != nullptr) {
733 channelz_node_->UpdateConnectivityState(state);
734 channelz_node_->AddTraceEvent(
735 channelz::ChannelTrace::Severity::Info,
736 grpc_slice_from_cpp_string(absl::StrCat(
737 "Subchannel connectivity state changed to ",
738 ConnectivityStateName(state),
739 status.ok() ? "" : absl::StrCat(": ", status_.ToString()))));
740 }
741 // Notify watchers.
742 watcher_list_.NotifyLocked(state, status_);
743 }
744
OnRetryTimer()745 void Subchannel::OnRetryTimer() {
746 {
747 MutexLock lock(&mu_);
748 OnRetryTimerLocked();
749 }
750 // Drain any connectivity state notifications after releasing the mutex.
751 work_serializer_.DrainQueue();
752 }
753
OnRetryTimerLocked()754 void Subchannel::OnRetryTimerLocked() {
755 if (shutdown_) return;
756 GRPC_TRACE_LOG(subchannel, INFO)
757 << "subchannel " << this << " " << key_.ToString()
758 << ": backoff delay elapsed, reporting IDLE";
759 SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus());
760 }
761
StartConnectingLocked()762 void Subchannel::StartConnectingLocked() {
763 // Set next attempt time.
764 const Timestamp now = Timestamp::Now();
765 const Timestamp min_deadline = now + min_connect_timeout_;
766 next_attempt_time_ = now + backoff_.NextAttemptDelay();
767 // Report CONNECTING.
768 SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus());
769 // Start connection attempt.
770 SubchannelConnector::Args args;
771 args.address = &address_for_connect_;
772 args.interested_parties = pollset_set_;
773 args.deadline = std::max(next_attempt_time_, min_deadline);
774 args.channel_args = args_;
775 WeakRef(DEBUG_LOCATION, "Connect").release(); // Ref held by callback.
776 connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
777 }
778
OnConnectingFinished(void * arg,grpc_error_handle error)779 void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
780 WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
781 {
782 MutexLock lock(&c->mu_);
783 c->OnConnectingFinishedLocked(error);
784 }
785 // Drain any connectivity state notifications after releasing the mutex.
786 c->work_serializer_.DrainQueue();
787 c.reset(DEBUG_LOCATION, "Connect");
788 }
789
OnConnectingFinishedLocked(grpc_error_handle error)790 void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
791 if (shutdown_) {
792 connecting_result_.Reset();
793 return;
794 }
795 // If we didn't get a transport or we fail to publish it, report
796 // TRANSIENT_FAILURE and start the retry timer.
797 // Note that if the connection attempt took longer than the backoff
798 // time, then the timer will fire immediately, and we will quickly
799 // transition back to IDLE.
800 if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
801 const Duration time_until_next_attempt =
802 next_attempt_time_ - Timestamp::Now();
803 GRPC_TRACE_LOG(subchannel, INFO)
804 << "subchannel " << this << " " << key_.ToString()
805 << ": connect failed (" << StatusToString(error)
806 << "), backing off for " << time_until_next_attempt.millis() << " ms";
807 SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
808 grpc_error_to_absl_status(error));
809 retry_timer_handle_ = event_engine_->RunAfter(
810 time_until_next_attempt,
811 [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
812 {
813 ApplicationCallbackExecCtx callback_exec_ctx;
814 ExecCtx exec_ctx;
815 self->OnRetryTimer();
816 // Subchannel deletion might require an active ExecCtx. So if
817 // self.reset() is not called here, the WeakRefCountedPtr
818 // destructor may run after the ExecCtx declared in the callback
819 // is destroyed. Since subchannel may get destroyed when the
820 // WeakRefCountedPtr destructor runs, it may not have an active
821 // ExecCtx - thus leading to crashes.
822 self.reset();
823 }
824 });
825 }
826 }
827
PublishTransportLocked()828 bool Subchannel::PublishTransportLocked() {
829 auto socket_node = std::move(connecting_result_.socket_node);
830 if (connecting_result_.transport->filter_stack_transport() != nullptr) {
831 // Construct channel stack.
832 // Builder takes ownership of transport.
833 ChannelStackBuilderImpl builder(
834 "subchannel", GRPC_CLIENT_SUBCHANNEL,
835 connecting_result_.channel_args.SetObject(
836 std::exchange(connecting_result_.transport, nullptr)));
837 if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
838 return false;
839 }
840 absl::StatusOr<RefCountedPtr<grpc_channel_stack>> stack = builder.Build();
841 if (!stack.ok()) {
842 connecting_result_.Reset();
843 LOG(ERROR) << "subchannel " << this << " " << key_.ToString()
844 << ": error initializing subchannel stack: " << stack.status();
845 return false;
846 }
847 connected_subchannel_ = MakeRefCounted<LegacyConnectedSubchannel>(
848 std::move(*stack), args_, channelz_node_);
849 } else {
850 OrphanablePtr<ClientTransport> transport(
851 std::exchange(connecting_result_.transport, nullptr)
852 ->client_transport());
853 InterceptionChainBuilder builder(
854 connecting_result_.channel_args.SetObject(transport.get()));
855 if (channelz_node_ != nullptr) {
856 // TODO(ctiller): If/when we have a good way to access the subchannel
857 // from a filter (maybe GetContext<Subchannel>?), consider replacing
858 // these two hooks with a filter so that we can avoid storing two
859 // separate refs to the channelz node in each connection.
860 builder.AddOnClientInitialMetadata(
861 [channelz_node = channelz_node_](ClientMetadata&) {
862 channelz_node->RecordCallStarted();
863 });
864 builder.AddOnServerTrailingMetadata(
865 [channelz_node = channelz_node_](ServerMetadata& metadata) {
866 if (IsStatusOk(metadata)) {
867 channelz_node->RecordCallSucceeded();
868 } else {
869 channelz_node->RecordCallFailed();
870 }
871 });
872 }
873 CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
874 GRPC_CLIENT_SUBCHANNEL, builder);
875 auto transport_destination =
876 MakeRefCounted<NewConnectedSubchannel::TransportCallDestination>(
877 std::move(transport));
878 auto call_destination = builder.Build(transport_destination);
879 if (!call_destination.ok()) {
880 connecting_result_.Reset();
881 LOG(ERROR) << "subchannel " << this << " " << key_.ToString()
882 << ": error initializing subchannel stack: "
883 << call_destination.status();
884 return false;
885 }
886 connected_subchannel_ = MakeRefCounted<NewConnectedSubchannel>(
887 std::move(*call_destination), std::move(transport_destination), args_);
888 }
889 connecting_result_.Reset();
890 // Publish.
891 GRPC_TRACE_LOG(subchannel, INFO)
892 << "subchannel " << this << " " << key_.ToString()
893 << ": new connected subchannel at " << connected_subchannel_.get();
894 if (channelz_node_ != nullptr) {
895 channelz_node_->SetChildSocket(std::move(socket_node));
896 }
897 // Start watching connected subchannel.
898 connected_subchannel_->StartWatch(
899 pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(
900 WeakRef(DEBUG_LOCATION, "state_watcher")));
901 // Report initial state.
902 SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status());
903 return true;
904 }
905
// Builds the channel args used to create (and key) a subchannel from the
// channel-level args and the per-address args.
ChannelArgs Subchannel::MakeSubchannelArgs(
    const ChannelArgs& channel_args, const ChannelArgs& address_args,
    const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
    const std::string& channel_default_authority) {
  // UnionWith() is invoked on the channel-level args, so for any key present
  // in both sets the channel-level value wins; per-address args only fill in
  // keys the channel did not set. This matters most for
  // GRPC_ARG_DEFAULT_AUTHORITY: a resolver may supply it per address only
  // when the application did not explicitly set it on the channel.
  ChannelArgs merged = channel_args.UnionWith(address_args);
  // Attach the subchannel pool. If neither the application nor the resolver
  // provided a default authority, fall back to the channel's default.
  merged =
      merged.SetObject(subchannel_pool)
          .SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority);
  // Drop args that should not affect subchannel uniqueness, including every
  // key carrying the no-subchannel prefix.
  merged = merged.Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME)
               .Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING)
               .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  return merged.RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX);
}
930
931 } // namespace grpc_core
932