1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/subchannel.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25
26 #include <algorithm>
27 #include <cstring>
28
29 #include "absl/strings/str_format.h"
30
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/string_util.h>
33
34 #include "src/core/ext/filters/client_channel/client_channel.h"
35 #include "src/core/ext/filters/client_channel/health/health_check_client.h"
36 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
37 #include "src/core/ext/filters/client_channel/service_config.h"
38 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
39 #include "src/core/lib/address_utils/parse_address.h"
40 #include "src/core/lib/address_utils/sockaddr_utils.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/connected_channel.h"
44 #include "src/core/lib/debug/stats.h"
45 #include "src/core/lib/gpr/alloc.h"
46 #include "src/core/lib/gprpp/debug_location.h"
47 #include "src/core/lib/gprpp/manual_constructor.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/gprpp/sync.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_internal.h"
52 #include "src/core/lib/surface/channel.h"
53 #include "src/core/lib/surface/channel_init.h"
54 #include "src/core/lib/transport/connectivity_state.h"
55 #include "src/core/lib/transport/error_utils.h"
56 #include "src/core/lib/transport/status_metadata.h"
57 #include "src/core/lib/uri/uri_parser.h"
58
// Strong and weak refs.
// STRONG_REF_MASK selects every bit above the low INTERNAL_REF_BITS bits of
// a gpr_atm value.
#define INTERNAL_REF_BITS 16
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))

// Backoff parameters.
// The *_SECONDS values are expressed in seconds; ParseArgsForBackoffValues()
// multiplies them by 1000 to obtain milliseconds.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2

