• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/client_channel/subchannel.h"
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 
24 #include <algorithm>
25 #include <memory>
26 #include <new>
27 #include <utility>
28 
29 #include "absl/status/statusor.h"
30 #include "absl/strings/cord.h"
31 #include "absl/strings/str_cat.h"
32 #include "absl/strings/string_view.h"
33 #include "absl/types/optional.h"
34 
35 #include <grpc/impl/channel_arg_names.h>
36 #include <grpc/slice.h>
37 #include <grpc/status.h>
38 #include <grpc/support/log.h>
39 
40 #include "src/core/client_channel/subchannel_pool_interface.h"
41 #include "src/core/lib/address_utils/sockaddr_utils.h"
42 #include "src/core/lib/backoff/backoff.h"
43 #include "src/core/lib/channel/channel_args.h"
44 #include "src/core/lib/channel/channel_stack.h"
45 #include "src/core/lib/channel/channel_stack_builder_impl.h"
46 #include "src/core/lib/channel/channel_trace.h"
47 #include "src/core/lib/channel/channelz.h"
48 #include "src/core/lib/config/core_configuration.h"
49 #include "src/core/lib/debug/stats.h"
50 #include "src/core/lib/debug/stats_data.h"
51 #include "src/core/lib/debug/trace.h"
52 #include "src/core/lib/gpr/alloc.h"
53 #include "src/core/lib/gpr/useful.h"
54 #include "src/core/lib/gprpp/debug_location.h"
55 #include "src/core/lib/gprpp/ref_counted_ptr.h"
56 #include "src/core/lib/gprpp/status_helper.h"
57 #include "src/core/lib/gprpp/sync.h"
58 #include "src/core/lib/handshaker/proxy_mapper_registry.h"
59 #include "src/core/lib/iomgr/exec_ctx.h"
60 #include "src/core/lib/iomgr/pollset_set.h"
61 #include "src/core/lib/promise/cancel_callback.h"
62 #include "src/core/lib/promise/seq.h"
63 #include "src/core/lib/slice/slice_internal.h"
64 #include "src/core/lib/surface/channel_init.h"
65 #include "src/core/lib/surface/channel_stack_type.h"
66 #include "src/core/lib/surface/init_internally.h"
67 #include "src/core/lib/transport/connectivity_state.h"
68 #include "src/core/lib/transport/error_utils.h"
69 #include "src/core/lib/transport/transport.h"
70 
// Default reconnect backoff parameters.  ParseArgsForBackoffValues() below
// uses these when the corresponding channel arg is not set.
// Initial backoff, in seconds.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
// Multiplier passed to the backoff options.
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
// Minimum connect timeout, in seconds.
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
// Maximum backoff, in seconds.
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
// Jitter passed to the backoff options.
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
77 
// Conversion between subchannel call and call stack.
//
// The call stack is addressed at a fixed offset from the SubchannelCall: the
// size of SubchannelCall rounded up to the alignment size.  (See
// ConnectedSubchannel::GetInitialCallSizeEstimate(), which sizes the
// allocation as that rounded size plus the call stack size.)
//
// Both macros evaluate their argument exactly once and are fully
// parenthesized so they can be used inside larger expressions.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  ((grpc_call_stack*)((char*)(call) +       \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
// Inverse of SUBCHANNEL_CALL_TO_CALL_STACK().  Note that the expansion uses
// the macro parameter itself (not an identifier that merely happens to be in
// scope at the point of use).
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
  ((SubchannelCall*)(((char*)(callstack)) -      \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
85 
86 namespace grpc_core {
87 
88 using ::grpc_event_engine::experimental::EventEngine;
89 
90 TraceFlag grpc_trace_subchannel(false, "subchannel");
91 DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
92 
93 //
94 // ConnectedSubchannel
95 //
96 
ConnectedSubchannel(grpc_channel_stack * channel_stack,const ChannelArgs & args,RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)97 ConnectedSubchannel::ConnectedSubchannel(
98     grpc_channel_stack* channel_stack, const ChannelArgs& args,
99     RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
100     : RefCounted<ConnectedSubchannel>(
101           GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
102               ? "ConnectedSubchannel"
103               : nullptr),
104       channel_stack_(channel_stack),
105       args_(args),
106       channelz_subchannel_(std::move(channelz_subchannel)) {}
107 
~ConnectedSubchannel()108 ConnectedSubchannel::~ConnectedSubchannel() {
109   GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
110 }
111 
StartWatch(grpc_pollset_set * interested_parties,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)112 void ConnectedSubchannel::StartWatch(
113     grpc_pollset_set* interested_parties,
114     OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
115   grpc_transport_op* op = grpc_make_transport_op(nullptr);
116   op->start_connectivity_watch = std::move(watcher);
117   op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
118   op->bind_pollset_set = interested_parties;
119   grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
120   elem->filter->start_transport_op(elem, op);
121 }
122 
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)123 void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
124                                grpc_closure* on_ack) {
125   grpc_transport_op* op = grpc_make_transport_op(nullptr);
126   grpc_channel_element* elem;
127   op->send_ping.on_initiate = on_initiate;
128   op->send_ping.on_ack = on_ack;
129   elem = grpc_channel_stack_element(channel_stack_, 0);
130   elem->filter->start_transport_op(elem, op);
131 }
132 
GetInitialCallSizeEstimate() const133 size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
134   return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
135          channel_stack_->call_stack_size;
136 }
137 
MakeCallPromise(CallArgs call_args)138 ArenaPromise<ServerMetadataHandle> ConnectedSubchannel::MakeCallPromise(
139     CallArgs call_args) {
140   // If not using channelz, we just need to call the channel stack.
141   if (channelz_subchannel() == nullptr) {
142     return channel_stack_->MakeClientCallPromise(std::move(call_args));
143   }
144   // Otherwise, we need to wrap the channel stack promise with code that
145   // handles the channelz updates.
146   return OnCancel(
147       Seq(channel_stack_->MakeClientCallPromise(std::move(call_args)),
148           [self = Ref()](ServerMetadataHandle metadata) {
149             channelz::SubchannelNode* channelz_subchannel =
150                 self->channelz_subchannel();
151             GPR_ASSERT(channelz_subchannel != nullptr);
152             if (metadata->get(GrpcStatusMetadata())
153                     .value_or(GRPC_STATUS_UNKNOWN) != GRPC_STATUS_OK) {
154               channelz_subchannel->RecordCallFailed();
155             } else {
156               channelz_subchannel->RecordCallSucceeded();
157             }
158             return metadata;
159           }),
160       [self = Ref()]() {
161         channelz::SubchannelNode* channelz_subchannel =
162             self->channelz_subchannel();
163         GPR_ASSERT(channelz_subchannel != nullptr);
164         channelz_subchannel->RecordCallFailed();
165       });
166 }
167 
168 //
169 // SubchannelCall
170 //
171 
Create(Args args,grpc_error_handle * error)172 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
173                                                      grpc_error_handle* error) {
174   const size_t allocation_size =
175       args.connected_subchannel->GetInitialCallSizeEstimate();
176   Arena* arena = args.arena;
177   return RefCountedPtr<SubchannelCall>(new (
178       arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
179 }
180 
SubchannelCall(Args args,grpc_error_handle * error)181 SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
182     : connected_subchannel_(std::move(args.connected_subchannel)),
183       deadline_(args.deadline) {
184   grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
185   const grpc_call_element_args call_args = {
186       callstk,              // call_stack
187       nullptr,              // server_transport_data
188       args.context,         // context
189       args.path.c_slice(),  // path
190       args.start_time,      // start_time
191       args.deadline,        // deadline
192       args.arena,           // arena
193       args.call_combiner    // call_combiner
194   };
195   *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
196                                 SubchannelCall::Destroy, this, &call_args);
197   if (GPR_UNLIKELY(!error->ok())) {
198     gpr_log(GPR_ERROR, "error: %s", StatusToString(*error).c_str());
199     return;
200   }
201   grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
202   auto* channelz_node = connected_subchannel_->channelz_subchannel();
203   if (channelz_node != nullptr) {
204     channelz_node->RecordCallStarted();
205   }
206 }
207 
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)208 void SubchannelCall::StartTransportStreamOpBatch(
209     grpc_transport_stream_op_batch* batch) {
210   MaybeInterceptRecvTrailingMetadata(batch);
211   grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
212   grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
213   GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
214   top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
215 }
216 
GetCallStack()217 grpc_call_stack* SubchannelCall::GetCallStack() {
218   return SUBCHANNEL_CALL_TO_CALL_STACK(this);
219 }
220 
SetAfterCallStackDestroy(grpc_closure * closure)221 void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
222   GPR_ASSERT(after_call_stack_destroy_ == nullptr);
223   GPR_ASSERT(closure != nullptr);
224   after_call_stack_destroy_ = closure;
225 }
226 
Ref()227 RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
228   IncrementRefCount();
229   return RefCountedPtr<SubchannelCall>(this);
230 }
231 
Ref(const DebugLocation & location,const char * reason)232 RefCountedPtr<SubchannelCall> SubchannelCall::Ref(const DebugLocation& location,
233                                                   const char* reason) {
234   IncrementRefCount(location, reason);
235   return RefCountedPtr<SubchannelCall>(this);
236 }
237 
Unref()238 void SubchannelCall::Unref() {
239   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
240 }
241 
Unref(const DebugLocation &,const char * reason)242 void SubchannelCall::Unref(const DebugLocation& /*location*/,
243                            const char* reason) {
244   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
245 }
246 
Destroy(void * arg,grpc_error_handle)247 void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
248   SubchannelCall* self = static_cast<SubchannelCall*>(arg);
249   // Keep some members before destroying the subchannel call.
250   grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
251   RefCountedPtr<ConnectedSubchannel> connected_subchannel =
252       std::move(self->connected_subchannel_);
253   // Destroy the subchannel call.
254   self->~SubchannelCall();
255   // Destroy the call stack. This should be after destroying the subchannel
256   // call, because call->after_call_stack_destroy(), if not null, will free the
257   // call arena.
258   grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
259                           after_call_stack_destroy);
260   // Automatically reset connected_subchannel. This should be after destroying
261   // the call stack, because destroying call stack needs access to the channel
262   // stack.
263 }
264 
MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch * batch)265 void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
266     grpc_transport_stream_op_batch* batch) {
267   // only intercept payloads with recv trailing.
268   if (!batch->recv_trailing_metadata) {
269     return;
270   }
271   // only add interceptor is channelz is enabled.
272   if (connected_subchannel_->channelz_subchannel() == nullptr) {
273     return;
274   }
275   GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
276                     this, grpc_schedule_on_exec_ctx);
277   // save some state needed for the interception callback.
278   GPR_ASSERT(recv_trailing_metadata_ == nullptr);
279   recv_trailing_metadata_ =
280       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
281   original_recv_trailing_metadata_ =
282       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
283   batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
284       &recv_trailing_metadata_ready_;
285 }
286 
287 namespace {
288 
289 // Sets *status based on the rest of the parameters.
GetCallStatus(grpc_status_code * status,Timestamp deadline,grpc_metadata_batch * md_batch,grpc_error_handle error)290 void GetCallStatus(grpc_status_code* status, Timestamp deadline,
291                    grpc_metadata_batch* md_batch, grpc_error_handle error) {
292   if (!error.ok()) {
293     grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
294   } else {
295     *status = md_batch->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
296   }
297 }
298 
299 }  // namespace
300 
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)301 void SubchannelCall::RecvTrailingMetadataReady(void* arg,
302                                                grpc_error_handle error) {
303   SubchannelCall* call = static_cast<SubchannelCall*>(arg);
304   GPR_ASSERT(call->recv_trailing_metadata_ != nullptr);
305   grpc_status_code status = GRPC_STATUS_OK;
306   GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error);
307   channelz::SubchannelNode* channelz_subchannel =
308       call->connected_subchannel_->channelz_subchannel();
309   GPR_ASSERT(channelz_subchannel != nullptr);
310   if (status == GRPC_STATUS_OK) {
311     channelz_subchannel->RecordCallSucceeded();
312   } else {
313     channelz_subchannel->RecordCallFailed();
314   }
315   Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error);
316 }
317 
IncrementRefCount()318 void SubchannelCall::IncrementRefCount() {
319   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
320 }
321 
IncrementRefCount(const DebugLocation &,const char * reason)322 void SubchannelCall::IncrementRefCount(const DebugLocation& /*location*/,
323                                        const char* reason) {
324   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
325 }
326 
327 //
328 // Subchannel::ConnectedSubchannelStateWatcher
329 //
330 
331 class Subchannel::ConnectedSubchannelStateWatcher final
332     : public AsyncConnectivityStateWatcherInterface {
333  public:
334   // Must be instantiated while holding c->mu.
ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)335   explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)
336       : subchannel_(std::move(c)) {}
337 
~ConnectedSubchannelStateWatcher()338   ~ConnectedSubchannelStateWatcher() override {
339     subchannel_.reset(DEBUG_LOCATION, "state_watcher");
340   }
341 
342  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)343   void OnConnectivityStateChange(grpc_connectivity_state new_state,
344                                  const absl::Status& status) override {
345     Subchannel* c = subchannel_.get();
346     {
347       MutexLock lock(&c->mu_);
348       // If we're either shutting down or have already seen this connection
349       // failure (i.e., c->connected_subchannel_ is null), do nothing.
350       //
351       // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN
352       // upon connection close.  So if the server gracefully shuts down,
353       // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we
354       // will see only SHUTDOWN.  Either way, we react to the first one we
355       // see, ignoring anything that happens after that.
356       if (c->connected_subchannel_ == nullptr) return;
357       if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
358           new_state == GRPC_CHANNEL_SHUTDOWN) {
359         if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
360           gpr_log(GPR_INFO,
361                   "subchannel %p %s: Connected subchannel %p reports %s: %s", c,
362                   c->key_.ToString().c_str(), c->connected_subchannel_.get(),
363                   ConnectivityStateName(new_state), status.ToString().c_str());
364         }
365         c->connected_subchannel_.reset();
366         if (c->channelz_node() != nullptr) {
367           c->channelz_node()->SetChildSocket(nullptr);
368         }
369         // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here,
370         // pass along the status from the transport, since it may have
371         // keepalive info attached to it that the channel needs.
372         // TODO(roth): Consider whether there's a cleaner way to do this.
373         c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status);
374         c->backoff_.Reset();
375       }
376     }
377     // Drain any connectivity state notifications after releasing the mutex.
378     c->work_serializer_.DrainQueue();
379   }
380 
381   WeakRefCountedPtr<Subchannel> subchannel_;
382 };
383 
384 //
385 // Subchannel::ConnectivityStateWatcherList
386 //
387 
AddWatcherLocked(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)388 void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
389     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
390   watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
391 }
392 
RemoveWatcherLocked(ConnectivityStateWatcherInterface * watcher)393 void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
394     ConnectivityStateWatcherInterface* watcher) {
395   watchers_.erase(watcher);
396 }
397 
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)398 void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
399     grpc_connectivity_state state, const absl::Status& status) {
400   for (const auto& p : watchers_) {
401     subchannel_->work_serializer_.Schedule(
402         [watcher = p.second->Ref(), state, status]() mutable {
403           auto* watcher_ptr = watcher.get();
404           watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
405                                                  status);
406         },
407         DEBUG_LOCATION);
408   }
409 }
410 
411 //
412 // Subchannel
413 //
414 
415 namespace {
416 
ParseArgsForBackoffValues(const ChannelArgs & args,Duration * min_connect_timeout)417 BackOff::Options ParseArgsForBackoffValues(const ChannelArgs& args,
418                                            Duration* min_connect_timeout) {
419   const absl::optional<Duration> fixed_reconnect_backoff =
420       args.GetDurationFromIntMillis("grpc.testing.fixed_reconnect_backoff_ms");
421   if (fixed_reconnect_backoff.has_value()) {
422     const Duration backoff =
423         std::max(Duration::Milliseconds(100), *fixed_reconnect_backoff);
424     *min_connect_timeout = backoff;
425     return BackOff::Options()
426         .set_initial_backoff(backoff)
427         .set_multiplier(1.0)
428         .set_jitter(0.0)
429         .set_max_backoff(backoff);
430   }
431   const Duration initial_backoff = std::max(
432       Duration::Milliseconds(100),
433       args.GetDurationFromIntMillis(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)
434           .value_or(Duration::Seconds(
435               GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS)));
436   *min_connect_timeout =
437       std::max(Duration::Milliseconds(100),
438                args.GetDurationFromIntMillis(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)
439                    .value_or(Duration::Seconds(
440                        GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS)));
441   const Duration max_backoff =
442       std::max(Duration::Milliseconds(100),
443                args.GetDurationFromIntMillis(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)
444                    .value_or(Duration::Seconds(
445                        GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS)));
446   return BackOff::Options()
447       .set_initial_backoff(initial_backoff)
448       .set_multiplier(GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
449       .set_jitter(GRPC_SUBCHANNEL_RECONNECT_JITTER)
450       .set_max_backoff(max_backoff);
451 }
452 
453 }  // namespace
454 
Subchannel(SubchannelKey key,OrphanablePtr<SubchannelConnector> connector,const ChannelArgs & args)455 Subchannel::Subchannel(SubchannelKey key,
456                        OrphanablePtr<SubchannelConnector> connector,
457                        const ChannelArgs& args)
458     : DualRefCounted<Subchannel>(
459           GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) ? "Subchannel"
460                                                                   : nullptr),
461       key_(std::move(key)),
462       args_(args),
463       pollset_set_(grpc_pollset_set_create()),
464       connector_(std::move(connector)),
465       watcher_list_(this),
466       work_serializer_(args_.GetObjectRef<EventEngine>()),
467       backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)),
468       event_engine_(args_.GetObjectRef<EventEngine>()) {
469   // A grpc_init is added here to ensure that grpc_shutdown does not happen
470   // until the subchannel is destroyed. Subchannels can persist longer than
471   // channels because they maybe reused/shared among multiple channels. As a
472   // result the subchannel destruction happens asynchronously to channel
473   // destruction. If the last channel destruction triggers a grpc_shutdown
474   // before the last subchannel destruction, then there maybe race conditions
475   // triggering segmentation faults. To prevent this issue, we call a grpc_init
476   // here and a grpc_shutdown in the subchannel destructor.
477   InitInternally();
478   global_stats().IncrementClientSubchannelsCreated();
479   GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
480                     grpc_schedule_on_exec_ctx);
481   // Check proxy mapper to determine address to connect to and channel
482   // args to use.
483   address_for_connect_ = CoreConfiguration::Get()
484                              .proxy_mapper_registry()
485                              .MapAddress(key_.address(), &args_)
486                              .value_or(key_.address());
487   // Initialize channelz.
488   const bool channelz_enabled = args_.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
489                                     .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT);
490   if (channelz_enabled) {
491     const size_t channel_tracer_max_memory = Clamp(
492         args_.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
493             .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT),
494         0, INT_MAX);
495     channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
496         grpc_sockaddr_to_uri(&key_.address())
497             .value_or("<unknown address type>"),
498         channel_tracer_max_memory);
499     channelz_node_->AddTraceEvent(
500         channelz::ChannelTrace::Severity::Info,
501         grpc_slice_from_static_string("subchannel created"));
502   }
503 }
504 
~Subchannel()505 Subchannel::~Subchannel() {
506   if (channelz_node_ != nullptr) {
507     channelz_node_->AddTraceEvent(
508         channelz::ChannelTrace::Severity::Info,
509         grpc_slice_from_static_string("Subchannel destroyed"));
510     channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
511   }
512   connector_.reset();
513   grpc_pollset_set_destroy(pollset_set_);
514   // grpc_shutdown is called here because grpc_init is called in the ctor.
515   ShutdownInternally();
516 }
517 
Create(OrphanablePtr<SubchannelConnector> connector,const grpc_resolved_address & address,const ChannelArgs & args)518 RefCountedPtr<Subchannel> Subchannel::Create(
519     OrphanablePtr<SubchannelConnector> connector,
520     const grpc_resolved_address& address, const ChannelArgs& args) {
521   SubchannelKey key(address, args);
522   auto* subchannel_pool = args.GetObject<SubchannelPoolInterface>();
523   GPR_ASSERT(subchannel_pool != nullptr);
524   RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key);
525   if (c != nullptr) {
526     return c;
527   }
528   c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args);
529   // Try to register the subchannel before setting the subchannel pool.
530   // Otherwise, in case of a registration race, unreffing c in
531   // RegisterSubchannel() will cause c to be tried to be unregistered, while
532   // its key maps to a different subchannel.
533   RefCountedPtr<Subchannel> registered =
534       subchannel_pool->RegisterSubchannel(c->key_, c);
535   if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
536   return registered;
537 }
538 
ThrottleKeepaliveTime(int new_keepalive_time)539 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
540   MutexLock lock(&mu_);
541   // Only update the value if the new keepalive time is larger.
542   if (new_keepalive_time > keepalive_time_) {
543     keepalive_time_ = new_keepalive_time;
544     if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
545       gpr_log(GPR_INFO, "subchannel %p %s: throttling keepalive time to %d",
546               this, key_.ToString().c_str(), new_keepalive_time);
547     }
548     args_ = args_.Set(GRPC_ARG_KEEPALIVE_TIME_MS, new_keepalive_time);
549   }
550 }
551 
channelz_node()552 channelz::SubchannelNode* Subchannel::channelz_node() {
553   return channelz_node_.get();
554 }
555 
WatchConnectivityState(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)556 void Subchannel::WatchConnectivityState(
557     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
558   {
559     MutexLock lock(&mu_);
560     grpc_pollset_set* interested_parties = watcher->interested_parties();
561     if (interested_parties != nullptr) {
562       grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
563     }
564     work_serializer_.Schedule(
565         [watcher = watcher->Ref(), state = state_, status = status_]() mutable {
566           auto* watcher_ptr = watcher.get();
567           watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
568                                                  status);
569         },
570         DEBUG_LOCATION);
571     watcher_list_.AddWatcherLocked(std::move(watcher));
572   }
573   // Drain any connectivity state notifications after releasing the mutex.
574   work_serializer_.DrainQueue();
575 }
576 
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)577 void Subchannel::CancelConnectivityStateWatch(
578     ConnectivityStateWatcherInterface* watcher) {
579   {
580     MutexLock lock(&mu_);
581     grpc_pollset_set* interested_parties = watcher->interested_parties();
582     if (interested_parties != nullptr) {
583       grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
584     }
585     watcher_list_.RemoveWatcherLocked(watcher);
586   }
587   // Drain any connectivity state notifications after releasing the mutex.
588   // (Shouldn't actually be necessary in this case, but better safe than sorry.)
589   work_serializer_.DrainQueue();
590 }
591 
RequestConnection()592 void Subchannel::RequestConnection() {
593   {
594     MutexLock lock(&mu_);
595     if (state_ == GRPC_CHANNEL_IDLE) {
596       StartConnectingLocked();
597     }
598   }
599   // Drain any connectivity state notifications after releasing the mutex.
600   work_serializer_.DrainQueue();
601 }
602 
ResetBackoff()603 void Subchannel::ResetBackoff() {
604   // Hold a ref to ensure cancellation and subsequent deletion of the closure
605   // does not eliminate the last ref and destroy the Subchannel before the
606   // method returns.
607   auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff");
608   {
609     MutexLock lock(&mu_);
610     backoff_.Reset();
611     if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
612         event_engine_->Cancel(retry_timer_handle_)) {
613       OnRetryTimerLocked();
614     } else if (state_ == GRPC_CHANNEL_CONNECTING) {
615       next_attempt_time_ = Timestamp::Now();
616     }
617   }
618   // Drain any connectivity state notifications after releasing the mutex.
619   work_serializer_.DrainQueue();
620 }
621 
Orphaned()622 void Subchannel::Orphaned() {
623   // The subchannel_pool is only used once here in this subchannel, so the
624   // access can be outside of the lock.
625   if (subchannel_pool_ != nullptr) {
626     subchannel_pool_->UnregisterSubchannel(key_, this);
627     subchannel_pool_.reset();
628   }
629   {
630     MutexLock lock(&mu_);
631     GPR_ASSERT(!shutdown_);
632     shutdown_ = true;
633     connector_.reset();
634     connected_subchannel_.reset();
635   }
636   // Drain any connectivity state notifications after releasing the mutex.
637   work_serializer_.DrainQueue();
638 }
639 
GetOrAddDataProducer(UniqueTypeName type,std::function<void (DataProducerInterface **)> get_or_add)640 void Subchannel::GetOrAddDataProducer(
641     UniqueTypeName type,
642     std::function<void(DataProducerInterface**)> get_or_add) {
643   MutexLock lock(&mu_);
644   auto it = data_producer_map_.emplace(type, nullptr).first;
645   get_or_add(&it->second);
646 }
647 
RemoveDataProducer(DataProducerInterface * data_producer)648 void Subchannel::RemoveDataProducer(DataProducerInterface* data_producer) {
649   MutexLock lock(&mu_);
650   auto it = data_producer_map_.find(data_producer->type());
651   if (it != data_producer_map_.end() && it->second == data_producer) {
652     data_producer_map_.erase(it);
653   }
654 }
655 
656 // Note: Must be called with a state that is different from the current state.
SetConnectivityStateLocked(grpc_connectivity_state state,const absl::Status & status)657 void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
658                                             const absl::Status& status) {
659   state_ = state;
660   if (status.ok()) {
661     status_ = status;
662   } else {
663     // Augment status message to include IP address.
664     status_ = absl::Status(status.code(),
665                            absl::StrCat(grpc_sockaddr_to_uri(&key_.address())
666                                             .value_or("<unknown address type>"),
667                                         ": ", status.message()));
668     status.ForEachPayload(
669         [this](absl::string_view key, const absl::Cord& value)
670         // Want to use ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) here,
671         // but that won't work, because we can't pass the lock
672         // annotation through absl::Status::ForEachPayload().
673         ABSL_NO_THREAD_SAFETY_ANALYSIS { status_.SetPayload(key, value); });
674   }
675   if (channelz_node_ != nullptr) {
676     channelz_node_->UpdateConnectivityState(state);
677     channelz_node_->AddTraceEvent(
678         channelz::ChannelTrace::Severity::Info,
679         grpc_slice_from_cpp_string(absl::StrCat(
680             "Subchannel connectivity state changed to ",
681             ConnectivityStateName(state),
682             status.ok() ? "" : absl::StrCat(": ", status_.ToString()))));
683   }
684   // Notify watchers.
685   watcher_list_.NotifyLocked(state, status_);
686 }
687 
OnRetryTimer()688 void Subchannel::OnRetryTimer() {
689   {
690     MutexLock lock(&mu_);
691     OnRetryTimerLocked();
692   }
693   // Drain any connectivity state notifications after releasing the mutex.
694   work_serializer_.DrainQueue();
695 }
696 
OnRetryTimerLocked()697 void Subchannel::OnRetryTimerLocked() {
698   if (shutdown_) return;
699   gpr_log(GPR_INFO, "subchannel %p %s: backoff delay elapsed, reporting IDLE",
700           this, key_.ToString().c_str());
701   SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus());
702 }
703 
StartConnectingLocked()704 void Subchannel::StartConnectingLocked() {
705   // Set next attempt time.
706   const Timestamp min_deadline = min_connect_timeout_ + Timestamp::Now();
707   next_attempt_time_ = backoff_.NextAttemptTime();
708   // Report CONNECTING.
709   SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus());
710   // Start connection attempt.
711   SubchannelConnector::Args args;
712   args.address = &address_for_connect_;
713   args.interested_parties = pollset_set_;
714   args.deadline = std::max(next_attempt_time_, min_deadline);
715   args.channel_args = args_;
716   WeakRef(DEBUG_LOCATION, "Connect").release();  // Ref held by callback.
717   connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
718 }
719 
OnConnectingFinished(void * arg,grpc_error_handle error)720 void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
721   WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
722   {
723     MutexLock lock(&c->mu_);
724     c->OnConnectingFinishedLocked(error);
725   }
726   // Drain any connectivity state notifications after releasing the mutex.
727   c->work_serializer_.DrainQueue();
728   c.reset(DEBUG_LOCATION, "Connect");
729 }
730 
OnConnectingFinishedLocked(grpc_error_handle error)731 void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
732   if (shutdown_) {
733     connecting_result_.Reset();
734     return;
735   }
736   // If we didn't get a transport or we fail to publish it, report
737   // TRANSIENT_FAILURE and start the retry timer.
738   // Note that if the connection attempt took longer than the backoff
739   // time, then the timer will fire immediately, and we will quickly
740   // transition back to IDLE.
741   if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
742     const Duration time_until_next_attempt =
743         next_attempt_time_ - Timestamp::Now();
744     gpr_log(GPR_INFO,
745             "subchannel %p %s: connect failed (%s), backing off for %" PRId64
746             " ms",
747             this, key_.ToString().c_str(), StatusToString(error).c_str(),
748             time_until_next_attempt.millis());
749     SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
750                                grpc_error_to_absl_status(error));
751     retry_timer_handle_ = event_engine_->RunAfter(
752         time_until_next_attempt,
753         [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
754           {
755             ApplicationCallbackExecCtx callback_exec_ctx;
756             ExecCtx exec_ctx;
757             self->OnRetryTimer();
758             // Subchannel deletion might require an active ExecCtx. So if
759             // self.reset() is not called here, the WeakRefCountedPtr destructor
760             // may run after the ExecCtx declared in the callback is destroyed.
761             // Since subchannel may get destroyed when the WeakRefCountedPtr
762             // destructor runs, it may not have an active ExecCtx - thus leading
763             // to crashes.
764             self.reset();
765           }
766         });
767   }
768 }
769 
PublishTransportLocked()770 bool Subchannel::PublishTransportLocked() {
771   // Construct channel stack.
772   // Builder takes ownership of transport.
773   ChannelStackBuilderImpl builder(
774       "subchannel", GRPC_CLIENT_SUBCHANNEL,
775       connecting_result_.channel_args.SetObject(
776           std::exchange(connecting_result_.transport, nullptr)));
777   if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
778     return false;
779   }
780   absl::StatusOr<RefCountedPtr<grpc_channel_stack>> stk = builder.Build();
781   if (!stk.ok()) {
782     auto error = absl_status_to_grpc_error(stk.status());
783     connecting_result_.Reset();
784     gpr_log(GPR_ERROR,
785             "subchannel %p %s: error initializing subchannel stack: %s", this,
786             key_.ToString().c_str(), StatusToString(error).c_str());
787     return false;
788   }
789   RefCountedPtr<channelz::SocketNode> socket =
790       std::move(connecting_result_.socket_node);
791   connecting_result_.Reset();
792   if (shutdown_) return false;
793   // Publish.
794   connected_subchannel_.reset(
795       new ConnectedSubchannel(stk->release(), args_, channelz_node_));
796   if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel)) {
797     gpr_log(GPR_INFO, "subchannel %p %s: new connected subchannel at %p", this,
798             key_.ToString().c_str(), connected_subchannel_.get());
799   }
800   if (channelz_node_ != nullptr) {
801     channelz_node_->SetChildSocket(std::move(socket));
802   }
803   // Start watching connected subchannel.
804   connected_subchannel_->StartWatch(
805       pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(
806                         WeakRef(DEBUG_LOCATION, "state_watcher")));
807   // Report initial state.
808   SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status());
809   return true;
810 }
811 
812 }  // namespace grpc_core
813