1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/client_channel/subchannel.h"
20
21 #include <inttypes.h>
22 #include <limits.h>
23
24 #include <algorithm>
25 #include <memory>
26 #include <new>
27 #include <utility>
28
29 #include "absl/status/statusor.h"
30 #include "absl/strings/cord.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34
35 #include <grpc/impl/channel_arg_names.h>
36 #include <grpc/slice.h>
37 #include <grpc/status.h>
38 #include <grpc/support/log.h>
39
40 #include "src/core/client_channel/subchannel_pool_interface.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/backoff/backoff.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/channel/channel_stack.h"
45 #include "src/core/lib/channel/channel_stack_builder_impl.h"
46 #include "src/core/lib/channel/channel_trace.h"
47 #include "src/core/lib/channel/channelz.h"
48 #include "src/core/lib/config/core_configuration.h"
49 #include "src/core/lib/debug/stats.h"
50 #include "src/core/lib/debug/stats_data.h"
51 #include "src/core/lib/debug/trace.h"
52 #include "src/core/lib/gpr/alloc.h"
53 #include "src/core/lib/gpr/useful.h"
54 #include "src/core/lib/gprpp/debug_location.h"
55 #include "src/core/lib/gprpp/ref_counted_ptr.h"
56 #include "src/core/lib/gprpp/status_helper.h"
57 #include "src/core/lib/gprpp/sync.h"
58 #include "src/core/lib/handshaker/proxy_mapper_registry.h"
59 #include "src/core/lib/iomgr/exec_ctx.h"
60 #include "src/core/lib/iomgr/pollset_set.h"
61 #include "src/core/lib/promise/cancel_callback.h"
62 #include "src/core/lib/promise/seq.h"
63 #include "src/core/lib/slice/slice_internal.h"
64 #include "src/core/lib/surface/channel_init.h"
65 #include "src/core/lib/surface/channel_stack_type.h"
66 #include "src/core/lib/surface/init_internally.h"
67 #include "src/core/lib/transport/connectivity_state.h"
68 #include "src/core/lib/transport/error_utils.h"
69 #include "src/core/lib/transport/transport.h"
70
// Backoff parameters.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2

