• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/subchannel.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 
26 #include <algorithm>
27 #include <cstring>
28 
29 #include "absl/strings/str_format.h"
30 
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/string_util.h>
33 
34 #include "src/core/ext/filters/client_channel/client_channel.h"
35 #include "src/core/ext/filters/client_channel/health/health_check_client.h"
36 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
37 #include "src/core/ext/filters/client_channel/service_config.h"
38 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
39 #include "src/core/lib/address_utils/parse_address.h"
40 #include "src/core/lib/address_utils/sockaddr_utils.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/connected_channel.h"
44 #include "src/core/lib/debug/stats.h"
45 #include "src/core/lib/gpr/alloc.h"
46 #include "src/core/lib/gprpp/debug_location.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/gprpp/sync.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_internal.h"
52 #include "src/core/lib/surface/channel.h"
53 #include "src/core/lib/surface/channel_init.h"
54 #include "src/core/lib/transport/connectivity_state.h"
55 #include "src/core/lib/transport/error_utils.h"
56 #include "src/core/lib/transport/status_metadata.h"
57 #include "src/core/lib/uri/uri_parser.h"
58 
// Strong and weak refs.
// STRONG_REF_MASK is a gpr_atm mask with the low INTERNAL_REF_BITS bits
// cleared and every higher bit set.
// NOTE(review): neither macro is referenced in the visible part of this
// file -- confirm whether they are still needed.
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))
62 
// Backoff parameters.
// Defaults consumed by ParseArgsForBackoffValues() below.  The *_SECONDS
// values are multiplied by 1000 there to obtain milliseconds; the multiplier
// and jitter are handed to BackOff::Options unless a fixed reconnect backoff
// was requested via channel args.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
69 
// Conversion between subchannel call and call stack.
// SubchannelCall::Create() allocates a single buffer holding the
// SubchannelCall object (rounded up to the alignment size) followed by the
// call stack, so each conversion is a fixed byte offset from the other
// pointer.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  (grpc_call_stack*)((char*)(call) +        \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
// Inverse of SUBCHANNEL_CALL_TO_CALL_STACK.  The expansion must use the macro
// parameter itself; it must not rely on a variable that happens to be named
// `call_stack` at the point of use.
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
  (SubchannelCall*)(((char*)(callstack)) -       \
                    GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)))