// Conversion between subchannel call and call stack.
// The call stack lives at a fixed offset after the SubchannelCall object:
// sizeof(SubchannelCall) rounded up by GPR_ROUND_UP_TO_ALIGNMENT_SIZE.  The
// two macros apply that offset in opposite directions.  Each expansion is
// wrapped in parentheses so it can be used safely inside larger expressions.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  ((grpc_call_stack*)((char*)(call) +       \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
// NOTE: the expansion must use the macro parameter itself; referring to some
// other identifier would silently bind to whatever variable of that name
// happens to be in scope at the call site (or fail to compile).
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
  ((SubchannelCall*)(((char*)(callstack)) -      \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
77
78 namespace grpc_core {
79
80 TraceFlag grpc_trace_subchannel(false, "subchannel");
81 DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
82
83 //
84 // ConnectedSubchannel
85 //
86
ConnectedSubchannel(grpc_channel_stack * channel_stack,const grpc_channel_args * args,RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)87 ConnectedSubchannel::ConnectedSubchannel(
88 grpc_channel_stack* channel_stack, const grpc_channel_args* args,
89 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
90 : RefCounted<ConnectedSubchannel>(
91 GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
92 ? "ConnectedSubchannel"
93 : nullptr),
94 channel_stack_(channel_stack),
95 args_(grpc_channel_args_copy(args)),
96 channelz_subchannel_(std::move(channelz_subchannel)) {}
97
~ConnectedSubchannel()98 ConnectedSubchannel::~ConnectedSubchannel() {
99 grpc_channel_args_destroy(args_);
100 GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
101 }
102
StartWatch(grpc_pollset_set * interested_parties,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)103 void ConnectedSubchannel::StartWatch(
104 grpc_pollset_set* interested_parties,
105 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
106 grpc_transport_op* op = grpc_make_transport_op(nullptr);
107 op->start_connectivity_watch = std::move(watcher);
108 op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
109 op->bind_pollset_set = interested_parties;
110 grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
111 elem->filter->start_transport_op(elem, op);
112 }
113
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)114 void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
115 grpc_closure* on_ack) {
116 grpc_transport_op* op = grpc_make_transport_op(nullptr);
117 grpc_channel_element* elem;
118 op->send_ping.on_initiate = on_initiate;
119 op->send_ping.on_ack = on_ack;
120 elem = grpc_channel_stack_element(channel_stack_, 0);
121 elem->filter->start_transport_op(elem, op);
122 }
123
GetInitialCallSizeEstimate() const124 size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
125 return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
126 channel_stack_->call_stack_size;
127 }
128
129 //
130 // SubchannelCall
131 //
132
Create(Args args,grpc_error_handle * error)133 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
134 grpc_error_handle* error) {
135 const size_t allocation_size =
136 args.connected_subchannel->GetInitialCallSizeEstimate();
137 Arena* arena = args.arena;
138 return RefCountedPtr<SubchannelCall>(new (
139 arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
140 }
141
SubchannelCall(Args args,grpc_error_handle * error)142 SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
143 : connected_subchannel_(std::move(args.connected_subchannel)),
144 deadline_(args.deadline) {
145 grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
146 const grpc_call_element_args call_args = {
147 callstk, /* call_stack */
148 nullptr, /* server_transport_data */
149 args.context, /* context */
150 args.path, /* path */
151 args.start_time, /* start_time */
152 args.deadline, /* deadline */
153 args.arena, /* arena */
154 args.call_combiner /* call_combiner */
155 };
156 *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
157 SubchannelCall::Destroy, this, &call_args);
158 if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) {
159 gpr_log(GPR_ERROR, "error: %s", grpc_error_std_string(*error).c_str());
160 return;
161 }
162 grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
163 auto* channelz_node = connected_subchannel_->channelz_subchannel();
164 if (channelz_node != nullptr) {
165 channelz_node->RecordCallStarted();
166 }
167 }
168
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)169 void SubchannelCall::StartTransportStreamOpBatch(
170 grpc_transport_stream_op_batch* batch) {
171 GPR_TIMER_SCOPE("subchannel_call_process_op", 0);
172 MaybeInterceptRecvTrailingMetadata(batch);
173 grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
174 grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
175 GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
176 top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
177 }
178
GetCallStack()179 grpc_call_stack* SubchannelCall::GetCallStack() {
180 return SUBCHANNEL_CALL_TO_CALL_STACK(this);
181 }
182
SetAfterCallStackDestroy(grpc_closure * closure)183 void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
184 GPR_ASSERT(after_call_stack_destroy_ == nullptr);
185 GPR_ASSERT(closure != nullptr);
186 after_call_stack_destroy_ = closure;
187 }
188
Ref()189 RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
190 IncrementRefCount();
191 return RefCountedPtr<SubchannelCall>(this);
192 }
193
Ref(const grpc_core::DebugLocation & location,const char * reason)194 RefCountedPtr<SubchannelCall> SubchannelCall::Ref(
195 const grpc_core::DebugLocation& location, const char* reason) {
196 IncrementRefCount(location, reason);
197 return RefCountedPtr<SubchannelCall>(this);
198 }
199
Unref()200 void SubchannelCall::Unref() {
201 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
202 }
203
Unref(const DebugLocation &,const char * reason)204 void SubchannelCall::Unref(const DebugLocation& /*location*/,
205 const char* reason) {
206 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
207 }
208
Destroy(void * arg,grpc_error_handle)209 void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
210 GPR_TIMER_SCOPE("subchannel_call_destroy", 0);
211 SubchannelCall* self = static_cast<SubchannelCall*>(arg);
212 // Keep some members before destroying the subchannel call.
213 grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
214 RefCountedPtr<ConnectedSubchannel> connected_subchannel =
215 std::move(self->connected_subchannel_);
216 // Destroy the subchannel call.
217 self->~SubchannelCall();
218 // Destroy the call stack. This should be after destroying the subchannel
219 // call, because call->after_call_stack_destroy(), if not null, will free the
220 // call arena.
221 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
222 after_call_stack_destroy);
223 // Automatically reset connected_subchannel. This should be after destroying
224 // the call stack, because destroying call stack needs access to the channel
225 // stack.
226 }
227
MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch * batch)228 void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
229 grpc_transport_stream_op_batch* batch) {
230 // only intercept payloads with recv trailing.
231 if (!batch->recv_trailing_metadata) {
232 return;
233 }
234 // only add interceptor is channelz is enabled.
235 if (connected_subchannel_->channelz_subchannel() == nullptr) {
236 return;
237 }
238 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
239 this, grpc_schedule_on_exec_ctx);
240 // save some state needed for the interception callback.
241 GPR_ASSERT(recv_trailing_metadata_ == nullptr);
242 recv_trailing_metadata_ =
243 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
244 original_recv_trailing_metadata_ =
245 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
246 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
247 &recv_trailing_metadata_ready_;
248 }
249
250 namespace {
251
252 // Sets *status based on the rest of the parameters.
GetCallStatus(grpc_status_code * status,grpc_millis deadline,grpc_metadata_batch * md_batch,grpc_error_handle error)253 void GetCallStatus(grpc_status_code* status, grpc_millis deadline,
254 grpc_metadata_batch* md_batch, grpc_error_handle error) {
255 if (error != GRPC_ERROR_NONE) {
256 grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
257 } else {
258 if (md_batch->idx.named.grpc_status != nullptr) {
259 *status = grpc_get_status_code_from_metadata(
260 md_batch->idx.named.grpc_status->md);
261 } else {
262 *status = GRPC_STATUS_UNKNOWN;
263 }
264 }
265 GRPC_ERROR_UNREF(error);
266 }
267
268 } // namespace
269
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)270 void SubchannelCall::RecvTrailingMetadataReady(void* arg,
271 grpc_error_handle error) {
272 SubchannelCall* call = static_cast<SubchannelCall*>(arg);
273 GPR_ASSERT(call->recv_trailing_metadata_ != nullptr);
274 grpc_status_code status = GRPC_STATUS_OK;
275 GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_,
276 GRPC_ERROR_REF(error));
277 channelz::SubchannelNode* channelz_subchannel =
278 call->connected_subchannel_->channelz_subchannel();
279 GPR_ASSERT(channelz_subchannel != nullptr);
280 if (status == GRPC_STATUS_OK) {
281 channelz_subchannel->RecordCallSucceeded();
282 } else {
283 channelz_subchannel->RecordCallFailed();
284 }
285 Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_,
286 GRPC_ERROR_REF(error));
287 }
288
IncrementRefCount()289 void SubchannelCall::IncrementRefCount() {
290 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
291 }
292
IncrementRefCount(const grpc_core::DebugLocation &,const char * reason)293 void SubchannelCall::IncrementRefCount(
294 const grpc_core::DebugLocation& /*location*/, const char* reason) {
295 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
296 }
297
298 //
299 // Subchannel::ConnectedSubchannelStateWatcher
300 //
301
302 class Subchannel::ConnectedSubchannelStateWatcher
303 : public AsyncConnectivityStateWatcherInterface {
304 public:
305 // Must be instantiated while holding c->mu.
ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)306 explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)
307 : subchannel_(std::move(c)) {}
308
~ConnectedSubchannelStateWatcher()309 ~ConnectedSubchannelStateWatcher() override {
310 subchannel_.reset(DEBUG_LOCATION, "state_watcher");
311 }
312
313 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)314 void OnConnectivityStateChange(grpc_connectivity_state new_state,
315 const absl::Status& status) override {
316 Subchannel* c = subchannel_.get();
317 MutexLock lock(&c->mu_);
318 switch (new_state) {
319 case GRPC_CHANNEL_TRANSIENT_FAILURE:
320 case GRPC_CHANNEL_SHUTDOWN: {
321 if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
322 if (grpc_trace_subchannel.enabled()) {
323 gpr_log(GPR_INFO,
324 "Connected subchannel %p of subchannel %p has gone into "
325 "%s. Attempting to reconnect.",
326 c->connected_subchannel_.get(), c,
327 ConnectivityStateName(new_state));
328 }
329 c->connected_subchannel_.reset();
330 if (c->channelz_node() != nullptr) {
331 c->channelz_node()->SetChildSocket(nullptr);
332 }
333 // We need to construct our own status if the underlying state was
334 // shutdown since the accompanying status will be StatusCode::OK
335 // otherwise.
336 c->SetConnectivityStateLocked(
337 GRPC_CHANNEL_TRANSIENT_FAILURE,
338 new_state == GRPC_CHANNEL_SHUTDOWN
339 ? absl::Status(absl::StatusCode::kUnavailable,
340 "Subchannel has disconnected.")
341 : status);
342 c->backoff_begun_ = false;
343 c->backoff_.Reset();
344 }
345 break;
346 }
347 default: {
348 // In principle, this should never happen. We should not get
349 // a callback for READY, because that was the state we started
350 // this watch from. And a connected subchannel should never go
351 // from READY to CONNECTING or IDLE.
352 c->SetConnectivityStateLocked(new_state, status);
353 }
354 }
355 }
356
357 WeakRefCountedPtr<Subchannel> subchannel_;
358 };
359
360 // Asynchronously notifies the \a watcher of a change in the connectvity state
361 // of \a subchannel to the current \a state. Deletes itself when done.
362 class Subchannel::AsyncWatcherNotifierLocked {
363 public:
AsyncWatcherNotifierLocked(RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,Subchannel * subchannel,grpc_connectivity_state state,const absl::Status & status)364 AsyncWatcherNotifierLocked(
365 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,
366 Subchannel* subchannel, grpc_connectivity_state state,
367 const absl::Status& status)
368 : watcher_(std::move(watcher)) {
369 RefCountedPtr<ConnectedSubchannel> connected_subchannel;
370 if (state == GRPC_CHANNEL_READY) {
371 connected_subchannel = subchannel->connected_subchannel_;
372 }
373 watcher_->PushConnectivityStateChange(
374 {state, status, std::move(connected_subchannel)});
375 ExecCtx::Run(DEBUG_LOCATION,
376 GRPC_CLOSURE_INIT(
377 &closure_,
378 [](void* arg, grpc_error_handle /*error*/) {
379 auto* self =
380 static_cast<AsyncWatcherNotifierLocked*>(arg);
381 self->watcher_->OnConnectivityStateChange();
382 delete self;
383 },
384 this, nullptr),
385 GRPC_ERROR_NONE);
386 }
387
388 private:
389 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher_;
390 grpc_closure closure_;
391 };
392
393 //
394 // Subchannel::ConnectivityStateWatcherList
395 //
396
AddWatcherLocked(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)397 void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
398 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
399 watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
400 }
401
RemoveWatcherLocked(ConnectivityStateWatcherInterface * watcher)402 void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
403 ConnectivityStateWatcherInterface* watcher) {
404 watchers_.erase(watcher);
405 }
406
NotifyLocked(Subchannel * subchannel,grpc_connectivity_state state,const absl::Status & status)407 void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
408 Subchannel* subchannel, grpc_connectivity_state state,
409 const absl::Status& status) {
410 for (const auto& p : watchers_) {
411 new AsyncWatcherNotifierLocked(p.second, subchannel, state, status);
412 }
413 }
414
415 //
416 // Subchannel::HealthWatcherMap::HealthWatcher
417 //
418
419 // State needed for tracking the connectivity state with a particular
420 // health check service name.
421 class Subchannel::HealthWatcherMap::HealthWatcher
422 : public AsyncConnectivityStateWatcherInterface {
423 public:
HealthWatcher(WeakRefCountedPtr<Subchannel> c,std::string health_check_service_name)424 HealthWatcher(WeakRefCountedPtr<Subchannel> c,
425 std::string health_check_service_name)
426 : subchannel_(std::move(c)),
427 health_check_service_name_(std::move(health_check_service_name)),
428 state_(subchannel_->state_ == GRPC_CHANNEL_READY
429 ? GRPC_CHANNEL_CONNECTING
430 : subchannel_->state_) {
431 // If the subchannel is already connected, start health checking.
432 if (subchannel_->state_ == GRPC_CHANNEL_READY) StartHealthCheckingLocked();
433 }
434
~HealthWatcher()435 ~HealthWatcher() override {
436 subchannel_.reset(DEBUG_LOCATION, "health_watcher");
437 }
438
health_check_service_name() const439 const std::string& health_check_service_name() const {
440 return health_check_service_name_;
441 }
442
state() const443 grpc_connectivity_state state() const { return state_; }
444
AddWatcherLocked(grpc_connectivity_state initial_state,RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher)445 void AddWatcherLocked(
446 grpc_connectivity_state initial_state,
447 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher) {
448 if (state_ != initial_state) {
449 new AsyncWatcherNotifierLocked(watcher, subchannel_.get(), state_,
450 status_);
451 }
452 watcher_list_.AddWatcherLocked(std::move(watcher));
453 }
454
RemoveWatcherLocked(Subchannel::ConnectivityStateWatcherInterface * watcher)455 void RemoveWatcherLocked(
456 Subchannel::ConnectivityStateWatcherInterface* watcher) {
457 watcher_list_.RemoveWatcherLocked(watcher);
458 }
459
HasWatchers() const460 bool HasWatchers() const { return !watcher_list_.empty(); }
461
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)462 void NotifyLocked(grpc_connectivity_state state, const absl::Status& status)
463 ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_->mu_) {
464 if (state == GRPC_CHANNEL_READY) {
465 // If we had not already notified for CONNECTING state, do so now.
466 // (We may have missed this earlier, because if the transition
467 // from IDLE to CONNECTING to READY was too quick, the connected
468 // subchannel may not have sent us a notification for CONNECTING.)
469 if (state_ != GRPC_CHANNEL_CONNECTING) {
470 state_ = GRPC_CHANNEL_CONNECTING;
471 status_ = status;
472 watcher_list_.NotifyLocked(subchannel_.get(), state_, status);
473 }
474 // If we've become connected, start health checking.
475 StartHealthCheckingLocked();
476 } else {
477 state_ = state;
478 status_ = status;
479 watcher_list_.NotifyLocked(subchannel_.get(), state_, status);
480 // We're not connected, so stop health checking.
481 health_check_client_.reset();
482 }
483 }
484
Orphan()485 void Orphan() override {
486 watcher_list_.Clear();
487 health_check_client_.reset();
488 Unref();
489 }
490
491 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)492 void OnConnectivityStateChange(grpc_connectivity_state new_state,
493 const absl::Status& status) override {
494 MutexLock lock(&subchannel_->mu_);
495 if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) {
496 state_ = new_state;
497 status_ = status;
498 watcher_list_.NotifyLocked(subchannel_.get(), new_state, status);
499 }
500 }
501
StartHealthCheckingLocked()502 void StartHealthCheckingLocked()
503 ABSL_EXCLUSIVE_LOCKS_REQUIRED(subchannel_->mu_) {
504 GPR_ASSERT(health_check_client_ == nullptr);
505 health_check_client_ = MakeOrphanable<HealthCheckClient>(
506 health_check_service_name_, subchannel_->connected_subchannel_,
507 subchannel_->pollset_set_, subchannel_->channelz_node_, Ref());
508 }
509
510 WeakRefCountedPtr<Subchannel> subchannel_;
511 std::string health_check_service_name_;
512 OrphanablePtr<HealthCheckClient> health_check_client_;
513 grpc_connectivity_state state_;
514 absl::Status status_;
515 ConnectivityStateWatcherList watcher_list_;
516 };
517
518 //
519 // Subchannel::HealthWatcherMap
520 //
521
AddWatcherLocked(WeakRefCountedPtr<Subchannel> subchannel,grpc_connectivity_state initial_state,const std::string & health_check_service_name,RefCountedPtr<ConnectivityStateWatcherInterface> watcher)522 void Subchannel::HealthWatcherMap::AddWatcherLocked(
523 WeakRefCountedPtr<Subchannel> subchannel,
524 grpc_connectivity_state initial_state,
525 const std::string& health_check_service_name,
526 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
527 // If the health check service name is not already present in the map,
528 // add it.
529 auto it = map_.find(health_check_service_name);
530 HealthWatcher* health_watcher;
531 if (it == map_.end()) {
532 auto w = MakeOrphanable<HealthWatcher>(std::move(subchannel),
533 health_check_service_name);
534 health_watcher = w.get();
535 map_.emplace(health_check_service_name, std::move(w));
536 } else {
537 health_watcher = it->second.get();
538 }
539 // Add the watcher to the entry.
540 health_watcher->AddWatcherLocked(initial_state, std::move(watcher));
541 }
542
RemoveWatcherLocked(const std::string & health_check_service_name,ConnectivityStateWatcherInterface * watcher)543 void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
544 const std::string& health_check_service_name,
545 ConnectivityStateWatcherInterface* watcher) {
546 auto it = map_.find(health_check_service_name);
547 GPR_ASSERT(it != map_.end());
548 it->second->RemoveWatcherLocked(watcher);
549 // If we just removed the last watcher for this service name, remove
550 // the map entry.
551 if (!it->second->HasWatchers()) map_.erase(it);
552 }
553
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)554 void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state,
555 const absl::Status& status) {
556 for (const auto& p : map_) {
557 p.second->NotifyLocked(state, status);
558 }
559 }
560
561 grpc_connectivity_state
CheckConnectivityStateLocked(Subchannel * subchannel,const std::string & health_check_service_name)562 Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
563 Subchannel* subchannel, const std::string& health_check_service_name) {
564 auto it = map_.find(health_check_service_name);
565 if (it == map_.end()) {
566 // If the health check service name is not found in the map, we're
567 // not currently doing a health check for that service name. If the
568 // subchannel's state without health checking is READY, report
569 // CONNECTING, since that's what we'd be in as soon as we do start a
570 // watch. Otherwise, report the channel's state without health checking.
571 return subchannel->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
572 : subchannel->state_;
573 }
574 HealthWatcher* health_watcher = it->second.get();
575 return health_watcher->state();
576 }
577
ShutdownLocked()578 void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); }
579
580 //
581 // Subchannel
582 //
583
584 namespace {
585
ParseArgsForBackoffValues(const grpc_channel_args * args,grpc_millis * min_connect_timeout_ms)586 BackOff::Options ParseArgsForBackoffValues(
587 const grpc_channel_args* args, grpc_millis* min_connect_timeout_ms) {
588 grpc_millis initial_backoff_ms =
589 GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
590 *min_connect_timeout_ms =
591 GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
592 grpc_millis max_backoff_ms =
593 GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
594 bool fixed_reconnect_backoff = false;
595 if (args != nullptr) {
596 for (size_t i = 0; i < args->num_args; i++) {
597 if (0 == strcmp(args->args[i].key,
598 "grpc.testing.fixed_reconnect_backoff_ms")) {
599 fixed_reconnect_backoff = true;
600 initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
601 grpc_channel_arg_get_integer(
602 &args->args[i],
603 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
604 } else if (0 ==
605 strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
606 fixed_reconnect_backoff = false;
607 *min_connect_timeout_ms = grpc_channel_arg_get_integer(
608 &args->args[i],
609 {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
610 } else if (0 ==
611 strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
612 fixed_reconnect_backoff = false;
613 max_backoff_ms = grpc_channel_arg_get_integer(
614 &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
615 } else if (0 == strcmp(args->args[i].key,
616 GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
617 fixed_reconnect_backoff = false;
618 initial_backoff_ms = grpc_channel_arg_get_integer(
619 &args->args[i],
620 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
621 }
622 }
623 }
624 return BackOff::Options()
625 .set_initial_backoff(initial_backoff_ms)
626 .set_multiplier(fixed_reconnect_backoff
627 ? 1.0
628 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
629 .set_jitter(fixed_reconnect_backoff ? 0.0
630 : GRPC_SUBCHANNEL_RECONNECT_JITTER)
631 .set_max_backoff(max_backoff_ms);
632 }
633
634 } // namespace
635
PushConnectivityStateChange(ConnectivityStateChange state_change)636 void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange(
637 ConnectivityStateChange state_change) {
638 MutexLock lock(&mu_);
639 connectivity_state_queue_.push_back(std::move(state_change));
640 }
641
642 Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
PopConnectivityStateChange()643 Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() {
644 MutexLock lock(&mu_);
645 GPR_ASSERT(!connectivity_state_queue_.empty());
646 ConnectivityStateChange state_change = connectivity_state_queue_.front();
647 connectivity_state_queue_.pop_front();
648 return state_change;
649 }
650
Subchannel(SubchannelKey key,OrphanablePtr<SubchannelConnector> connector,const grpc_channel_args * args)651 Subchannel::Subchannel(SubchannelKey key,
652 OrphanablePtr<SubchannelConnector> connector,
653 const grpc_channel_args* args)
654 : DualRefCounted<Subchannel>(
655 GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount) ? "Subchannel"
656 : nullptr),
657 key_(std::move(key)),
658 connector_(std::move(connector)),
659 backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) {
660 GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
661 pollset_set_ = grpc_pollset_set_create();
662 grpc_resolved_address* addr =
663 static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
664 GetAddressFromSubchannelAddressArg(args, addr);
665 grpc_resolved_address* new_address = nullptr;
666 grpc_channel_args* new_args = nullptr;
667 if (ProxyMapperRegistry::MapAddress(*addr, args, &new_address, &new_args)) {
668 GPR_ASSERT(new_address != nullptr);
669 gpr_free(addr);
670 addr = new_address;
671 }
672 static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
673 grpc_arg new_arg = CreateSubchannelAddressArg(addr);
674 gpr_free(addr);
675 args_ = grpc_channel_args_copy_and_add_and_remove(
676 new_args != nullptr ? new_args : args, keys_to_remove,
677 GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
678 gpr_free(new_arg.value.string);
679 if (new_args != nullptr) grpc_channel_args_destroy(new_args);
680 GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
681 grpc_schedule_on_exec_ctx);
682 const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ);
683 const bool channelz_enabled =
684 grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
685 arg = grpc_channel_args_find(
686 args_, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
687 const grpc_integer_options options = {
688 GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
689 size_t channel_tracer_max_memory =
690 static_cast<size_t>(grpc_channel_arg_get_integer(arg, options));
691 if (channelz_enabled) {
692 channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
693 GetTargetAddress(), channel_tracer_max_memory);
694 channelz_node_->AddTraceEvent(
695 channelz::ChannelTrace::Severity::Info,
696 grpc_slice_from_static_string("subchannel created"));
697 }
698 }
699
~Subchannel()700 Subchannel::~Subchannel() {
701 if (channelz_node_ != nullptr) {
702 channelz_node_->AddTraceEvent(
703 channelz::ChannelTrace::Severity::Info,
704 grpc_slice_from_static_string("Subchannel destroyed"));
705 channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
706 }
707 grpc_channel_args_destroy(args_);
708 connector_.reset();
709 grpc_pollset_set_destroy(pollset_set_);
710 }
711
Create(OrphanablePtr<SubchannelConnector> connector,const grpc_channel_args * args)712 RefCountedPtr<Subchannel> Subchannel::Create(
713 OrphanablePtr<SubchannelConnector> connector,
714 const grpc_channel_args* args) {
715 SubchannelKey key(args);
716 SubchannelPoolInterface* subchannel_pool =
717 SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args);
718 GPR_ASSERT(subchannel_pool != nullptr);
719 RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key);
720 if (c != nullptr) {
721 return c;
722 }
723 c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args);
724 // Try to register the subchannel before setting the subchannel pool.
725 // Otherwise, in case of a registration race, unreffing c in
726 // RegisterSubchannel() will cause c to be tried to be unregistered, while
727 // its key maps to a different subchannel.
728 RefCountedPtr<Subchannel> registered =
729 subchannel_pool->RegisterSubchannel(c->key_, c);
730 if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
731 return registered;
732 }
733
ThrottleKeepaliveTime(int new_keepalive_time)734 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
735 MutexLock lock(&mu_);
736 // Only update the value if the new keepalive time is larger.
737 if (new_keepalive_time > keepalive_time_) {
738 keepalive_time_ = new_keepalive_time;
739 if (grpc_trace_subchannel.enabled()) {
740 gpr_log(GPR_INFO, "Subchannel=%p: Throttling keepalive time to %d", this,
741 new_keepalive_time);
742 }
743 const grpc_arg arg_to_add = grpc_channel_arg_integer_create(
744 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time);
745 const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS;
746 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
747 args_, &arg_to_remove, 1, &arg_to_add, 1);
748 grpc_channel_args_destroy(args_);
749 args_ = new_args;
750 }
751 }
752
GetTargetAddress()753 const char* Subchannel::GetTargetAddress() {
754 const grpc_arg* addr_arg =
755 grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS);
756 const char* addr_str = grpc_channel_arg_get_string(addr_arg);
757 GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
758 return addr_str;
759 }
760
channelz_node()761 channelz::SubchannelNode* Subchannel::channelz_node() {
762 return channelz_node_.get();
763 }
764
CheckConnectivityState(const absl::optional<std::string> & health_check_service_name,RefCountedPtr<ConnectedSubchannel> * connected_subchannel)765 grpc_connectivity_state Subchannel::CheckConnectivityState(
766 const absl::optional<std::string>& health_check_service_name,
767 RefCountedPtr<ConnectedSubchannel>* connected_subchannel) {
768 MutexLock lock(&mu_);
769 grpc_connectivity_state state;
770 if (!health_check_service_name.has_value()) {
771 state = state_;
772 } else {
773 state = health_watcher_map_.CheckConnectivityStateLocked(
774 this, *health_check_service_name);
775 }
776 if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) {
777 *connected_subchannel = connected_subchannel_;
778 }
779 return state;
780 }
781
WatchConnectivityState(grpc_connectivity_state initial_state,const absl::optional<std::string> & health_check_service_name,RefCountedPtr<ConnectivityStateWatcherInterface> watcher)782 void Subchannel::WatchConnectivityState(
783 grpc_connectivity_state initial_state,
784 const absl::optional<std::string>& health_check_service_name,
785 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
786 MutexLock lock(&mu_);
787 grpc_pollset_set* interested_parties = watcher->interested_parties();
788 if (interested_parties != nullptr) {
789 grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
790 }
791 if (!health_check_service_name.has_value()) {
792 if (state_ != initial_state) {
793 new AsyncWatcherNotifierLocked(watcher, this, state_, status_);
794 }
795 watcher_list_.AddWatcherLocked(std::move(watcher));
796 } else {
797 health_watcher_map_.AddWatcherLocked(
798 WeakRef(DEBUG_LOCATION, "health_watcher"), initial_state,
799 *health_check_service_name, std::move(watcher));
800 }
801 }
802
CancelConnectivityStateWatch(const absl::optional<std::string> & health_check_service_name,ConnectivityStateWatcherInterface * watcher)803 void Subchannel::CancelConnectivityStateWatch(
804 const absl::optional<std::string>& health_check_service_name,
805 ConnectivityStateWatcherInterface* watcher) {
806 MutexLock lock(&mu_);
807 grpc_pollset_set* interested_parties = watcher->interested_parties();
808 if (interested_parties != nullptr) {
809 grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
810 }
811 if (!health_check_service_name.has_value()) {
812 watcher_list_.RemoveWatcherLocked(watcher);
813 } else {
814 health_watcher_map_.RemoveWatcherLocked(*health_check_service_name,
815 watcher);
816 }
817 }
818
AttemptToConnect()819 void Subchannel::AttemptToConnect() {
820 MutexLock lock(&mu_);
821 MaybeStartConnectingLocked();
822 }
823
ResetBackoff()824 void Subchannel::ResetBackoff() {
825 MutexLock lock(&mu_);
826 backoff_.Reset();
827 if (have_retry_alarm_) {
828 retry_immediately_ = true;
829 grpc_timer_cancel(&retry_alarm_);
830 } else {
831 backoff_begun_ = false;
832 MaybeStartConnectingLocked();
833 }
834 }
835
Orphan()836 void Subchannel::Orphan() {
837 // The subchannel_pool is only used once here in this subchannel, so the
838 // access can be outside of the lock.
839 if (subchannel_pool_ != nullptr) {
840 subchannel_pool_->UnregisterSubchannel(key_, this);
841 subchannel_pool_.reset();
842 }
843 MutexLock lock(&mu_);
844 GPR_ASSERT(!disconnected_);
845 disconnected_ = true;
846 connector_.reset();
847 connected_subchannel_.reset();
848 health_watcher_map_.ShutdownLocked();
849 }
850
CreateSubchannelAddressArg(const grpc_resolved_address * addr)851 grpc_arg Subchannel::CreateSubchannelAddressArg(
852 const grpc_resolved_address* addr) {
853 return grpc_channel_arg_string_create(
854 const_cast<char*>(GRPC_ARG_SUBCHANNEL_ADDRESS),
855 gpr_strdup(addr->len > 0 ? grpc_sockaddr_to_uri(addr).c_str() : ""));
856 }
857
GetUriFromSubchannelAddressArg(const grpc_channel_args * args)858 const char* Subchannel::GetUriFromSubchannelAddressArg(
859 const grpc_channel_args* args) {
860 const grpc_arg* addr_arg =
861 grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
862 const char* addr_str = grpc_channel_arg_get_string(addr_arg);
863 GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
864 return addr_str;
865 }
866
867 namespace {
868
UriToSockaddr(const char * uri_str,grpc_resolved_address * addr)869 void UriToSockaddr(const char* uri_str, grpc_resolved_address* addr) {
870 absl::StatusOr<URI> uri = URI::Parse(uri_str);
871 if (!uri.ok()) {
872 gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str());
873 GPR_ASSERT(uri.ok());
874 }
875 if (!grpc_parse_uri(*uri, addr)) memset(addr, 0, sizeof(*addr));
876 }
877
878 } // namespace
879
GetAddressFromSubchannelAddressArg(const grpc_channel_args * args,grpc_resolved_address * addr)880 void Subchannel::GetAddressFromSubchannelAddressArg(
881 const grpc_channel_args* args, grpc_resolved_address* addr) {
882 const char* addr_uri_str = GetUriFromSubchannelAddressArg(args);
883 memset(addr, 0, sizeof(*addr));
884 if (*addr_uri_str != '\0') {
885 UriToSockaddr(addr_uri_str, addr);
886 }
887 }
888
889 namespace {
890
891 // Returns a string indicating the subchannel's connectivity state change to
892 // \a state.
SubchannelConnectivityStateChangeString(grpc_connectivity_state state)893 const char* SubchannelConnectivityStateChangeString(
894 grpc_connectivity_state state) {
895 switch (state) {
896 case GRPC_CHANNEL_IDLE:
897 return "Subchannel state change to IDLE";
898 case GRPC_CHANNEL_CONNECTING:
899 return "Subchannel state change to CONNECTING";
900 case GRPC_CHANNEL_READY:
901 return "Subchannel state change to READY";
902 case GRPC_CHANNEL_TRANSIENT_FAILURE:
903 return "Subchannel state change to TRANSIENT_FAILURE";
904 case GRPC_CHANNEL_SHUTDOWN:
905 return "Subchannel state change to SHUTDOWN";
906 }
907 GPR_UNREACHABLE_CODE(return "UNKNOWN");
908 }
909
910 } // namespace
911
912 // Note: Must be called with a state that is different from the current state.
SetConnectivityStateLocked(grpc_connectivity_state state,const absl::Status & status)913 void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
914 const absl::Status& status) {
915 state_ = state;
916 status_ = status;
917 if (channelz_node_ != nullptr) {
918 channelz_node_->UpdateConnectivityState(state);
919 channelz_node_->AddTraceEvent(
920 channelz::ChannelTrace::Severity::Info,
921 grpc_slice_from_static_string(
922 SubchannelConnectivityStateChangeString(state)));
923 }
924 // Notify non-health watchers.
925 watcher_list_.NotifyLocked(this, state, status);
926 // Notify health watchers.
927 health_watcher_map_.NotifyLocked(state, status);
928 }
929
MaybeStartConnectingLocked()930 void Subchannel::MaybeStartConnectingLocked() {
931 if (disconnected_) {
932 // Don't try to connect if we're already disconnected.
933 return;
934 }
935 if (connecting_) {
936 // Already connecting: don't restart.
937 return;
938 }
939 if (connected_subchannel_ != nullptr) {
940 // Already connected: don't restart.
941 return;
942 }
943 connecting_ = true;
944 WeakRef(DEBUG_LOCATION, "connecting")
945 .release(); // ref held by pending connect
946 if (!backoff_begun_) {
947 backoff_begun_ = true;
948 ContinueConnectingLocked();
949 } else {
950 GPR_ASSERT(!have_retry_alarm_);
951 have_retry_alarm_ = true;
952 const grpc_millis time_til_next =
953 next_attempt_deadline_ - ExecCtx::Get()->Now();
954 if (time_til_next <= 0) {
955 gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", this);
956 } else {
957 gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds",
958 this, time_til_next);
959 }
960 GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this,
961 grpc_schedule_on_exec_ctx);
962 grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_);
963 }
964 }
965
OnRetryAlarm(void * arg,grpc_error_handle error)966 void Subchannel::OnRetryAlarm(void* arg, grpc_error_handle error) {
967 WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
968 MutexLock lock(&c->mu_);
969 c->have_retry_alarm_ = false;
970 if (c->disconnected_) {
971 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
972 &error, 1);
973 } else if (c->retry_immediately_) {
974 c->retry_immediately_ = false;
975 error = GRPC_ERROR_NONE;
976 } else {
977 GRPC_ERROR_REF(error);
978 }
979 if (error == GRPC_ERROR_NONE) {
980 gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
981 c->ContinueConnectingLocked();
982 // Still connecting, keep ref around. Note that this stolen ref won't
983 // be dropped without first acquiring c->mu_.
984 c.release();
985 }
986 GRPC_ERROR_UNREF(error);
987 }
988
ContinueConnectingLocked()989 void Subchannel::ContinueConnectingLocked() {
990 SubchannelConnector::Args args;
991 args.interested_parties = pollset_set_;
992 const grpc_millis min_deadline =
993 min_connect_timeout_ms_ + ExecCtx::Get()->Now();
994 next_attempt_deadline_ = backoff_.NextAttemptTime();
995 args.deadline = std::max(next_attempt_deadline_, min_deadline);
996 args.channel_args = args_;
997 SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status());
998 connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
999 }
1000
OnConnectingFinished(void * arg,grpc_error_handle error)1001 void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
1002 WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
1003 const grpc_channel_args* delete_channel_args =
1004 c->connecting_result_.channel_args;
1005 {
1006 MutexLock lock(&c->mu_);
1007 c->connecting_ = false;
1008 if (c->connecting_result_.transport != nullptr &&
1009 c->PublishTransportLocked()) {
1010 // Do nothing, transport was published.
1011 } else if (!c->disconnected_) {
1012 gpr_log(GPR_INFO, "Connect failed: %s",
1013 grpc_error_std_string(error).c_str());
1014 c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
1015 grpc_error_to_absl_status(error));
1016 }
1017 }
1018 grpc_channel_args_destroy(delete_channel_args);
1019 c.reset(DEBUG_LOCATION, "connecting");
1020 }
1021
1022 namespace {
1023
ConnectionDestroy(void * arg,grpc_error_handle)1024 void ConnectionDestroy(void* arg, grpc_error_handle /*error*/) {
1025 grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
1026 grpc_channel_stack_destroy(stk);
1027 gpr_free(stk);
1028 }
1029
1030 } // namespace
1031
PublishTransportLocked()1032 bool Subchannel::PublishTransportLocked() {
1033 // Construct channel stack.
1034 grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
1035 grpc_channel_stack_builder_set_channel_arguments(
1036 builder, connecting_result_.channel_args);
1037 grpc_channel_stack_builder_set_transport(builder,
1038 connecting_result_.transport);
1039 if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
1040 grpc_channel_stack_builder_destroy(builder);
1041 return false;
1042 }
1043 grpc_channel_stack* stk;
1044 grpc_error_handle error = grpc_channel_stack_builder_finish(
1045 builder, 0, 1, ConnectionDestroy, nullptr,
1046 reinterpret_cast<void**>(&stk));
1047 if (error != GRPC_ERROR_NONE) {
1048 grpc_transport_destroy(connecting_result_.transport);
1049 gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
1050 grpc_error_std_string(error).c_str());
1051 GRPC_ERROR_UNREF(error);
1052 return false;
1053 }
1054 RefCountedPtr<channelz::SocketNode> socket =
1055 std::move(connecting_result_.socket_node);
1056 connecting_result_.Reset();
1057 if (disconnected_) {
1058 grpc_channel_stack_destroy(stk);
1059 gpr_free(stk);
1060 return false;
1061 }
1062 // Publish.
1063 connected_subchannel_.reset(
1064 new ConnectedSubchannel(stk, args_, channelz_node_));
1065 gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
1066 connected_subchannel_.get(), this);
1067 if (channelz_node_ != nullptr) {
1068 channelz_node_->SetChildSocket(std::move(socket));
1069 }
1070 // Start watching connected subchannel.
1071 connected_subchannel_->StartWatch(
1072 pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(
1073 WeakRef(DEBUG_LOCATION, "state_watcher")));
1074 // Report initial state.
1075 SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status());
1076 return true;
1077 }
1078
1079 } // namespace grpc_core
1080