// Conversion between subchannel call and call stack.
// A SubchannelCall and its grpc_call_stack share one arena allocation: the
// SubchannelCall comes first, padded to GPR_ROUND_UP_TO_ALIGNMENT_SIZE, and
// the call stack starts right after it (see SubchannelCall::Create()).
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  ((grpc_call_stack*)((char*)(call) +       \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
// Inverse of SUBCHANNEL_CALL_TO_CALL_STACK(). Expands only its own parameter,
// so it does not depend on any variable name at the expansion site.
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
  ((SubchannelCall*)((char*)(callstack) -        \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
85
86 namespace grpc_core {
87
88 using ::grpc_event_engine::experimental::EventEngine;
89
90 TraceFlag grpc_trace_subchannel(false, "subchannel");
91 DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
92
93 //
94 // ConnectedSubchannel
95 //
96
ConnectedSubchannel(grpc_channel_stack * channel_stack,const ChannelArgs & args,RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)97 ConnectedSubchannel::ConnectedSubchannel(
98 grpc_channel_stack* channel_stack, const ChannelArgs& args,
99 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
100 : RefCounted<ConnectedSubchannel>(
101 GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
102 ? "ConnectedSubchannel"
103 : nullptr),
104 channel_stack_(channel_stack),
105 args_(args),
106 channelz_subchannel_(std::move(channelz_subchannel)) {}
107
~ConnectedSubchannel()108 ConnectedSubchannel::~ConnectedSubchannel() {
109 GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
110 }
111
StartWatch(grpc_pollset_set * interested_parties,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)112 void ConnectedSubchannel::StartWatch(
113 grpc_pollset_set* interested_parties,
114 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
115 grpc_transport_op* op = grpc_make_transport_op(nullptr);
116 op->start_connectivity_watch = std::move(watcher);
117 op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
118 op->bind_pollset_set = interested_parties;
119 grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
120 elem->filter->start_transport_op(elem, op);
121 }
122
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)123 void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
124 grpc_closure* on_ack) {
125 grpc_transport_op* op = grpc_make_transport_op(nullptr);
126 grpc_channel_element* elem;
127 op->send_ping.on_initiate = on_initiate;
128 op->send_ping.on_ack = on_ack;
129 elem = grpc_channel_stack_element(channel_stack_, 0);
130 elem->filter->start_transport_op(elem, op);
131 }
132
GetInitialCallSizeEstimate() const133 size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
134 return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
135 channel_stack_->call_stack_size;
136 }
137
MakeCallPromise(CallArgs call_args)138 ArenaPromise<ServerMetadataHandle> ConnectedSubchannel::MakeCallPromise(
139 CallArgs call_args) {
140 // If not using channelz, we just need to call the channel stack.
141 if (channelz_subchannel() == nullptr) {
142 return channel_stack_->MakeClientCallPromise(std::move(call_args));
143 }
144 // Otherwise, we need to wrap the channel stack promise with code that
145 // handles the channelz updates.
146 return OnCancel(
147 Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)),
148 [self = Ref()](ServerMetadataHandle metadata) {
149 channelz::SubchannelNode* channelz_subchannel =
150 self->channelz_subchannel();
151 GPR_ASSERT(channelz_subchannel != nullptr);
152 if (metadata->get(GrpcStatusMetadata())
153 .value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) {
154 channelz_subchannel->RecordCallFailed();
155 } else {
156 channelz_subchannel->RecordCallSucceeded();
157 }
158 return metadata;
159 }),
160 [self = Ref()]() {
161 channelz::SubchannelNode* channelz_subchannel =
162 self->channelz_subchannel();
163 GPR_ASSERT(channelz_subchannel != nullptr);
164 channelz_subchannel->RecordCallFailed();
165 });
166 }
167
168 //
169 // SubchannelCall
170 //
171
Create(Args args,grpc_error_handle * error)172 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
173 grpc_error_handle* error) {
174 const size_t allocation_size =
175 args.connected_subchannel->GetInitialCallSizeEstimate();
176 Arena* arena = args.arena;
177 return RefCountedPtr<SubchannelCall>(new (
178 arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
179 }
180
SubchannelCall(Args args,grpc_error_handle * error)181 SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
182 : connected_subchannel_(std::move(args.connected_subchannel)),
183 deadline_(args.deadline) {
184 grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
185 const grpc_call_element_args call_args = {
186 callstk, // call_stack
187 nullptr, // server_transport_data
188 args.context, // context
189 args.path.c_slice(), // path
190 args.start_time, // start_time
191 args.deadline, // deadline
192 args.arena, // arena
193 args.call_combiner // call_combiner
194 };
195 *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
196 SubchannelCall::Destroy, this, &call_args);
197 if (GPR_UNLIKELY(!error->ok())) {
198 gpr_log(GPR_ERROR, "error: %s", StatusToString(*error).c_str());
199 return;
200 }
201 grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
202 auto* channelz_node = connected_subchannel_->channelz_subchannel();
203 if (channelz_node != nullptr) {
204 channelz_node->RecordCallStarted();
205 }
206 }
207
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)208 void SubchannelCall::StartTransportStreamOpBatch(
209 grpc_transport_stream_op_batch* batch) {
210 MaybeInterceptRecvTrailingMetadata(batch);
211 grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
212 grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
213 GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
214 top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
215 }
216
GetCallStack()217 grpc_call_stack* SubchannelCall::GetCallStack() {
218 return SUBCHANNEL_CALL_TO_CALL_STACK(this);
219 }
220
SetAfterCallStackDestroy(grpc_closure * closure)221 void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
222 GPR_ASSERT(after_call_stack_destroy_ == nullptr);
223 GPR_ASSERT(closure != nullptr);
224 after_call_stack_destroy_ = closure;
225 }
226
Ref()227 RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
228 IncrementRefCount();
229 return RefCountedPtr<SubchannelCall>(this);
230 }
231
Ref(const DebugLocation & location,const char * reason)232 RefCountedPtr<SubchannelCall> SubchannelCall::Ref(const DebugLocation& location,
233 const char* reason) {
234 IncrementRefCount(location, reason);
235 return RefCountedPtr<SubchannelCall>(this);
236 }
237
Unref()238 void SubchannelCall::Unref() {
239 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
240 }
241
Unref(const DebugLocation &,const char * reason)242 void SubchannelCall::Unref(const DebugLocation& /*location*/,
243 const char* reason) {
244 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
245 }
246
Destroy(void * arg,grpc_error_handle)247 void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
248 SubchannelCall* self = static_cast<SubchannelCall*>(arg);
249 // Keep some members before destroying the subchannel call.
250 grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
251 RefCountedPtr<ConnectedSubchannel> connected_subchannel =
252 std::move(self->connected_subchannel_);
253 // Destroy the subchannel call.
254 self->~SubchannelCall();
255 // Destroy the call stack. This should be after destroying the subchannel
256 // call, because call->after_call_stack_destroy(), if not null, will free the
257 // call arena.
258 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
259 after_call_stack_destroy);
260 // Automatically reset connected_subchannel. This should be after destroying
261 // the call stack, because destroying call stack needs access to the channel
262 // stack.
263 }
264
MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch * batch)265 void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
266 grpc_transport_stream_op_batch* batch) {
267 // only intercept payloads with recv trailing.
268 if (!batch->recv_trailing_metadata) {
269 return;
270 }
271 // only add interceptor is channelz is enabled.
272 if (connected_subchannel_->channelz_subchannel() == nullptr) {
273 return;
274 }
275 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
276 this, grpc_schedule_on_exec_ctx);
277 // save some state needed for the interception callback.
278 GPR_ASSERT(recv_trailing_metadata_ == nullptr);
279 recv_trailing_metadata_ =
280 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
281 original_recv_trailing_metadata_ =
282 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
283 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
284 &recv_trailing_metadata_ready_;
285 }
286
287 namespace {
288
289 // Sets *status based on the rest of the parameters.
GetCallStatus(grpc_status_code * status,Timestamp deadline,grpc_metadata_batch * md_batch,grpc_error_handle error)290 void GetCallStatus(grpc_status_code* status, Timestamp deadline,
291 grpc_metadata_batch* md_batch, grpc_error_handle error) {
292 if (!error.ok()) {
293 grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
294 } else {
295 *status = md_batch->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
296 }
297 }
298
299 } // namespace
300
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)301 void SubchannelCall::RecvTrailingMetadataReady(void* arg,
302 grpc_error_handle error) {
303 SubchannelCall* call = static_cast<SubchannelCall*>(arg);
304 GPR_ASSERT(call->recv_trailing_metadata_ != nullptr);
305 grpc_status_code status = GRPC_STATUS_OK;
306 GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error);
307 channelz::SubchannelNode* channelz_subchannel =
308 call->connected_subchannel_->channelz_subchannel();
309 GPR_ASSERT(channelz_subchannel != nullptr);
310 if (status == GRPC_STATUS_OK) {
311 channelz_subchannel->RecordCallSucceeded();
312 } else {
313 channelz_subchannel->RecordCallFailed();
314 }
315 Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error);
316 }
317
IncrementRefCount()318 void SubchannelCall::IncrementRefCount() {
319 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
320 }
321
IncrementRefCount(const DebugLocation &,const char * reason)322 void SubchannelCall::IncrementRefCount(const DebugLocation& /*location*/,
323 const char* reason) {
324 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
325 }
326
327 //
328 // Subchannel::ConnectedSubchannelStateWatcher
329 //
330
331 class Subchannel::ConnectedSubchannelStateWatcher final
332 : public AsyncConnectivityStateWatcherInterface {
333 public:
334 // Must be instantiated while holding c->mu.
ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)335 explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)
336 : subchannel_(std::move(c)) {}
337
~ConnectedSubchannelStateWatcher()338 ~ConnectedSubchannelStateWatcher() override {
339 subchannel_.reset(DEBUG_LOCATION, "state_watcher");
340 }
341
342 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)343 void OnConnectivityStateChange(grpc_connectivity_state new_state,
344 const absl::Status& status) override {
345 Subchannel* c = subchannel_.get();
346 {
347 MutexLock lock(&c->mu_);
348 // If we're either shutting down or have already seen this connection
349 // failure (i.e., c->connected_subchannel_ is null), do nothing.
350 //
351 // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN
352 // upon connection close. So if the server gracefully shuts down,
353 // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we
354 // will see only SHUTDOWN. Either way, we react to the first one we
355 // see, ignoring anything that happens after that.
356 if (c->connected_subchannel_ == nullptr) return;
357 if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
358 new_state == GRPC_CHANNEL_SHUTDOWN) {
359 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
360 gpr_log(GPR_INFO,
361 "subchannel %p %s: Connected subchannel %p reports %s: %s", c,
362 c->key_.ToString().c_str(), c->connected_subchannel_.get(),
363 ConnectivityStateName(new_state), status.ToString().c_str());
364 }
365 c->connected_subchannel_.reset();
366 if (c->channelz_node() != nullptr) {
367 c->channelz_node()->SetChildSocket(nullptr);
368 }
369 // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here,
370 // pass along the status from the transport, since it may have
371 // keepalive info attached to it that the channel needs.
372 // TODO(roth): Consider whether there's a cleaner way to do this.
373 c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status);
374 c->backoff_.Reset();
375 }
376 }
377 // Drain any connectivity state notifications after releasing the mutex.
378 c->work_serializer_.DrainQueue();
379 }
380
381 WeakRefCountedPtr<Subchannel> subchannel_;
382 };
383
384 //
385 // Subchannel::ConnectivityStateWatcherList
386 //
387
AddWatcherLocked(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)388 void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
389 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
390 watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
391 }
392
RemoveWatcherLocked(ConnectivityStateWatcherInterface * watcher)393 void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
394 ConnectivityStateWatcherInterface* watcher) {
395 watchers_.erase(watcher);
396 }
397
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)398 void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
399 grpc_connectivity_state state, const absl::Status& status) {
400 for (const auto& p : watchers_) {
401 subchannel_->work_serializer_.Schedule(
402 [watcher = p.second->Ref(), state, status]() mutable {
403 auto* watcher_ptr = watcher.get();
404 watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
405 status);
406 },
407 DEBUG_LOCATION);
408 }
409 }
410
411 //
412 // Subchannel
413 //
414
415 namespace {
416
ParseArgsForBackoffValues(const ChannelArgs & args,Duration * min_connect_timeout)417 BackOff::Options ParseArgsForBackoffValues(const ChannelArgs& args,
418 Duration* min_connect_timeout) {
419 const absl::optional<Duration> fixed_reconnect_backoff =
420 args.GetDurationFromIntMillis("grpc.testing.fixed_reconnect_backoff_ms");
421 if (fixed_reconnect_backoff.has_value()) {
422 const Duration backoff =
423 std::max(Duration::Milliseconds(100), *fixed_reconnect_backoff);
424 *min_connect_timeout = backoff;
425 return BackOff::Options()
426 .set_initial_backoff(backoff)
427 .set_multiplier(1.0)
428 .set_jitter(0.0)
429 .set_max_backoff(backoff);
430 }
431 const Duration initial_backoff = std::max(
432 Duration::Milliseconds(100),
433 args.GetDurationFromIntMillis(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)
434 .value_or(Duration::Seconds(
435 GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS)));
436 *min_connect_timeout =
437 std::max(Duration::Milliseconds(100),
438 args.GetDurationFromIntMillis(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)
439 .value_or(Duration::Seconds(
440 GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS)));
441 const Duration max_backoff =
442 std::max(Duration::Milliseconds(100),
443 args.GetDurationFromIntMillis(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)
444 .value_or(Duration::Seconds(
445 GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS)));
446 return BackOff::Options()
447 .set_initial_backoff(initial_backoff)
448 .set_multiplier(GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
449 .set_jitter(GRPC_SUBCHANNEL_RECONNECT_JITTER)
450 .set_max_backoff(max_backoff);
451 }
452
453 } // namespace
454
Subchannel(SubchannelKey key,OrphanablePtr<SubchannelConnector> connector,const ChannelArgs & args)455 Subchannel::Subchannel(SubchannelKey key,
456 OrphanablePtr<SubchannelConnector> connector,
457 const ChannelArgs& args)
458 : DualRefCounted<Subchannel>(
459 GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) ? "Subchannel"
460 : nullptr),
461 key_(std::move(key)),
462 args_(args),
463 pollset_set_(grpc_pollset_set_create()),
464 connector_(std::move(connector)),
465 watcher_list_(this),
466 work_serializer_(args_.GetObjectRef<EventEngine>()),
467 backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)),
468 event_engine_(args_.GetObjectRef<EventEngine>()) {
469 // A grpc_init is added here to ensure that grpc_shutdown does not happen
470 // until the subchannel is destroyed. Subchannels can persist longer than
471 // channels because they maybe reused/shared among multiple channels. As a
472 // result the subchannel destruction happens asynchronously to channel
473 // destruction. If the last channel destruction triggers a grpc_shutdown
474 // before the last subchannel destruction, then there maybe race conditions
475 // triggering segmentation faults. To prevent this issue, we call a grpc_init
476 // here and a grpc_shutdown in the subchannel destructor.
477 InitInternally();
478 global_stats().IncrementClientSubchannelsCreated();
479 GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
480 grpc_schedule_on_exec_ctx);
481 // Check proxy mapper to determine address to connect to and channel
482 // args to use.
483 address_for_connect_ = CoreConfiguration::Get()
484 .proxy_mapper_registry()
485 .MapAddress(key_.address(), &args_)
486 .value_or(key_.address());
487 // Initialize channelz.
488 const bool channelz_enabled = args_.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
489 .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT);
490 if (channelz_enabled) {
491 const size_t channel_tracer_max_memory = Clamp(
492 args_.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
493 .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT),
494 0, INT_MAX);
495 channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
496 grpc_sockaddr_to_uri(&key_.address())
497 .value_or("<unknown address type>"),
498 channel_tracer_max_memory);
499 channelz_node_->AddTraceEvent(
500 channelz::ChannelTrace::Severity::Info,
501 grpc_slice_from_static_string("subchannel created"));
502 }
503 }
504
~Subchannel()505 Subchannel::~Subchannel() {
506 if (channelz_node_ != nullptr) {
507 channelz_node_->AddTraceEvent(
508 channelz::ChannelTrace::Severity::Info,
509 grpc_slice_from_static_string("Subchannel destroyed"));
510 channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
511 }
512 connector_.reset();
513 grpc_pollset_set_destroy(pollset_set_);
514 // grpc_shutdown is called here because grpc_init is called in the ctor.
515 ShutdownInternally();
516 }
517
Create(OrphanablePtr<SubchannelConnector> connector,const grpc_resolved_address & address,const ChannelArgs & args)518 RefCountedPtr<Subchannel> Subchannel::Create(
519 OrphanablePtr<SubchannelConnector> connector,
520 const grpc_resolved_address& address, const ChannelArgs& args) {
521 SubchannelKey key(address, args);
522 auto* subchannel_pool = args.GetObject<SubchannelPoolInterface>();
523 GPR_ASSERT(subchannel_pool != nullptr);
524 RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key);
525 if (c != nullptr) {
526 return c;
527 }
528 c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args);
529 // Try to register the subchannel before setting the subchannel pool.
530 // Otherwise, in case of a registration race, unreffing c in
531 // RegisterSubchannel() will cause c to be tried to be unregistered, while
532 // its key maps to a different subchannel.
533 RefCountedPtr<Subchannel> registered =
534 subchannel_pool->RegisterSubchannel(c->key_, c);
535 if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
536 return registered;
537 }
538
ThrottleKeepaliveTime(int new_keepalive_time)539 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
540 MutexLock lock(&mu_);
541 // Only update the value if the new keepalive time is larger.
542 if (new_keepalive_time > keepalive_time_) {
543 keepalive_time_ = new_keepalive_time;
544 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
545 gpr_log(GPR_INFO, "subchannel %p %s: throttling keepalive time to %d",
546 this, key_.ToString().c_str(), new_keepalive_time);
547 }
548 args_ = args_.Set(GRPC_ARG_KEEPALIVE_TIME_MS, new_keepalive_time);
549 }
550 }
551
channelz_node()552 channelz::SubchannelNode* Subchannel::channelz_node() {
553 return channelz_node_.get();
554 }
555
WatchConnectivityState(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)556 void Subchannel::WatchConnectivityState(
557 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
558 {
559 MutexLock lock(&mu_);
560 grpc_pollset_set* interested_parties = watcher->interested_parties();
561 if (interested_parties != nullptr) {
562 grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
563 }
564 work_serializer_.Schedule(
565 [watcher = watcher->Ref(), state = state_, status = status_]() mutable {
566 auto* watcher_ptr = watcher.get();
567 watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
568 status);
569 },
570 DEBUG_LOCATION);
571 watcher_list_.AddWatcherLocked(std::move(watcher));
572 }
573 // Drain any connectivity state notifications after releasing the mutex.
574 work_serializer_.DrainQueue();
575 }
576
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)577 void Subchannel::CancelConnectivityStateWatch(
578 ConnectivityStateWatcherInterface* watcher) {
579 {
580 MutexLock lock(&mu_);
581 grpc_pollset_set* interested_parties = watcher->interested_parties();
582 if (interested_parties != nullptr) {
583 grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
584 }
585 watcher_list_.RemoveWatcherLocked(watcher);
586 }
587 // Drain any connectivity state notifications after releasing the mutex.
588 // (Shouldn't actually be necessary in this case, but better safe than sorry.)
589 work_serializer_.DrainQueue();
590 }
591
RequestConnection()592 void Subchannel::RequestConnection() {
593 {
594 MutexLock lock(&mu_);
595 if (state_ == GRPC_CHANNEL_IDLE) {
596 StartConnectingLocked();
597 }
598 }
599 // Drain any connectivity state notifications after releasing the mutex.
600 work_serializer_.DrainQueue();
601 }
602
ResetBackoff()603 void Subchannel::ResetBackoff() {
604 // Hold a ref to ensure cancellation and subsequent deletion of the closure
605 // does not eliminate the last ref and destroy the Subchannel before the
606 // method returns.
607 auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff");
608 {
609 MutexLock lock(&mu_);
610 backoff_.Reset();
611 if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
612 event_engine_->Cancel(retry_timer_handle_)) {
613 OnRetryTimerLocked();
614 } else if (state_ == GRPC_CHANNEL_CONNECTING) {
615 next_attempt_time_ = Timestamp::Now();
616 }
617 }
618 // Drain any connectivity state notifications after releasing the mutex.
619 work_serializer_.DrainQueue();
620 }
621
Orphaned()622 void Subchannel::Orphaned() {
623 // The subchannel_pool is only used once here in this subchannel, so the
624 // access can be outside of the lock.
625 if (subchannel_pool_ != nullptr) {
626 subchannel_pool_->UnregisterSubchannel(key_, this);
627 subchannel_pool_.reset();
628 }
629 {
630 MutexLock lock(&mu_);
631 GPR_ASSERT(!shutdown_);
632 shutdown_ = true;
633 connector_.reset();
634 connected_subchannel_.reset();
635 }
636 // Drain any connectivity state notifications after releasing the mutex.
637 work_serializer_.DrainQueue();
638 }
639
GetOrAddDataProducer(UniqueTypeName type,std::function<void (DataProducerInterface **)> get_or_add)640 void Subchannel::GetOrAddDataProducer(
641 UniqueTypeName type,
642 std::function<void(DataProducerInterface**)> get_or_add) {
643 MutexLock lock(&mu_);
644 auto it = data_producer_map_.emplace(type, nullptr).first;
645 get_or_add(&it->second);
646 }
647
RemoveDataProducer(DataProducerInterface * data_producer)648 void Subchannel::RemoveDataProducer(DataProducerInterface* data_producer) {
649 MutexLock lock(&mu_);
650 auto it = data_producer_map_.find(data_producer->type());
651 if (it != data_producer_map_.end() && it->second == data_producer) {
652 data_producer_map_.erase(it);
653 }
654 }
655
656 // Note: Must be called with a state that is different from the current state.
SetConnectivityStateLocked(grpc_connectivity_state state,const absl::Status & status)657 void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
658 const absl::Status& status) {
659 state_ = state;
660 if (status.ok()) {
661 status_ = status;
662 } else {
663 // Augment status message to include IP address.
664 status_ = absl::Status(status.code(),
665 absl::StrCat(grpc_sockaddr_to_uri(&key_.address())
666 .value_or("<unknown address type>"),
667 ": ", status.message()));
668 status.ForEachPayload(
669 [this](absl::string_view key, const absl::Cord& value)
670 // Want to use ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) here,
671 // but that won't work, because we can't pass the lock
672 // annotation through absl::Status::ForEachPayload().
673 ABSL_NO_THREAD_SAFETY_ANALYSIS { status_.SetPayload(key, value); });
674 }
675 if (channelz_node_ != nullptr) {
676 channelz_node_->UpdateConnectivityState(state);
677 channelz_node_->AddTraceEvent(
678 channelz::ChannelTrace::Severity::Info,
679 grpc_slice_from_cpp_string(absl::StrCat(
680 "Subchannel connectivity state changed to ",
681 ConnectivityStateName(state),
682 status.ok() ? "" : absl::StrCat(": ", status_.ToString()))));
683 }
684 // Notify watchers.
685 watcher_list_.NotifyLocked(state, status_);
686 }
687
OnRetryTimer()688 void Subchannel::OnRetryTimer() {
689 {
690 MutexLock lock(&mu_);
691 OnRetryTimerLocked();
692 }
693 // Drain any connectivity state notifications after releasing the mutex.
694 work_serializer_.DrainQueue();
695 }
696
OnRetryTimerLocked()697 void Subchannel::OnRetryTimerLocked() {
698 if (shutdown_) return;
699 gpr_log(GPR_INFO, "subchannel %p %s: backoff delay elapsed, reporting IDLE",
700 this, key_.ToString().c_str());
701 SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus());
702 }
703
StartConnectingLocked()704 void Subchannel::StartConnectingLocked() {
705 // Set next attempt time.
706 const Timestamp min_deadline = min_connect_timeout_ + Timestamp::Now();
707 next_attempt_time_ = backoff_.NextAttemptTime();
708 // Report CONNECTING.
709 SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus());
710 // Start connection attempt.
711 SubchannelConnector::Args args;
712 args.address = &address_for_connect_;
713 args.interested_parties = pollset_set_;
714 args.deadline = std::max(next_attempt_time_, min_deadline);
715 args.channel_args = args_;
716 WeakRef(DEBUG_LOCATION, "Connect").release(); // Ref held by callback.
717 connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
718 }
719
OnConnectingFinished(void * arg,grpc_error_handle error)720 void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
721 WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
722 {
723 MutexLock lock(&c->mu_);
724 c->OnConnectingFinishedLocked(error);
725 }
726 // Drain any connectivity state notifications after releasing the mutex.
727 c->work_serializer_.DrainQueue();
728 c.reset(DEBUG_LOCATION, "Connect");
729 }
730
OnConnectingFinishedLocked(grpc_error_handle error)731 void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
732 if (shutdown_) {
733 connecting_result_.Reset();
734 return;
735 }
736 // If we didn't get a transport or we fail to publish it, report
737 // TRANSIENT_FAILURE and start the retry timer.
738 // Note that if the connection attempt took longer than the backoff
739 // time, then the timer will fire immediately, and we will quickly
740 // transition back to IDLE.
741 if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
742 const Duration time_until_next_attempt =
743 next_attempt_time_ - Timestamp::Now();
744 gpr_log(GPR_INFO,
745 "subchannel %p %s: connect failed (%s), backing off for %" PRId64
746 " ms",
747 this, key_.ToString().c_str(), StatusToString(error).c_str(),
748 time_until_next_attempt.millis());
749 SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
750 grpc_error_to_absl_status(error));
751 retry_timer_handle_ = event_engine_->RunAfter(
752 time_until_next_attempt,
753 [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
754 {
755 ApplicationCallbackExecCtx callback_exec_ctx;
756 ExecCtx exec_ctx;
757 self->OnRetryTimer();
758 // Subchannel deletion might require an active ExecCtx. So if
759 // self.reset() is not called here, the WeakRefCountedPtr destructor
760 // may run after the ExecCtx declared in the callback is destroyed.
761 // Since subchannel may get destroyed when the WeakRefCountedPtr
762 // destructor runs, it may not have an active ExecCtx - thus leading
763 // to crashes.
764 self.reset();
765 }
766 });
767 }
768 }
769
PublishTransportLocked()770 bool Subchannel::PublishTransportLocked() {
771 // Construct channel stack.
772 // Builder takes ownership of transport.
773 ChannelStackBuilderImpl builder(
774 "subchannel", GRPC_CLIENT_SUBCHANNEL,
775 connecting_result_.channel_args.SetObject(
776 std::exchange(connecting_result_.transport, nullptr)));
777 if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
778 return false;
779 }
780 absl::StatusOr<RefCountedPtr<grpc_channel_stack>> stk = builder.Build();
781 if (!stk.ok()) {
782 auto error = absl_status_to_grpc_error(stk.status());
783 connecting_result_.Reset();
784 gpr_log(GPR_ERROR,
785 "subchannel %p %s: error initializing subchannel stack: %s", this,
786 key_.ToString().c_str(), StatusToString(error).c_str());
787 return false;
788 }
789 RefCountedPtr<channelz::SocketNode> socket =
790 std::move(connecting_result_.socket_node);
791 connecting_result_.Reset();
792 if (shutdown_) return false;
793 // Publish.
794 connected_subchannel_.reset(
795 new ConnectedSubchannel(stk->release(), args_, channelz_node_));
796 if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
797 gpr_log(GPR_INFO, "subchannel %p %s: new connected subchannel at %p", this,
798 key_.ToString().c_str(), connected_subchannel_.get());
799 }
800 if (channelz_node_ != nullptr) {
801 channelz_node_->SetChildSocket(std::move(socket));
802 }
803 // Start watching connected subchannel.
804 connected_subchannel_->StartWatch(
805 pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(
806 WeakRef(DEBUG_LOCATION, "state_watcher")));
807 // Report initial state.
808 SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status());
809 return true;
810 }
811
812 } // namespace grpc_core
813