77 
78 namespace grpc_core {
79 
// Trace flag created with the name "subchannel"; checked via enabled() in
// this file before logging that a connected subchannel was lost.
TraceFlag grpc_trace_subchannel(false, "subchannel");
// Debug-only trace flag created with the name "subchannel_refcount"; when
// enabled, the ConnectedSubchannel and Subchannel constructors pass a name
// string (instead of nullptr) to their ref-counting base class.
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
82 
83 //
84 // ConnectedSubchannel
85 //
86 
ConnectedSubchannel(grpc_channel_stack * channel_stack,const grpc_channel_args * args,RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)87 ConnectedSubchannel::ConnectedSubchannel(
88     grpc_channel_stack* channel_stack, const grpc_channel_args* args,
89     RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
90     : RefCounted<ConnectedSubchannel>(
91           GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
92               ? "ConnectedSubchannel"
93               : nullptr),
94       channel_stack_(channel_stack),
95       args_(grpc_channel_args_copy(args)),
96       channelz_subchannel_(std::move(channelz_subchannel)) {}
97 
// Releases what this object owns: the private copy of the channel args made
// in the constructor, and one ref on the channel stack.
ConnectedSubchannel::~ConnectedSubchannel() {
  grpc_channel_args_destroy(args_);
  GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
102 
StartWatch(grpc_pollset_set * interested_parties,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)103 void ConnectedSubchannel::StartWatch(
104     grpc_pollset_set* interested_parties,
105     OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
106   grpc_transport_op* op = grpc_make_transport_op(nullptr);
107   op->start_connectivity_watch = std::move(watcher);
108   op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
109   op->bind_pollset_set = interested_parties;
110   grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
111   elem->filter->start_transport_op(elem, op);
112 }
113 
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)114 void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
115                                grpc_closure* on_ack) {
116   grpc_transport_op* op = grpc_make_transport_op(nullptr);
117   grpc_channel_element* elem;
118   op->send_ping.on_initiate = on_initiate;
119   op->send_ping.on_ack = on_ack;
120   elem = grpc_channel_stack_element(channel_stack_, 0);
121   elem->filter->start_transport_op(elem, op);
122 }
123 
GetInitialCallSizeEstimate() const124 size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
125   return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
126          channel_stack_->call_stack_size;
127 }
128 
129 //
130 // SubchannelCall
131 //
132 
Create(Args args,grpc_error_handle * error)133 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
134                                                      grpc_error_handle* error) {
135   const size_t allocation_size =
136       args.connected_subchannel->GetInitialCallSizeEstimate();
137   Arena* arena = args.arena;
138   return RefCountedPtr<SubchannelCall>(new (
139       arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
140 }
141 
SubchannelCall(Args args,grpc_error_handle * error)142 SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
143     : connected_subchannel_(std::move(args.connected_subchannel)),
144       deadline_(args.deadline) {
145   grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
146   const grpc_call_element_args call_args = {
147       callstk,           /* call_stack */
148       nullptr,           /* server_transport_data */
149       args.context,      /* context */
150       args.path,         /* path */
151       args.start_time,   /* start_time */
152       args.deadline,     /* deadline */
153       args.arena,        /* arena */
154       args.call_combiner /* call_combiner */
155   };
156   *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
157                                 SubchannelCall::Destroy, this, &call_args);
158   if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) {
159     gpr_log(GPR_ERROR, "error: %s", grpc_error_std_string(*error).c_str());
160     return;
161   }
162   grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
163   auto* channelz_node = connected_subchannel_->channelz_subchannel();
164   if (channelz_node != nullptr) {
165     channelz_node->RecordCallStarted();
166   }
167 }
168 
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)169 void SubchannelCall::StartTransportStreamOpBatch(
170     grpc_transport_stream_op_batch* batch) {
171   GPR_TIMER_SCOPE("subchannel_call_process_op", 0);
172   MaybeInterceptRecvTrailingMetadata(batch);
173   grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
174   grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
175   GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
176   top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
177 }
178 
GetCallStack()179 grpc_call_stack* SubchannelCall::GetCallStack() {
180   return SUBCHANNEL_CALL_TO_CALL_STACK(this);
181 }
182 
// Registers the closure that Destroy() passes to grpc_call_stack_destroy().
// May be set at most once, and `closure` must be non-null; both conditions
// are enforced with GPR_ASSERT.
void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
  GPR_ASSERT(after_call_stack_destroy_ == nullptr);
  GPR_ASSERT(closure != nullptr);
  after_call_stack_destroy_ = closure;
}
188 
Ref()189 RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
190   IncrementRefCount();
191   return RefCountedPtr<SubchannelCall>(this);
192 }
193 
Ref(const grpc_core::DebugLocation & location,const char * reason)194 RefCountedPtr<SubchannelCall> SubchannelCall::Ref(
195     const grpc_core::DebugLocation& location, const char* reason) {
196   IncrementRefCount(location, reason);
197   return RefCountedPtr<SubchannelCall>(this);
198 }
199 
Unref()200 void SubchannelCall::Unref() {
201   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
202 }
203 
Unref(const DebugLocation &,const char * reason)204 void SubchannelCall::Unref(const DebugLocation& /*location*/,
205                            const char* reason) {
206   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
207 }
208 
Destroy(void * arg,grpc_error_handle)209 void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
210   GPR_TIMER_SCOPE("subchannel_call_destroy", 0);
211   SubchannelCall* self = static_cast<SubchannelCall*>(arg);
212   // Keep some members before destroying the subchannel call.
213   grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
214   RefCountedPtr<ConnectedSubchannel> connected_subchannel =
215       std::move(self->connected_subchannel_);
216   // Destroy the subchannel call.
217   self->~SubchannelCall();
218   // Destroy the call stack. This should be after destroying the subchannel
219   // call, because call->after_call_stack_destroy(), if not null, will free the
220   // call arena.
221   grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
222                           after_call_stack_destroy);
223   // Automatically reset connected_subchannel. This should be after destroying
224   // the call stack, because destroying call stack needs access to the channel
225   // stack.
226 }
227 
MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch * batch)228 void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
229     grpc_transport_stream_op_batch* batch) {
230   // only intercept payloads with recv trailing.
231   if (!batch->recv_trailing_metadata) {
232     return;
233   }
234   // only add interceptor is channelz is enabled.
235   if (connected_subchannel_->channelz_subchannel() == nullptr) {
236     return;
237   }
238   GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
239                     this, grpc_schedule_on_exec_ctx);
240   // save some state needed for the interception callback.
241   GPR_ASSERT(recv_trailing_metadata_ == nullptr);
242   recv_trailing_metadata_ =
243       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
244   original_recv_trailing_metadata_ =
245       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
246   batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
247       &recv_trailing_metadata_ready_;
248 }
249 
250 namespace {
251 
252 // Sets *status based on the rest of the parameters.
GetCallStatus(grpc_status_code * status,grpc_millis deadline,grpc_metadata_batch * md_batch,grpc_error_handle error)253 void GetCallStatus(grpc_status_code* status, grpc_millis deadline,
254                    grpc_metadata_batch* md_batch, grpc_error_handle error) {
255   if (error != GRPC_ERROR_NONE) {
256     grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
257   } else {
258     if (md_batch->idx.named.grpc_status != nullptr) {
259       *status = grpc_get_status_code_from_metadata(
260           md_batch->idx.named.grpc_status->md);
261     } else {
262       *status = GRPC_STATUS_UNKNOWN;
263     }
264   }
265   GRPC_ERROR_UNREF(error);
266 }
267 
268 }  // namespace
269 
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)270 void SubchannelCall::RecvTrailingMetadataReady(void* arg,
271                                                grpc_error_handle error) {
272   SubchannelCall* call = static_cast<SubchannelCall*>(arg);
273   GPR_ASSERT(call->recv_trailing_metadata_ != nullptr);
274   grpc_status_code status = GRPC_STATUS_OK;
275   GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_,
276                 GRPC_ERROR_REF(error));
277   channelz::SubchannelNode* channelz_subchannel =
278       call->connected_subchannel_->channelz_subchannel();
279   GPR_ASSERT(channelz_subchannel != nullptr);
280   if (status == GRPC_STATUS_OK) {
281     channelz_subchannel->RecordCallSucceeded();
282   } else {
283     channelz_subchannel->RecordCallFailed();
284   }
285   Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_,
286                GRPC_ERROR_REF(error));
287 }
288 
IncrementRefCount()289 void SubchannelCall::IncrementRefCount() {
290   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
291 }
292 
IncrementRefCount(const grpc_core::DebugLocation &,const char * reason)293 void SubchannelCall::IncrementRefCount(
294     const grpc_core::DebugLocation& /*location*/, const char* reason) {
295   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
296 }
297 
298 //
299 // Subchannel::ConnectedSubchannelStateWatcher
300 //
301 
302 class Subchannel::ConnectedSubchannelStateWatcher
303     : public AsyncConnectivityStateWatcherInterface {
304  public:
305   // Must be instantiated while holding c->mu.
ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)306   explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)
307       : subchannel_(std::move(c)) {}
308 
~ConnectedSubchannelStateWatcher()309   ~ConnectedSubchannelStateWatcher() override {
310     subchannel_.reset(DEBUG_LOCATION, "state_watcher");
311   }
312 
313  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)314   void OnConnectivityStateChange(grpc_connectivity_state new_state,
315                                  const absl::Status& status) override {
316     Subchannel* c = subchannel_.get();
317     MutexLock lock(&c->mu_);
318     switch (new_state) {
319       case GRPC_CHANNEL_TRANSIENT_FAILURE:
320       case GRPC_CHANNEL_SHUTDOWN: {
321         if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
322           if (grpc_trace_subchannel.enabled()) {
323             gpr_log(GPR_INFO,
324                     "Connected subchannel %p of subchannel %p has gone into "
325                     "%s. Attempting to reconnect.",
326                     c->connected_subchannel_.get(), c,
327                     ConnectivityStateName(new_state));
328           }
329           c->connected_subchannel_.reset();
330           if (c->channelz_node() != nullptr) {
331             c->channelz_node()->SetChildSocket(nullptr);
332           }
333           // We need to construct our own status if the underlying state was
334           // shutdown since the accompanying status will be StatusCode::OK
335           // otherwise.
336           c->SetConnectivityStateLocked(
337               GRPC_CHANNEL_TRANSIENT_FAILURE,
338               new_state == GRPC_CHANNEL_SHUTDOWN
339                   ? absl::Status(absl::StatusCode::kUnavailable,
340                                  "Subchannel has disconnected.")
341                   : status);
342           c->backoff_begun_ = false;
343           c->backoff_.Reset();
344         }
345         break;
346       }
347       default: {
348         // In principle, this should never happen.  We should not get
349         // a callback for READY, because that was the state we started
350         // this watch from.  And a connected subchannel should never go
351         // from READY to CONNECTING or IDLE.
352         c->SetConnectivityStateLocked(new_state, status);
353       }
354     }
355   }
356 
357   WeakRefCountedPtr<Subchannel> subchannel_;
358 };
359 
360 // Asynchronously notifies the \a watcher of a change in the connectvity state
361 // of \a subchannel to the current \a state. Deletes itself when done.
362 class Subchannel::AsyncWatcherNotifierLocked {
363  public:
AsyncWatcherNotifierLocked(RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,Subchannel * subchannel,grpc_connectivity_state state,const absl::Status & status)364   AsyncWatcherNotifierLocked(
365       RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,
366       Subchannel* subchannel, grpc_connectivity_state state,
367       const absl::Status& status)
368       : watcher_(std::move(watcher)) {
369     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
370     if (state == GRPC_CHANNEL_READY) {
371       connected_subchannel = subchannel->connected_subchannel_;
372     }
373     watcher_->PushConnectivityStateChange(
374         {state, status, std::move(connected_subchannel)});
375     ExecCtx::Run(DEBUG_LOCATION,
376                  GRPC_CLOSURE_INIT(
377                      &closure_,
378                      [](void* arg, grpc_error_handle /*error*/) {
379                        auto* self =
380                            static_cast<AsyncWatcherNotifierLocked*>(arg);
381                        self->watcher_->OnConnectivityStateChange();
382                        delete self;
383                      },
384                      this, nullptr),
385                  GRPC_ERROR_NONE);
386   }
387 
388  private:
389   RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher_;
390   grpc_closure closure_;
391 };
392 
393 //
394 // Subchannel::ConnectivityStateWatcherList
395 //
396 
AddWatcherLocked(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)397 void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
398     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
399   watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
400 }
401 
// Removes the entry keyed by `watcher`, if any, from the list.
void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
    ConnectivityStateWatcherInterface* watcher) {
  watchers_.erase(watcher);
}
406 
NotifyLocked(Subchannel * subchannel,grpc_connectivity_state state,const absl::Status & status)407 void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
408     Subchannel* subchannel, grpc_connectivity_state state,
409     const absl::Status& status) {
410   for (const auto& p : watchers_) {
411     new AsyncWatcherNotifierLocked(p.second, subchannel, state, status);
412   }
413 }
414 
415 //
416 // Subchannel::HealthWatcherMap::HealthWatcher
417 //
418 
419 // State needed for tracking the connectivity state with a particular
420 // health check service name.
421 class Subchannel::HealthWatcherMap::HealthWatcher
422     : public AsyncConnectivityStateWatcherInterface {
423  public:
HealthWatcher(WeakRefCountedPtr<Subchannel> c,std::string health_check_service_name)424   HealthWatcher(WeakRefCountedPtr<Subchannel> c,
425                 std::string health_check_service_name)
426       : subchannel_(std::move(c)),
427         health_check_service_name_(std::move(health_check_service_name)),
428         state_(subchannel_->state_ == GRPC_CHANNEL_READY
429                    ? GRPC_CHANNEL_CONNECTING
430                    : subchannel_->state_) {
431     // If the subchannel is already connected, start health checking.
432     if (subchannel_->state_ == GRPC_CHANNEL_READY) StartHealthCheckingLocked();
433   }
434 
~HealthWatcher()435   ~HealthWatcher() override {
436     subchannel_.reset(DEBUG_LOCATION, "health_watcher");
437   }
438 
health_check_service_name() const439   const std::string& health_check_service_name() const {
440     return health_check_service_name_;
441   }
442 
state() const443   grpc_connectivity_state state() const { return state_; }
444 
AddWatcherLocked(grpc_connectivity_state initial_state,RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher)445   void AddWatcherLocked(
446       grpc_connectivity_state initial_state,
447       RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher) {
448     if (state_ != initial_state) {
449       new AsyncWatcherNotifierLocked(watcher, subchannel_.get(), state_,
450                                      status_);
451     }
452     watcher_list_.AddWatcherLocked(std::move(watcher));
453   }
454 
RemoveWatcherLocked(Subchannel::ConnectivityStateWatcherInterface * watcher)455   void RemoveWatcherLocked(
456       Subchannel::ConnectivityStateWatcherInterface* watcher) {
457     watcher_list_.RemoveWatcherLocked(watcher);
458   }
459 
HasWatchers() const460   bool HasWatchers() const { return !watcher_list_.empty(); }
461 
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)462   void NotifyLocked(grpc_connectivity_state state, const absl::Status& status)
463       ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_->mu_) {
464     if (state == GRPC_CHANNEL_READY) {
465       // If we had not already notified for CONNECTING state, do so now.
466       // (We may have missed this earlier, because if the transition
467       // from IDLE to CONNECTING to READY was too quick, the connected
468       // subchannel may not have sent us a notification for CONNECTING.)
469       if (state_ != GRPC_CHANNEL_CONNECTING) {
470         state_ = GRPC_CHANNEL_CONNECTING;
471         status_ = status;
472         watcher_list_.NotifyLocked(subchannel_.get(), state_, status);
473       }
474       // If we've become connected, start health checking.
475       StartHealthCheckingLocked();
476     } else {
477       state_ = state;
478       status_ = status;
479       watcher_list_.NotifyLocked(subchannel_.get(), state_, status);
480       // We're not connected, so stop health checking.
481       health_check_client_.reset();
482     }
483   }
484 
Orphan()485   void Orphan() override {
486     watcher_list_.Clear();
487     health_check_client_.reset();
488     Unref();
489   }
490 
491  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)492   void OnConnectivityStateChange(grpc_connectivity_state new_state,
493                                  const absl::Status& status) override {
494     MutexLock lock(&subchannel_->mu_);
495     if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) {
496       state_ = new_state;
497       status_ = status;
498       watcher_list_.NotifyLocked(subchannel_.get(), new_state, status);
499     }
500   }
501 
StartHealthCheckingLocked()502   void StartHealthCheckingLocked()
503       ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_->mu_) {
504     GPR_ASSERT(health_check_client_ == nullptr);
505     health_check_client_ = MakeOrphanable<HealthCheckClient>(
506         health_check_service_name_, subchannel_->connected_subchannel_,
507         subchannel_->pollset_set_, subchannel_->channelz_node_, Ref());
508   }
509 
510   WeakRefCountedPtr<Subchannel> subchannel_;
511   std::string health_check_service_name_;
512   OrphanablePtr<HealthCheckClient> health_check_client_;
513   grpc_connectivity_state state_;
514   absl::Status status_;
515   ConnectivityStateWatcherList watcher_list_;
516 };
517 
518 //
519 // Subchannel::HealthWatcherMap
520 //
521 
AddWatcherLocked(WeakRefCountedPtr<Subchannel> subchannel,grpc_connectivity_state initial_state,const std::string & health_check_service_name,RefCountedPtr<ConnectivityStateWatcherInterface> watcher)522 void Subchannel::HealthWatcherMap::AddWatcherLocked(
523     WeakRefCountedPtr<Subchannel> subchannel,
524     grpc_connectivity_state initial_state,
525     const std::string& health_check_service_name,
526     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
527   // If the health check service name is not already present in the map,
528   // add it.
529   auto it = map_.find(health_check_service_name);
530   HealthWatcher* health_watcher;
531   if (it == map_.end()) {
532     auto w = MakeOrphanable<HealthWatcher>(std::move(subchannel),
533                                            health_check_service_name);
534     health_watcher = w.get();
535     map_.emplace(health_check_service_name, std::move(w));
536   } else {
537     health_watcher = it->second.get();
538   }
539   // Add the watcher to the entry.
540   health_watcher->AddWatcherLocked(initial_state, std::move(watcher));
541 }
542 
RemoveWatcherLocked(const std::string & health_check_service_name,ConnectivityStateWatcherInterface * watcher)543 void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
544     const std::string& health_check_service_name,
545     ConnectivityStateWatcherInterface* watcher) {
546   auto it = map_.find(health_check_service_name);
547   GPR_ASSERT(it != map_.end());
548   it->second->RemoveWatcherLocked(watcher);
549   // If we just removed the last watcher for this service name, remove
550   // the map entry.
551   if (!it->second->HasWatchers()) map_.erase(it);
552 }
553 
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)554 void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state,
555                                                 const absl::Status& status) {
556   for (const auto& p : map_) {
557     p.second->NotifyLocked(state, status);
558   }
559 }
560 
561 grpc_connectivity_state
CheckConnectivityStateLocked(Subchannel * subchannel,const std::string & health_check_service_name)562 Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
563     Subchannel* subchannel, const std::string& health_check_service_name) {
564   auto it = map_.find(health_check_service_name);
565   if (it == map_.end()) {
566     // If the health check service name is not found in the map, we're
567     // not currently doing a health check for that service name.  If the
568     // subchannel's state without health checking is READY, report
569     // CONNECTING, since that's what we'd be in as soon as we do start a
570     // watch.  Otherwise, report the channel's state without health checking.
571     return subchannel->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
572                                                     : subchannel->state_;
573   }
574   HealthWatcher* health_watcher = it->second.get();
575   return health_watcher->state();
576 }
577 
// Drops every per-service health watcher entry from the map.
void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); }
579 
580 //
581 // Subchannel
582 //
583 
584 namespace {
585 
ParseArgsForBackoffValues(const grpc_channel_args * args,grpc_millis * min_connect_timeout_ms)586 BackOff::Options ParseArgsForBackoffValues(
587     const grpc_channel_args* args, grpc_millis* min_connect_timeout_ms) {
588   grpc_millis initial_backoff_ms =
589       GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
590   *min_connect_timeout_ms =
591       GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
592   grpc_millis max_backoff_ms =
593       GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
594   bool fixed_reconnect_backoff = false;
595   if (args != nullptr) {
596     for (size_t i = 0; i < args->num_args; i++) {
597       if (0 == strcmp(args->args[i].key,
598                       "grpc.testing.fixed_reconnect_backoff_ms")) {
599         fixed_reconnect_backoff = true;
600         initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
601             grpc_channel_arg_get_integer(
602                 &args->args[i],
603                 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
604       } else if (0 ==
605                  strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
606         fixed_reconnect_backoff = false;
607         *min_connect_timeout_ms = grpc_channel_arg_get_integer(
608             &args->args[i],
609             {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
610       } else if (0 ==
611                  strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
612         fixed_reconnect_backoff = false;
613         max_backoff_ms = grpc_channel_arg_get_integer(
614             &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
615       } else if (0 == strcmp(args->args[i].key,
616                              GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
617         fixed_reconnect_backoff = false;
618         initial_backoff_ms = grpc_channel_arg_get_integer(
619             &args->args[i],
620             {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
621       }
622     }
623   }
624   return BackOff::Options()
625       .set_initial_backoff(initial_backoff_ms)
626       .set_multiplier(fixed_reconnect_backoff
627                           ? 1.0
628                           : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
629       .set_jitter(fixed_reconnect_backoff ? 0.0
630                                           : GRPC_SUBCHANNEL_RECONNECT_JITTER)
631       .set_max_backoff(max_backoff_ms);
632 }
633 
634 }  // namespace
635 
PushConnectivityStateChange(ConnectivityStateChange state_change)636 void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange(
637     ConnectivityStateChange state_change) {
638   MutexLock lock(&mu_);
639   connectivity_state_queue_.push_back(std::move(state_change));
640 }
641 
642 Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
PopConnectivityStateChange()643 Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() {
644   MutexLock lock(&mu_);
645   GPR_ASSERT(!connectivity_state_queue_.empty());
646   ConnectivityStateChange state_change = connectivity_state_queue_.front();
647   connectivity_state_queue_.pop_front();
648   return state_change;
649 }
650 
Subchannel(SubchannelKey key,OrphanablePtr<SubchannelConnector> connector,const grpc_channel_args * args)651 Subchannel::Subchannel(SubchannelKey key,
652                        OrphanablePtr<SubchannelConnector> connector,
653                        const grpc_channel_args* args)
654     : DualRefCounted<Subchannel>(
655           GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) ? "Subchannel"
656                                                                   : nullptr),
657       key_(std::move(key)),
658       connector_(std::move(connector)),
659       backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) {
660   GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
661   pollset_set_ = grpc_pollset_set_create();
662   grpc_resolved_address* addr =
663       static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
664   GetAddressFromSubchannelAddressArg(args, addr);
665   grpc_resolved_address* new_address = nullptr;
666   grpc_channel_args* new_args = nullptr;
667   if (ProxyMapperRegistry::MapAddress(*addr, args, &new_address, &new_args)) {
668     GPR_ASSERT(new_address != nullptr);
669     gpr_free(addr);
670     addr = new_address;
671   }
672   static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
673   grpc_arg new_arg = CreateSubchannelAddressArg(addr);
674   gpr_free(addr);
675   args_ = grpc_channel_args_copy_and_add_and_remove(
676       new_args != nullptr ? new_args : args, keys_to_remove,
677       GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
678   gpr_free(new_arg.value.string);
679   if (new_args != nullptr) grpc_channel_args_destroy(new_args);
680   GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
681                     grpc_schedule_on_exec_ctx);
682   const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ);
683   const bool channelz_enabled =
684       grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
685   arg = grpc_channel_args_find(
686       args_, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
687   const grpc_integer_options options = {
688       GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
689   size_t channel_tracer_max_memory =
690       static_cast<size_t>(grpc_channel_arg_get_integer(arg, options));
691   if (channelz_enabled) {
692     channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
693         GetTargetAddress(), channel_tracer_max_memory);
694     channelz_node_->AddTraceEvent(
695         channelz::ChannelTrace::Severity::Info,
696         grpc_slice_from_static_string("subchannel created"));
697   }
698 }
699 
~Subchannel()700 Subchannel::~Subchannel() {
701   if (channelz_node_ != nullptr) {
702     channelz_node_->AddTraceEvent(
703         channelz::ChannelTrace::Severity::Info,
704         grpc_slice_from_static_string("Subchannel destroyed"));
705     channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
706   }
707   grpc_channel_args_destroy(args_);
708   connector_.reset();
709   grpc_pollset_set_destroy(pollset_set_);
710 }
711 
Create(OrphanablePtr<SubchannelConnector> connector,const grpc_channel_args * args)712 RefCountedPtr<Subchannel> Subchannel::Create(
713     OrphanablePtr<SubchannelConnector> connector,
714     const grpc_channel_args* args) {
715   SubchannelKey key(args);
716   SubchannelPoolInterface* subchannel_pool =
717       SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args);
718   GPR_ASSERT(subchannel_pool != nullptr);
719   RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key);
720   if (c != nullptr) {
721     return c;
722   }
723   c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args);
724   // Try to register the subchannel before setting the subchannel pool.
725   // Otherwise, in case of a registration race, unreffing c in
726   // RegisterSubchannel() will cause c to be tried to be unregistered, while
727   // its key maps to a different subchannel.
728   RefCountedPtr<Subchannel> registered =
729       subchannel_pool->RegisterSubchannel(c->key_, c);
730   if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
731   return registered;
732 }
733 
ThrottleKeepaliveTime(int new_keepalive_time)734 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
735   MutexLock lock(&mu_);
736   // Only update the value if the new keepalive time is larger.
737   if (new_keepalive_time > keepalive_time_) {
738     keepalive_time_ = new_keepalive_time;
739     if (grpc_trace_subchannel.enabled()) {
740       gpr_log(GPR_INFO, "Subchannel=%p: Throttling keepalive time to %d", this,
741               new_keepalive_time);
742     }
743     const grpc_arg arg_to_add = grpc_channel_arg_integer_create(
744         const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time);
745     const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS;
746     grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
747         args_, &arg_to_remove, 1, &arg_to_add, 1);
748     grpc_channel_args_destroy(args_);
749     args_ = new_args;
750   }
751 }
752 
GetTargetAddress()753 const char* Subchannel::GetTargetAddress() {
754   const grpc_arg* addr_arg =
755       grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS);
756   const char* addr_str = grpc_channel_arg_get_string(addr_arg);
757   GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
758   return addr_str;
759 }
760 
channelz_node()761 channelz::SubchannelNode* Subchannel::channelz_node() {
762   return channelz_node_.get();
763 }
764 
CheckConnectivityState(const absl::optional<std::string> & health_check_service_name,RefCountedPtr<ConnectedSubchannel> * connected_subchannel)765 grpc_connectivity_state Subchannel::CheckConnectivityState(
766     const absl::optional<std::string>& health_check_service_name,
767     RefCountedPtr<ConnectedSubchannel>* connected_subchannel) {
768   MutexLock lock(&mu_);
769   grpc_connectivity_state state;
770   if (!health_check_service_name.has_value()) {
771     state = state_;
772   } else {
773     state = health_watcher_map_.CheckConnectivityStateLocked(
774         this, *health_check_service_name);
775   }
776   if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) {
777     *connected_subchannel = connected_subchannel_;
778   }
779   return state;
780 }
781 
WatchConnectivityState(grpc_connectivity_state initial_state,const absl::optional<std::string> & health_check_service_name,RefCountedPtr<ConnectivityStateWatcherInterface> watcher)782 void Subchannel::WatchConnectivityState(
783     grpc_connectivity_state initial_state,
784     const absl::optional<std::string>& health_check_service_name,
785     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
786   MutexLock lock(&mu_);
787   grpc_pollset_set* interested_parties = watcher->interested_parties();
788   if (interested_parties != nullptr) {
789     grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
790   }
791   if (!health_check_service_name.has_value()) {
792     if (state_ != initial_state) {
793       new AsyncWatcherNotifierLocked(watcher, this, state_, status_);
794     }
795     watcher_list_.AddWatcherLocked(std::move(watcher));
796   } else {
797     health_watcher_map_.AddWatcherLocked(
798         WeakRef(DEBUG_LOCATION, "health_watcher"), initial_state,
799         *health_check_service_name, std::move(watcher));
800   }
801 }
802 
CancelConnectivityStateWatch(const absl::optional<std::string> & health_check_service_name,ConnectivityStateWatcherInterface * watcher)803 void Subchannel::CancelConnectivityStateWatch(
804     const absl::optional<std::string>& health_check_service_name,
805     ConnectivityStateWatcherInterface* watcher) {
806   MutexLock lock(&mu_);
807   grpc_pollset_set* interested_parties = watcher->interested_parties();
808   if (interested_parties != nullptr) {
809     grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
810   }
811   if (!health_check_service_name.has_value()) {
812     watcher_list_.RemoveWatcherLocked(watcher);
813   } else {
814     health_watcher_map_.RemoveWatcherLocked(*health_check_service_name,
815                                             watcher);
816   }
817 }
818 
AttemptToConnect()819 void Subchannel::AttemptToConnect() {
820   MutexLock lock(&mu_);
821   MaybeStartConnectingLocked();
822 }
823 
ResetBackoff()824 void Subchannel::ResetBackoff() {
825   MutexLock lock(&mu_);
826   backoff_.Reset();
827   if (have_retry_alarm_) {
828     retry_immediately_ = true;
829     grpc_timer_cancel(&retry_alarm_);
830   } else {
831     backoff_begun_ = false;
832     MaybeStartConnectingLocked();
833   }
834 }
835 
Orphan()836 void Subchannel::Orphan() {
837   // The subchannel_pool is only used once here in this subchannel, so the
838   // access can be outside of the lock.
839   if (subchannel_pool_ != nullptr) {
840     subchannel_pool_->UnregisterSubchannel(key_, this);
841     subchannel_pool_.reset();
842   }
843   MutexLock lock(&mu_);
844   GPR_ASSERT(!disconnected_);
845   disconnected_ = true;
846   connector_.reset();
847   connected_subchannel_.reset();
848   health_watcher_map_.ShutdownLocked();
849 }
850 
CreateSubchannelAddressArg(const grpc_resolved_address * addr)851 grpc_arg Subchannel::CreateSubchannelAddressArg(
852     const grpc_resolved_address* addr) {
853   return grpc_channel_arg_string_create(
854       const_cast<char*>(GRPC_ARG_SUBCHANNEL_ADDRESS),
855       gpr_strdup(addr->len > 0 ? grpc_sockaddr_to_uri(addr).c_str() : ""));
856 }
857 
GetUriFromSubchannelAddressArg(const grpc_channel_args * args)858 const char* Subchannel::GetUriFromSubchannelAddressArg(
859     const grpc_channel_args* args) {
860   const grpc_arg* addr_arg =
861       grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
862   const char* addr_str = grpc_channel_arg_get_string(addr_arg);
863   GPR_ASSERT(addr_str != nullptr);  // Should have been set by LB policy.
864   return addr_str;
865 }
866 
867 namespace {
868 
UriToSockaddr(const char * uri_str,grpc_resolved_address * addr)869 void UriToSockaddr(const char* uri_str, grpc_resolved_address* addr) {
870   absl::StatusOr<URI> uri = URI::Parse(uri_str);
871   if (!uri.ok()) {
872     gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str());
873     GPR_ASSERT(uri.ok());
874   }
875   if (!grpc_parse_uri(*uri, addr)) memset(addr, 0, sizeof(*addr));
876 }
877 
878 }  // namespace
879 
GetAddressFromSubchannelAddressArg(const grpc_channel_args * args,grpc_resolved_address * addr)880 void Subchannel::GetAddressFromSubchannelAddressArg(
881     const grpc_channel_args* args, grpc_resolved_address* addr) {
882   const char* addr_uri_str = GetUriFromSubchannelAddressArg(args);
883   memset(addr, 0, sizeof(*addr));
884   if (*addr_uri_str != '\0') {
885     UriToSockaddr(addr_uri_str, addr);
886   }
887 }
888 
889 namespace {
890 
891 // Returns a string indicating the subchannel's connectivity state change to
892 // \a state.
SubchannelConnectivityStateChangeString(grpc_connectivity_state state)893 const char* SubchannelConnectivityStateChangeString(
894     grpc_connectivity_state state) {
895   switch (state) {
896     case GRPC_CHANNEL_IDLE:
897       return "Subchannel state change to IDLE";
898     case GRPC_CHANNEL_CONNECTING:
899       return "Subchannel state change to CONNECTING";
900     case GRPC_CHANNEL_READY:
901       return "Subchannel state change to READY";
902     case GRPC_CHANNEL_TRANSIENT_FAILURE:
903       return "Subchannel state change to TRANSIENT_FAILURE";
904     case GRPC_CHANNEL_SHUTDOWN:
905       return "Subchannel state change to SHUTDOWN";
906   }
907   GPR_UNREACHABLE_CODE(return "UNKNOWN");
908 }
909 
910 }  // namespace
911 
912 // Note: Must be called with a state that is different from the current state.
SetConnectivityStateLocked(grpc_connectivity_state state,const absl::Status & status)913 void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
914                                             const absl::Status& status) {
915   state_ = state;
916   status_ = status;
917   if (channelz_node_ != nullptr) {
918     channelz_node_->UpdateConnectivityState(state);
919     channelz_node_->AddTraceEvent(
920         channelz::ChannelTrace::Severity::Info,
921         grpc_slice_from_static_string(
922             SubchannelConnectivityStateChangeString(state)));
923   }
924   // Notify non-health watchers.
925   watcher_list_.NotifyLocked(this, state, status);
926   // Notify health watchers.
927   health_watcher_map_.NotifyLocked(state, status);
928 }
929 
MaybeStartConnectingLocked()930 void Subchannel::MaybeStartConnectingLocked() {
931   if (disconnected_) {
932     // Don't try to connect if we're already disconnected.
933     return;
934   }
935   if (connecting_) {
936     // Already connecting: don't restart.
937     return;
938   }
939   if (connected_subchannel_ != nullptr) {
940     // Already connected: don't restart.
941     return;
942   }
943   connecting_ = true;
944   WeakRef(DEBUG_LOCATION, "connecting")
945       .release();  // ref held by pending connect
946   if (!backoff_begun_) {
947     backoff_begun_ = true;
948     ContinueConnectingLocked();
949   } else {
950     GPR_ASSERT(!have_retry_alarm_);
951     have_retry_alarm_ = true;
952     const grpc_millis time_til_next =
953         next_attempt_deadline_ - ExecCtx::Get()->Now();
954     if (time_til_next <= 0) {
955       gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", this);
956     } else {
957       gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds",
958               this, time_til_next);
959     }
960     GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this,
961                       grpc_schedule_on_exec_ctx);
962     grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_);
963   }
964 }
965 
OnRetryAlarm(void * arg,grpc_error_handle error)966 void Subchannel::OnRetryAlarm(void* arg, grpc_error_handle error) {
967   WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
968   MutexLock lock(&c->mu_);
969   c->have_retry_alarm_ = false;
970   if (c->disconnected_) {
971     error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
972                                                              &error, 1);
973   } else if (c->retry_immediately_) {
974     c->retry_immediately_ = false;
975     error = GRPC_ERROR_NONE;
976   } else {
977     GRPC_ERROR_REF(error);
978   }
979   if (error == GRPC_ERROR_NONE) {
980     gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
981     c->ContinueConnectingLocked();
982     // Still connecting, keep ref around. Note that this stolen ref won't
983     // be dropped without first acquiring c->mu_.
984     c.release();
985   }
986   GRPC_ERROR_UNREF(error);
987 }
988 
ContinueConnectingLocked()989 void Subchannel::ContinueConnectingLocked() {
990   SubchannelConnector::Args args;
991   args.interested_parties = pollset_set_;
992   const grpc_millis min_deadline =
993       min_connect_timeout_ms_ + ExecCtx::Get()->Now();
994   next_attempt_deadline_ = backoff_.NextAttemptTime();
995   args.deadline = std::max(next_attempt_deadline_, min_deadline);
996   args.channel_args = args_;
997   SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status());
998   connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
999 }
1000 
OnConnectingFinished(void * arg,grpc_error_handle error)1001 void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
1002   WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
1003   const grpc_channel_args* delete_channel_args =
1004       c->connecting_result_.channel_args;
1005   {
1006     MutexLock lock(&c->mu_);
1007     c->connecting_ = false;
1008     if (c->connecting_result_.transport != nullptr &&
1009         c->PublishTransportLocked()) {
1010       // Do nothing, transport was published.
1011     } else if (!c->disconnected_) {
1012       gpr_log(GPR_INFO, "Connect failed: %s",
1013               grpc_error_std_string(error).c_str());
1014       c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
1015                                     grpc_error_to_absl_status(error));
1016     }
1017   }
1018   grpc_channel_args_destroy(delete_channel_args);
1019   c.reset(DEBUG_LOCATION, "connecting");
1020 }
1021 
1022 namespace {
1023 
ConnectionDestroy(void * arg,grpc_error_handle)1024 void ConnectionDestroy(void* arg, grpc_error_handle /*error*/) {
1025   grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
1026   grpc_channel_stack_destroy(stk);
1027   gpr_free(stk);
1028 }
1029 
1030 }  // namespace
1031 
PublishTransportLocked()1032 bool Subchannel::PublishTransportLocked() {
1033   // Construct channel stack.
1034   grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
1035   grpc_channel_stack_builder_set_channel_arguments(
1036       builder, connecting_result_.channel_args);
1037   grpc_channel_stack_builder_set_transport(builder,
1038                                            connecting_result_.transport);
1039   if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
1040     grpc_channel_stack_builder_destroy(builder);
1041     return false;
1042   }
1043   grpc_channel_stack* stk;
1044   grpc_error_handle error = grpc_channel_stack_builder_finish(
1045       builder, 0, 1, ConnectionDestroy, nullptr,
1046       reinterpret_cast<void**>(&stk));
1047   if (error != GRPC_ERROR_NONE) {
1048     grpc_transport_destroy(connecting_result_.transport);
1049     gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
1050             grpc_error_std_string(error).c_str());
1051     GRPC_ERROR_UNREF(error);
1052     return false;
1053   }
1054   RefCountedPtr<channelz::SocketNode> socket =
1055       std::move(connecting_result_.socket_node);
1056   connecting_result_.Reset();
1057   if (disconnected_) {
1058     grpc_channel_stack_destroy(stk);
1059     gpr_free(stk);
1060     return false;
1061   }
1062   // Publish.
1063   connected_subchannel_.reset(
1064       new ConnectedSubchannel(stk, args_, channelz_node_));
1065   gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
1066           connected_subchannel_.get(), this);
1067   if (channelz_node_ != nullptr) {
1068     channelz_node_->SetChildSocket(std::move(socket));
1069   }
1070   // Start watching connected subchannel.
1071   connected_subchannel_->StartWatch(
1072       pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(
1073                         WeakRef(DEBUG_LOCATION, "state_watcher")));
1074   // Report initial state.
1075   SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status());
1076   return true;
1077 }
1078 
1079 }  // namespace grpc_core
1080