1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/subchannel.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25
26 #include <algorithm>
27 #include <cstring>
28
29 #include "absl/strings/str_format.h"
30
31 #include <grpc/support/alloc.h>
32 #include <grpc/support/string_util.h>
33
34 #include "src/core/ext/filters/client_channel/client_channel.h"
35 #include "src/core/ext/filters/client_channel/health/health_check_client.h"
36 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
37 #include "src/core/ext/filters/client_channel/service_config.h"
38 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
39 #include "src/core/lib/backoff/backoff.h"
40 #include "src/core/lib/channel/channel_args.h"
41 #include "src/core/lib/channel/connected_channel.h"
42 #include "src/core/lib/debug/stats.h"
43 #include "src/core/lib/gpr/alloc.h"
44 #include "src/core/lib/gprpp/debug_location.h"
45 #include "src/core/lib/gprpp/manual_constructor.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/gprpp/sync.h"
48 #include "src/core/lib/iomgr/parse_address.h"
49 #include "src/core/lib/iomgr/sockaddr_utils.h"
50 #include "src/core/lib/profiling/timers.h"
51 #include "src/core/lib/slice/slice_internal.h"
52 #include "src/core/lib/surface/channel.h"
53 #include "src/core/lib/surface/channel_init.h"
54 #include "src/core/lib/transport/connectivity_state.h"
55 #include "src/core/lib/transport/error_utils.h"
56 #include "src/core/lib/transport/status_metadata.h"
57 #include "src/core/lib/uri/uri_parser.h"
58
// Strong and weak refs share one atomic word (ref_pair_): the low
// INTERNAL_REF_BITS bits count weak refs (WeakRef() adds 1), and the bits
// above them count strong refs (Ref() adds 1 << INTERNAL_REF_BITS).
#define INTERNAL_REF_BITS 16
// Selects only the strong-ref portion of ref_pair_.
#define STRONG_REF_MASK \
  (~static_cast<gpr_atm>((1 << INTERNAL_REF_BITS) - 1))
62
// Backoff parameters.
// These are the defaults ParseArgsForBackoffValues() starts from; channel
// args may override the initial, minimum-timeout and maximum values.
// Default initial backoff, in seconds.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
// Backoff multiplier used unless a fixed reconnect backoff is requested.
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
// Default minimum connect timeout, in seconds.
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
// Default maximum backoff, in seconds.
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
// Backoff jitter used unless a fixed reconnect backoff is requested.
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
69
// Conversion between subchannel call and call stack.
// The call stack lives in the same allocation as the SubchannelCall, at the
// first aligned offset past the SubchannelCall object (the same offset that
// GetInitialCallSizeEstimate() accounts for).
// Both expansions are fully parenthesized so they compose safely inside
// larger expressions.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call)  \
  ((grpc_call_stack*)((char*)(call) +        \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
// Inverse of SUBCHANNEL_CALL_TO_CALL_STACK. The expansion must reference the
// macro parameter (callstack), not whatever variable named call_stack happens
// to be in scope at the point of use.
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
  ((SubchannelCall*)(((char*)(callstack)) -      \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
77
78 namespace grpc_core {
79
// Trace flag named "subchannel". Code in this file checks
// grpc_trace_subchannel.enabled() before emitting subchannel log messages.
// NOTE(review): the first argument (false) is presumably the default-enabled
// value -- confirm against TraceFlag's constructor.
TraceFlag grpc_trace_subchannel(false, "subchannel");
// Debug-only trace flag named "subchannel_refcount". When enabled,
// ConnectedSubchannel passes a trace name to its RefCounted base.
DebugOnlyTraceFlag grpc_trace_subchannel_refcount(false, "subchannel_refcount");
82
83 //
84 // ConnectedSubchannel
85 //
86
// Constructs a connected subchannel on top of \a channel_stack.
// - \a channel_stack is stored as-is (no ref is taken here).
// - \a args is copied; the copy is owned by this object.
// - \a channelz_subchannel is moved into the object (may be null).
// The RefCounted base is given a trace name only when the
// subchannel_refcount trace flag is enabled.
ConnectedSubchannel::ConnectedSubchannel(
    grpc_channel_stack* channel_stack, const grpc_channel_args* args,
    RefCountedPtr<channelz::SubchannelNode> channelz_subchannel)
    : RefCounted<ConnectedSubchannel>(
          GRPC_TRACE_FLAG_ENABLED(grpc_trace_subchannel_refcount)
              ? "ConnectedSubchannel"
              : nullptr),
      channel_stack_(channel_stack),
      args_(grpc_channel_args_copy(args)),
      channelz_subchannel_(std::move(channelz_subchannel)) {}
97
// Releases the copied channel args and drops a ref on the channel stack.
ConnectedSubchannel::~ConnectedSubchannel() {
  // args_ is the private copy made in the constructor.
  grpc_channel_args_destroy(args_);
  GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
}
102
StartWatch(grpc_pollset_set * interested_parties,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)103 void ConnectedSubchannel::StartWatch(
104 grpc_pollset_set* interested_parties,
105 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
106 grpc_transport_op* op = grpc_make_transport_op(nullptr);
107 op->start_connectivity_watch = std::move(watcher);
108 op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
109 op->bind_pollset_set = interested_parties;
110 grpc_channel_element* elem = grpc_channel_stack_element(channel_stack_, 0);
111 elem->filter->start_transport_op(elem, op);
112 }
113
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)114 void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
115 grpc_closure* on_ack) {
116 grpc_transport_op* op = grpc_make_transport_op(nullptr);
117 grpc_channel_element* elem;
118 op->send_ping.on_initiate = on_initiate;
119 op->send_ping.on_ack = on_ack;
120 elem = grpc_channel_stack_element(channel_stack_, 0);
121 elem->filter->start_transport_op(elem, op);
122 }
123
GetInitialCallSizeEstimate() const124 size_t ConnectedSubchannel::GetInitialCallSizeEstimate() const {
125 return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
126 channel_stack_->call_stack_size;
127 }
128
129 //
130 // SubchannelCall
131 //
132
Create(Args args,grpc_error ** error)133 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
134 grpc_error** error) {
135 const size_t allocation_size =
136 args.connected_subchannel->GetInitialCallSizeEstimate();
137 Arena* arena = args.arena;
138 return RefCountedPtr<SubchannelCall>(new (
139 arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
140 }
141
SubchannelCall(Args args,grpc_error ** error)142 SubchannelCall::SubchannelCall(Args args, grpc_error** error)
143 : connected_subchannel_(std::move(args.connected_subchannel)),
144 deadline_(args.deadline) {
145 grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
146 const grpc_call_element_args call_args = {
147 callstk, /* call_stack */
148 nullptr, /* server_transport_data */
149 args.context, /* context */
150 args.path, /* path */
151 args.start_time, /* start_time */
152 args.deadline, /* deadline */
153 args.arena, /* arena */
154 args.call_combiner /* call_combiner */
155 };
156 *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
157 SubchannelCall::Destroy, this, &call_args);
158 if (GPR_UNLIKELY(*error != GRPC_ERROR_NONE)) {
159 const char* error_string = grpc_error_string(*error);
160 gpr_log(GPR_ERROR, "error: %s", error_string);
161 return;
162 }
163 grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
164 auto* channelz_node = connected_subchannel_->channelz_subchannel();
165 if (channelz_node != nullptr) {
166 channelz_node->RecordCallStarted();
167 }
168 }
169
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)170 void SubchannelCall::StartTransportStreamOpBatch(
171 grpc_transport_stream_op_batch* batch) {
172 GPR_TIMER_SCOPE("subchannel_call_process_op", 0);
173 MaybeInterceptRecvTrailingMetadata(batch);
174 grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
175 grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
176 GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
177 top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
178 }
179
GetCallStack()180 grpc_call_stack* SubchannelCall::GetCallStack() {
181 return SUBCHANNEL_CALL_TO_CALL_STACK(this);
182 }
183
// Registers \a closure to be passed to grpc_call_stack_destroy() when the
// call is destroyed (see Destroy()).
// May be called at most once, and \a closure must not be null; both
// conditions are enforced with assertions.
void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
  GPR_ASSERT(after_call_stack_destroy_ == nullptr);
  GPR_ASSERT(closure != nullptr);
  after_call_stack_destroy_ = closure;
}
189
Ref()190 RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
191 IncrementRefCount();
192 return RefCountedPtr<SubchannelCall>(this);
193 }
194
Ref(const grpc_core::DebugLocation & location,const char * reason)195 RefCountedPtr<SubchannelCall> SubchannelCall::Ref(
196 const grpc_core::DebugLocation& location, const char* reason) {
197 IncrementRefCount(location, reason);
198 return RefCountedPtr<SubchannelCall>(this);
199 }
200
Unref()201 void SubchannelCall::Unref() {
202 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
203 }
204
Unref(const DebugLocation &,const char * reason)205 void SubchannelCall::Unref(const DebugLocation& /*location*/,
206 const char* reason) {
207 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
208 }
209
Destroy(void * arg,grpc_error *)210 void SubchannelCall::Destroy(void* arg, grpc_error* /*error*/) {
211 GPR_TIMER_SCOPE("subchannel_call_destroy", 0);
212 SubchannelCall* self = static_cast<SubchannelCall*>(arg);
213 // Keep some members before destroying the subchannel call.
214 grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
215 RefCountedPtr<ConnectedSubchannel> connected_subchannel =
216 std::move(self->connected_subchannel_);
217 // Destroy the subchannel call.
218 self->~SubchannelCall();
219 // Destroy the call stack. This should be after destroying the subchannel
220 // call, because call->after_call_stack_destroy(), if not null, will free the
221 // call arena.
222 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
223 after_call_stack_destroy);
224 // Automatically reset connected_subchannel. This should be after destroying
225 // the call stack, because destroying call stack needs access to the channel
226 // stack.
227 }
228
MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch * batch)229 void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
230 grpc_transport_stream_op_batch* batch) {
231 // only intercept payloads with recv trailing.
232 if (!batch->recv_trailing_metadata) {
233 return;
234 }
235 // only add interceptor is channelz is enabled.
236 if (connected_subchannel_->channelz_subchannel() == nullptr) {
237 return;
238 }
239 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
240 this, grpc_schedule_on_exec_ctx);
241 // save some state needed for the interception callback.
242 GPR_ASSERT(recv_trailing_metadata_ == nullptr);
243 recv_trailing_metadata_ =
244 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
245 original_recv_trailing_metadata_ =
246 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
247 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
248 &recv_trailing_metadata_ready_;
249 }
250
251 namespace {
252
253 // Sets *status based on the rest of the parameters.
GetCallStatus(grpc_status_code * status,grpc_millis deadline,grpc_metadata_batch * md_batch,grpc_error * error)254 void GetCallStatus(grpc_status_code* status, grpc_millis deadline,
255 grpc_metadata_batch* md_batch, grpc_error* error) {
256 if (error != GRPC_ERROR_NONE) {
257 grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
258 } else {
259 if (md_batch->idx.named.grpc_status != nullptr) {
260 *status = grpc_get_status_code_from_metadata(
261 md_batch->idx.named.grpc_status->md);
262 } else {
263 *status = GRPC_STATUS_UNKNOWN;
264 }
265 }
266 GRPC_ERROR_UNREF(error);
267 }
268
269 } // namespace
270
RecvTrailingMetadataReady(void * arg,grpc_error * error)271 void SubchannelCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
272 SubchannelCall* call = static_cast<SubchannelCall*>(arg);
273 GPR_ASSERT(call->recv_trailing_metadata_ != nullptr);
274 grpc_status_code status = GRPC_STATUS_OK;
275 GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_,
276 GRPC_ERROR_REF(error));
277 channelz::SubchannelNode* channelz_subchannel =
278 call->connected_subchannel_->channelz_subchannel();
279 GPR_ASSERT(channelz_subchannel != nullptr);
280 if (status == GRPC_STATUS_OK) {
281 channelz_subchannel->RecordCallSucceeded();
282 } else {
283 channelz_subchannel->RecordCallFailed();
284 }
285 Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_,
286 GRPC_ERROR_REF(error));
287 }
288
IncrementRefCount()289 void SubchannelCall::IncrementRefCount() {
290 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
291 }
292
IncrementRefCount(const grpc_core::DebugLocation &,const char * reason)293 void SubchannelCall::IncrementRefCount(
294 const grpc_core::DebugLocation& /*location*/, const char* reason) {
295 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
296 }
297
298 //
299 // Subchannel::ConnectedSubchannelStateWatcher
300 //
301
302 class Subchannel::ConnectedSubchannelStateWatcher
303 : public AsyncConnectivityStateWatcherInterface {
304 public:
305 // Must be instantiated while holding c->mu.
ConnectedSubchannelStateWatcher(Subchannel * c)306 explicit ConnectedSubchannelStateWatcher(Subchannel* c) : subchannel_(c) {
307 // Steal subchannel ref for connecting.
308 GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "state_watcher");
309 GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "connecting");
310 }
311
~ConnectedSubchannelStateWatcher()312 ~ConnectedSubchannelStateWatcher() override {
313 GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "state_watcher");
314 }
315
316 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)317 void OnConnectivityStateChange(grpc_connectivity_state new_state,
318 const absl::Status& status) override {
319 Subchannel* c = subchannel_;
320 MutexLock lock(&c->mu_);
321 switch (new_state) {
322 case GRPC_CHANNEL_TRANSIENT_FAILURE:
323 case GRPC_CHANNEL_SHUTDOWN: {
324 if (!c->disconnected_ && c->connected_subchannel_ != nullptr) {
325 if (grpc_trace_subchannel.enabled()) {
326 gpr_log(GPR_INFO,
327 "Connected subchannel %p of subchannel %p has gone into "
328 "%s. Attempting to reconnect.",
329 c->connected_subchannel_.get(), c,
330 ConnectivityStateName(new_state));
331 }
332 c->connected_subchannel_.reset();
333 if (c->channelz_node() != nullptr) {
334 c->channelz_node()->SetChildSocket(nullptr);
335 }
336 // We need to construct our own status if the underlying state was
337 // shutdown since the accompanying status will be StatusCode::OK
338 // otherwise.
339 c->SetConnectivityStateLocked(
340 GRPC_CHANNEL_TRANSIENT_FAILURE,
341 new_state == GRPC_CHANNEL_SHUTDOWN
342 ? absl::Status(absl::StatusCode::kUnavailable,
343 "Subchannel has disconnected.")
344 : status);
345 c->backoff_begun_ = false;
346 c->backoff_.Reset();
347 }
348 break;
349 }
350 default: {
351 // In principle, this should never happen. We should not get
352 // a callback for READY, because that was the state we started
353 // this watch from. And a connected subchannel should never go
354 // from READY to CONNECTING or IDLE.
355 c->SetConnectivityStateLocked(new_state, status);
356 }
357 }
358 }
359
360 Subchannel* subchannel_;
361 };
362
363 // Asynchronously notifies the \a watcher of a change in the connectvity state
364 // of \a subchannel to the current \a state. Deletes itself when done.
365 class Subchannel::AsyncWatcherNotifierLocked {
366 public:
AsyncWatcherNotifierLocked(RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,Subchannel * subchannel,grpc_connectivity_state state,const absl::Status & status)367 AsyncWatcherNotifierLocked(
368 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher,
369 Subchannel* subchannel, grpc_connectivity_state state,
370 const absl::Status& status)
371 : watcher_(std::move(watcher)) {
372 RefCountedPtr<ConnectedSubchannel> connected_subchannel;
373 if (state == GRPC_CHANNEL_READY) {
374 connected_subchannel = subchannel->connected_subchannel_;
375 }
376 watcher_->PushConnectivityStateChange(
377 {state, status, std::move(connected_subchannel)});
378 ExecCtx::Run(DEBUG_LOCATION,
379 GRPC_CLOSURE_INIT(
380 &closure_,
381 [](void* arg, grpc_error* /*error*/) {
382 auto* self =
383 static_cast<AsyncWatcherNotifierLocked*>(arg);
384 self->watcher_->OnConnectivityStateChange();
385 delete self;
386 },
387 this, nullptr),
388 GRPC_ERROR_NONE);
389 }
390
391 private:
392 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher_;
393 grpc_closure closure_;
394 };
395
396 //
397 // Subchannel::ConnectivityStateWatcherList
398 //
399
AddWatcherLocked(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)400 void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
401 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
402 watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
403 }
404
// Removes the entry keyed by \a watcher from the list, if present.
void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
    ConnectivityStateWatcherInterface* watcher) {
  watchers_.erase(watcher);
}
409
NotifyLocked(Subchannel * subchannel,grpc_connectivity_state state,const absl::Status & status)410 void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
411 Subchannel* subchannel, grpc_connectivity_state state,
412 const absl::Status& status) {
413 for (const auto& p : watchers_) {
414 new AsyncWatcherNotifierLocked(p.second, subchannel, state, status);
415 }
416 }
417
418 //
419 // Subchannel::HealthWatcherMap::HealthWatcher
420 //
421
422 // State needed for tracking the connectivity state with a particular
423 // health check service name.
424 class Subchannel::HealthWatcherMap::HealthWatcher
425 : public AsyncConnectivityStateWatcherInterface {
426 public:
HealthWatcher(Subchannel * c,std::string health_check_service_name,grpc_connectivity_state subchannel_state)427 HealthWatcher(Subchannel* c, std::string health_check_service_name,
428 grpc_connectivity_state subchannel_state)
429 : subchannel_(c),
430 health_check_service_name_(std::move(health_check_service_name)),
431 state_(subchannel_state == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
432 : subchannel_state) {
433 GRPC_SUBCHANNEL_WEAK_REF(subchannel_, "health_watcher");
434 // If the subchannel is already connected, start health checking.
435 if (subchannel_state == GRPC_CHANNEL_READY) StartHealthCheckingLocked();
436 }
437
~HealthWatcher()438 ~HealthWatcher() override {
439 GRPC_SUBCHANNEL_WEAK_UNREF(subchannel_, "health_watcher");
440 }
441
health_check_service_name() const442 const std::string& health_check_service_name() const {
443 return health_check_service_name_;
444 }
445
state() const446 grpc_connectivity_state state() const { return state_; }
447
AddWatcherLocked(grpc_connectivity_state initial_state,RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher)448 void AddWatcherLocked(
449 grpc_connectivity_state initial_state,
450 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface> watcher) {
451 if (state_ != initial_state) {
452 new AsyncWatcherNotifierLocked(watcher, subchannel_, state_, status_);
453 }
454 watcher_list_.AddWatcherLocked(std::move(watcher));
455 }
456
RemoveWatcherLocked(Subchannel::ConnectivityStateWatcherInterface * watcher)457 void RemoveWatcherLocked(
458 Subchannel::ConnectivityStateWatcherInterface* watcher) {
459 watcher_list_.RemoveWatcherLocked(watcher);
460 }
461
HasWatchers() const462 bool HasWatchers() const { return !watcher_list_.empty(); }
463
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)464 void NotifyLocked(grpc_connectivity_state state, const absl::Status& status) {
465 if (state == GRPC_CHANNEL_READY) {
466 // If we had not already notified for CONNECTING state, do so now.
467 // (We may have missed this earlier, because if the transition
468 // from IDLE to CONNECTING to READY was too quick, the connected
469 // subchannel may not have sent us a notification for CONNECTING.)
470 if (state_ != GRPC_CHANNEL_CONNECTING) {
471 state_ = GRPC_CHANNEL_CONNECTING;
472 status_ = status;
473 watcher_list_.NotifyLocked(subchannel_, state_, status);
474 }
475 // If we've become connected, start health checking.
476 StartHealthCheckingLocked();
477 } else {
478 state_ = state;
479 status_ = status;
480 watcher_list_.NotifyLocked(subchannel_, state_, status);
481 // We're not connected, so stop health checking.
482 health_check_client_.reset();
483 }
484 }
485
Orphan()486 void Orphan() override {
487 watcher_list_.Clear();
488 health_check_client_.reset();
489 Unref();
490 }
491
492 private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)493 void OnConnectivityStateChange(grpc_connectivity_state new_state,
494 const absl::Status& status) override {
495 MutexLock lock(&subchannel_->mu_);
496 if (new_state != GRPC_CHANNEL_SHUTDOWN && health_check_client_ != nullptr) {
497 state_ = new_state;
498 status_ = status;
499 watcher_list_.NotifyLocked(subchannel_, new_state, status);
500 }
501 }
502
StartHealthCheckingLocked()503 void StartHealthCheckingLocked() {
504 GPR_ASSERT(health_check_client_ == nullptr);
505 health_check_client_ = MakeOrphanable<HealthCheckClient>(
506 health_check_service_name_, subchannel_->connected_subchannel_,
507 subchannel_->pollset_set_, subchannel_->channelz_node_, Ref());
508 }
509
510 Subchannel* subchannel_;
511 std::string health_check_service_name_;
512 OrphanablePtr<HealthCheckClient> health_check_client_;
513 grpc_connectivity_state state_;
514 absl::Status status_;
515 ConnectivityStateWatcherList watcher_list_;
516 };
517
518 //
519 // Subchannel::HealthWatcherMap
520 //
521
AddWatcherLocked(Subchannel * subchannel,grpc_connectivity_state initial_state,const std::string & health_check_service_name,RefCountedPtr<ConnectivityStateWatcherInterface> watcher)522 void Subchannel::HealthWatcherMap::AddWatcherLocked(
523 Subchannel* subchannel, grpc_connectivity_state initial_state,
524 const std::string& health_check_service_name,
525 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
526 // If the health check service name is not already present in the map,
527 // add it.
528 auto it = map_.find(health_check_service_name);
529 HealthWatcher* health_watcher;
530 if (it == map_.end()) {
531 auto w = MakeOrphanable<HealthWatcher>(
532 subchannel, health_check_service_name, subchannel->state_);
533 health_watcher = w.get();
534 map_.emplace(health_check_service_name, std::move(w));
535 } else {
536 health_watcher = it->second.get();
537 }
538 // Add the watcher to the entry.
539 health_watcher->AddWatcherLocked(initial_state, std::move(watcher));
540 }
541
RemoveWatcherLocked(const std::string & health_check_service_name,ConnectivityStateWatcherInterface * watcher)542 void Subchannel::HealthWatcherMap::RemoveWatcherLocked(
543 const std::string& health_check_service_name,
544 ConnectivityStateWatcherInterface* watcher) {
545 auto it = map_.find(health_check_service_name);
546 GPR_ASSERT(it != map_.end());
547 it->second->RemoveWatcherLocked(watcher);
548 // If we just removed the last watcher for this service name, remove
549 // the map entry.
550 if (!it->second->HasWatchers()) map_.erase(it);
551 }
552
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)553 void Subchannel::HealthWatcherMap::NotifyLocked(grpc_connectivity_state state,
554 const absl::Status& status) {
555 for (const auto& p : map_) {
556 p.second->NotifyLocked(state, status);
557 }
558 }
559
560 grpc_connectivity_state
CheckConnectivityStateLocked(Subchannel * subchannel,const std::string & health_check_service_name)561 Subchannel::HealthWatcherMap::CheckConnectivityStateLocked(
562 Subchannel* subchannel, const std::string& health_check_service_name) {
563 auto it = map_.find(health_check_service_name);
564 if (it == map_.end()) {
565 // If the health check service name is not found in the map, we're
566 // not currently doing a health check for that service name. If the
567 // subchannel's state without health checking is READY, report
568 // CONNECTING, since that's what we'd be in as soon as we do start a
569 // watch. Otherwise, report the channel's state without health checking.
570 return subchannel->state_ == GRPC_CHANNEL_READY ? GRPC_CHANNEL_CONNECTING
571 : subchannel->state_;
572 }
573 HealthWatcher* health_watcher = it->second.get();
574 return health_watcher->state();
575 }
576
// Drops every per-service-name health watcher entry from the map.
void Subchannel::HealthWatcherMap::ShutdownLocked() { map_.clear(); }
578
579 //
580 // Subchannel
581 //
582
583 namespace {
584
ParseArgsForBackoffValues(const grpc_channel_args * args,grpc_millis * min_connect_timeout_ms)585 BackOff::Options ParseArgsForBackoffValues(
586 const grpc_channel_args* args, grpc_millis* min_connect_timeout_ms) {
587 grpc_millis initial_backoff_ms =
588 GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
589 *min_connect_timeout_ms =
590 GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
591 grpc_millis max_backoff_ms =
592 GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
593 bool fixed_reconnect_backoff = false;
594 if (args != nullptr) {
595 for (size_t i = 0; i < args->num_args; i++) {
596 if (0 == strcmp(args->args[i].key,
597 "grpc.testing.fixed_reconnect_backoff_ms")) {
598 fixed_reconnect_backoff = true;
599 initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
600 grpc_channel_arg_get_integer(
601 &args->args[i],
602 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
603 } else if (0 ==
604 strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
605 fixed_reconnect_backoff = false;
606 *min_connect_timeout_ms = grpc_channel_arg_get_integer(
607 &args->args[i],
608 {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
609 } else if (0 ==
610 strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
611 fixed_reconnect_backoff = false;
612 max_backoff_ms = grpc_channel_arg_get_integer(
613 &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
614 } else if (0 == strcmp(args->args[i].key,
615 GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
616 fixed_reconnect_backoff = false;
617 initial_backoff_ms = grpc_channel_arg_get_integer(
618 &args->args[i],
619 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
620 }
621 }
622 }
623 return BackOff::Options()
624 .set_initial_backoff(initial_backoff_ms)
625 .set_multiplier(fixed_reconnect_backoff
626 ? 1.0
627 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
628 .set_jitter(fixed_reconnect_backoff ? 0.0
629 : GRPC_SUBCHANNEL_RECONNECT_JITTER)
630 .set_max_backoff(max_backoff_ms);
631 }
632
633 } // namespace
634
PushConnectivityStateChange(ConnectivityStateChange state_change)635 void Subchannel::ConnectivityStateWatcherInterface::PushConnectivityStateChange(
636 ConnectivityStateChange state_change) {
637 MutexLock lock(&mu_);
638 connectivity_state_queue_.push_back(std::move(state_change));
639 }
640
641 Subchannel::ConnectivityStateWatcherInterface::ConnectivityStateChange
PopConnectivityStateChange()642 Subchannel::ConnectivityStateWatcherInterface::PopConnectivityStateChange() {
643 MutexLock lock(&mu_);
644 GPR_ASSERT(!connectivity_state_queue_.empty());
645 ConnectivityStateChange state_change = connectivity_state_queue_.front();
646 connectivity_state_queue_.pop_front();
647 return state_change;
648 }
649
Subchannel(SubchannelKey * key,OrphanablePtr<SubchannelConnector> connector,const grpc_channel_args * args)650 Subchannel::Subchannel(SubchannelKey* key,
651 OrphanablePtr<SubchannelConnector> connector,
652 const grpc_channel_args* args)
653 : key_(key),
654 connector_(std::move(connector)),
655 backoff_(ParseArgsForBackoffValues(args, &min_connect_timeout_ms_)) {
656 GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
657 gpr_atm_no_barrier_store(&ref_pair_, 1 << INTERNAL_REF_BITS);
658 pollset_set_ = grpc_pollset_set_create();
659 grpc_resolved_address* addr =
660 static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
661 GetAddressFromSubchannelAddressArg(args, addr);
662 grpc_resolved_address* new_address = nullptr;
663 grpc_channel_args* new_args = nullptr;
664 if (ProxyMapperRegistry::MapAddress(*addr, args, &new_address, &new_args)) {
665 GPR_ASSERT(new_address != nullptr);
666 gpr_free(addr);
667 addr = new_address;
668 }
669 static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
670 grpc_arg new_arg = CreateSubchannelAddressArg(addr);
671 gpr_free(addr);
672 args_ = grpc_channel_args_copy_and_add_and_remove(
673 new_args != nullptr ? new_args : args, keys_to_remove,
674 GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
675 gpr_free(new_arg.value.string);
676 if (new_args != nullptr) grpc_channel_args_destroy(new_args);
677 GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
678 grpc_schedule_on_exec_ctx);
679 const grpc_arg* arg = grpc_channel_args_find(args_, GRPC_ARG_ENABLE_CHANNELZ);
680 const bool channelz_enabled =
681 grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT);
682 arg = grpc_channel_args_find(
683 args_, GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE);
684 const grpc_integer_options options = {
685 GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT, 0, INT_MAX};
686 size_t channel_tracer_max_memory =
687 static_cast<size_t>(grpc_channel_arg_get_integer(arg, options));
688 if (channelz_enabled) {
689 channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
690 GetTargetAddress(), channel_tracer_max_memory);
691 channelz_node_->AddTraceEvent(
692 channelz::ChannelTrace::Severity::Info,
693 grpc_slice_from_static_string("subchannel created"));
694 }
695 }
696
// Tears down the subchannel: records the destruction in channelz (if
// enabled), then releases the args copy, the connector, the pollset set and
// the key that was handed to the constructor.
Subchannel::~Subchannel() {
  if (channelz_node_ != nullptr) {
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string("Subchannel destroyed"));
    channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
  }
  grpc_channel_args_destroy(args_);
  connector_.reset();
  grpc_pollset_set_destroy(pollset_set_);
  // key_ is the raw pointer stored by the constructor.
  delete key_;
}
709
Create(OrphanablePtr<SubchannelConnector> connector,const grpc_channel_args * args)710 Subchannel* Subchannel::Create(OrphanablePtr<SubchannelConnector> connector,
711 const grpc_channel_args* args) {
712 SubchannelKey* key = new SubchannelKey(args);
713 SubchannelPoolInterface* subchannel_pool =
714 SubchannelPoolInterface::GetSubchannelPoolFromChannelArgs(args);
715 GPR_ASSERT(subchannel_pool != nullptr);
716 Subchannel* c = subchannel_pool->FindSubchannel(key);
717 if (c != nullptr) {
718 delete key;
719 return c;
720 }
721 c = new Subchannel(key, std::move(connector), args);
722 // Try to register the subchannel before setting the subchannel pool.
723 // Otherwise, in case of a registration race, unreffing c in
724 // RegisterSubchannel() will cause c to be tried to be unregistered, while
725 // its key maps to a different subchannel.
726 Subchannel* registered = subchannel_pool->RegisterSubchannel(key, c);
727 if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
728 return registered;
729 }
730
ThrottleKeepaliveTime(int new_keepalive_time)731 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
732 MutexLock lock(&mu_);
733 // Only update the value if the new keepalive time is larger.
734 if (new_keepalive_time > keepalive_time_) {
735 keepalive_time_ = new_keepalive_time;
736 if (grpc_trace_subchannel.enabled()) {
737 gpr_log(GPR_INFO, "Subchannel=%p: Throttling keepalive time to %d", this,
738 new_keepalive_time);
739 }
740 const grpc_arg arg_to_add = grpc_channel_arg_integer_create(
741 const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS), new_keepalive_time);
742 const char* arg_to_remove = GRPC_ARG_KEEPALIVE_TIME_MS;
743 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
744 args_, &arg_to_remove, 1, &arg_to_add, 1);
745 grpc_channel_args_destroy(args_);
746 args_ = new_args;
747 }
748 }
749
Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS)750 Subchannel* Subchannel::Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
751 gpr_atm old_refs;
752 old_refs = RefMutate((1 << INTERNAL_REF_BITS),
753 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_REF"));
754 GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
755 return this;
756 }
757
Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS)758 void Subchannel::Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
759 gpr_atm old_refs;
760 // add a weak ref and subtract a strong ref (atomically)
761 old_refs = RefMutate(
762 static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS),
763 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("STRONG_UNREF"));
764 if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
765 Disconnect();
766 }
767 GRPC_SUBCHANNEL_WEAK_UNREF(this, "strong-unref");
768 }
769
WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS)770 Subchannel* Subchannel::WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
771 gpr_atm old_refs;
772 old_refs = RefMutate(1, 0 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_REF"));
773 GPR_ASSERT(old_refs != 0);
774 return this;
775 }
776
777 namespace {
778
subchannel_destroy(void * arg,grpc_error *)779 void subchannel_destroy(void* arg, grpc_error* /*error*/) {
780 Subchannel* self = static_cast<Subchannel*>(arg);
781 delete self;
782 }
783
784 } // namespace
785
WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS)786 void Subchannel::WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
787 gpr_atm old_refs;
788 old_refs = RefMutate(-static_cast<gpr_atm>(1),
789 1 GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE("WEAK_UNREF"));
790 if (old_refs == 1) {
791 ExecCtx::Run(DEBUG_LOCATION,
792 GRPC_CLOSURE_CREATE(subchannel_destroy, this,
793 grpc_schedule_on_exec_ctx),
794 GRPC_ERROR_NONE);
795 }
796 }
797
RefFromWeakRef()798 Subchannel* Subchannel::RefFromWeakRef() {
799 for (;;) {
800 gpr_atm old_refs = gpr_atm_acq_load(&ref_pair_);
801 if (old_refs >= (1 << INTERNAL_REF_BITS)) {
802 gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
803 if (gpr_atm_rel_cas(&ref_pair_, old_refs, new_refs)) {
804 return this;
805 }
806 } else {
807 return nullptr;
808 }
809 }
810 }
811
GetTargetAddress()812 const char* Subchannel::GetTargetAddress() {
813 const grpc_arg* addr_arg =
814 grpc_channel_args_find(args_, GRPC_ARG_SUBCHANNEL_ADDRESS);
815 const char* addr_str = grpc_channel_arg_get_string(addr_arg);
816 GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
817 return addr_str;
818 }
819
channelz_node()820 channelz::SubchannelNode* Subchannel::channelz_node() {
821 return channelz_node_.get();
822 }
823
CheckConnectivityState(const absl::optional<std::string> & health_check_service_name,RefCountedPtr<ConnectedSubchannel> * connected_subchannel)824 grpc_connectivity_state Subchannel::CheckConnectivityState(
825 const absl::optional<std::string>& health_check_service_name,
826 RefCountedPtr<ConnectedSubchannel>* connected_subchannel) {
827 MutexLock lock(&mu_);
828 grpc_connectivity_state state;
829 if (!health_check_service_name.has_value()) {
830 state = state_;
831 } else {
832 state = health_watcher_map_.CheckConnectivityStateLocked(
833 this, *health_check_service_name);
834 }
835 if (connected_subchannel != nullptr && state == GRPC_CHANNEL_READY) {
836 *connected_subchannel = connected_subchannel_;
837 }
838 return state;
839 }
840
WatchConnectivityState(grpc_connectivity_state initial_state,const absl::optional<std::string> & health_check_service_name,RefCountedPtr<ConnectivityStateWatcherInterface> watcher)841 void Subchannel::WatchConnectivityState(
842 grpc_connectivity_state initial_state,
843 const absl::optional<std::string>& health_check_service_name,
844 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
845 MutexLock lock(&mu_);
846 grpc_pollset_set* interested_parties = watcher->interested_parties();
847 if (interested_parties != nullptr) {
848 grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
849 }
850 if (!health_check_service_name.has_value()) {
851 if (state_ != initial_state) {
852 new AsyncWatcherNotifierLocked(watcher, this, state_, status_);
853 }
854 watcher_list_.AddWatcherLocked(std::move(watcher));
855 } else {
856 health_watcher_map_.AddWatcherLocked(
857 this, initial_state, *health_check_service_name, std::move(watcher));
858 }
859 }
860
CancelConnectivityStateWatch(const absl::optional<std::string> & health_check_service_name,ConnectivityStateWatcherInterface * watcher)861 void Subchannel::CancelConnectivityStateWatch(
862 const absl::optional<std::string>& health_check_service_name,
863 ConnectivityStateWatcherInterface* watcher) {
864 MutexLock lock(&mu_);
865 grpc_pollset_set* interested_parties = watcher->interested_parties();
866 if (interested_parties != nullptr) {
867 grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
868 }
869 if (!health_check_service_name.has_value()) {
870 watcher_list_.RemoveWatcherLocked(watcher);
871 } else {
872 health_watcher_map_.RemoveWatcherLocked(*health_check_service_name,
873 watcher);
874 }
875 }
876
AttemptToConnect()877 void Subchannel::AttemptToConnect() {
878 MutexLock lock(&mu_);
879 MaybeStartConnectingLocked();
880 }
881
ResetBackoff()882 void Subchannel::ResetBackoff() {
883 MutexLock lock(&mu_);
884 backoff_.Reset();
885 if (have_retry_alarm_) {
886 retry_immediately_ = true;
887 grpc_timer_cancel(&retry_alarm_);
888 } else {
889 backoff_begun_ = false;
890 MaybeStartConnectingLocked();
891 }
892 }
893
CreateSubchannelAddressArg(const grpc_resolved_address * addr)894 grpc_arg Subchannel::CreateSubchannelAddressArg(
895 const grpc_resolved_address* addr) {
896 return grpc_channel_arg_string_create(
897 const_cast<char*>(GRPC_ARG_SUBCHANNEL_ADDRESS),
898 gpr_strdup(addr->len > 0 ? grpc_sockaddr_to_uri(addr).c_str() : ""));
899 }
900
GetUriFromSubchannelAddressArg(const grpc_channel_args * args)901 const char* Subchannel::GetUriFromSubchannelAddressArg(
902 const grpc_channel_args* args) {
903 const grpc_arg* addr_arg =
904 grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
905 const char* addr_str = grpc_channel_arg_get_string(addr_arg);
906 GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
907 return addr_str;
908 }
909
910 namespace {
911
UriToSockaddr(const char * uri_str,grpc_resolved_address * addr)912 void UriToSockaddr(const char* uri_str, grpc_resolved_address* addr) {
913 absl::StatusOr<URI> uri = URI::Parse(uri_str);
914 if (!uri.ok()) {
915 gpr_log(GPR_ERROR, "%s", uri.status().ToString().c_str());
916 GPR_ASSERT(uri.ok());
917 }
918 if (!grpc_parse_uri(*uri, addr)) memset(addr, 0, sizeof(*addr));
919 }
920
921 } // namespace
922
GetAddressFromSubchannelAddressArg(const grpc_channel_args * args,grpc_resolved_address * addr)923 void Subchannel::GetAddressFromSubchannelAddressArg(
924 const grpc_channel_args* args, grpc_resolved_address* addr) {
925 const char* addr_uri_str = GetUriFromSubchannelAddressArg(args);
926 memset(addr, 0, sizeof(*addr));
927 if (*addr_uri_str != '\0') {
928 UriToSockaddr(addr_uri_str, addr);
929 }
930 }
931
932 namespace {
933
934 // Returns a string indicating the subchannel's connectivity state change to
935 // \a state.
SubchannelConnectivityStateChangeString(grpc_connectivity_state state)936 const char* SubchannelConnectivityStateChangeString(
937 grpc_connectivity_state state) {
938 switch (state) {
939 case GRPC_CHANNEL_IDLE:
940 return "Subchannel state change to IDLE";
941 case GRPC_CHANNEL_CONNECTING:
942 return "Subchannel state change to CONNECTING";
943 case GRPC_CHANNEL_READY:
944 return "Subchannel state change to READY";
945 case GRPC_CHANNEL_TRANSIENT_FAILURE:
946 return "Subchannel state change to TRANSIENT_FAILURE";
947 case GRPC_CHANNEL_SHUTDOWN:
948 return "Subchannel state change to SHUTDOWN";
949 }
950 GPR_UNREACHABLE_CODE(return "UNKNOWN");
951 }
952
953 } // namespace
954
955 // Note: Must be called with a state that is different from the current state.
SetConnectivityStateLocked(grpc_connectivity_state state,const absl::Status & status)956 void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
957 const absl::Status& status) {
958 state_ = state;
959 status_ = status;
960 if (channelz_node_ != nullptr) {
961 channelz_node_->UpdateConnectivityState(state);
962 channelz_node_->AddTraceEvent(
963 channelz::ChannelTrace::Severity::Info,
964 grpc_slice_from_static_string(
965 SubchannelConnectivityStateChangeString(state)));
966 }
967 // Notify non-health watchers.
968 watcher_list_.NotifyLocked(this, state, status);
969 // Notify health watchers.
970 health_watcher_map_.NotifyLocked(state, status);
971 }
972
MaybeStartConnectingLocked()973 void Subchannel::MaybeStartConnectingLocked() {
974 if (disconnected_) {
975 // Don't try to connect if we're already disconnected.
976 return;
977 }
978 if (connecting_) {
979 // Already connecting: don't restart.
980 return;
981 }
982 if (connected_subchannel_ != nullptr) {
983 // Already connected: don't restart.
984 return;
985 }
986 connecting_ = true;
987 GRPC_SUBCHANNEL_WEAK_REF(this, "connecting");
988 if (!backoff_begun_) {
989 backoff_begun_ = true;
990 ContinueConnectingLocked();
991 } else {
992 GPR_ASSERT(!have_retry_alarm_);
993 have_retry_alarm_ = true;
994 const grpc_millis time_til_next =
995 next_attempt_deadline_ - ExecCtx::Get()->Now();
996 if (time_til_next <= 0) {
997 gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", this);
998 } else {
999 gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds",
1000 this, time_til_next);
1001 }
1002 GRPC_CLOSURE_INIT(&on_retry_alarm_, OnRetryAlarm, this,
1003 grpc_schedule_on_exec_ctx);
1004 grpc_timer_init(&retry_alarm_, next_attempt_deadline_, &on_retry_alarm_);
1005 }
1006 }
1007
OnRetryAlarm(void * arg,grpc_error * error)1008 void Subchannel::OnRetryAlarm(void* arg, grpc_error* error) {
1009 Subchannel* c = static_cast<Subchannel*>(arg);
1010 // TODO(soheilhy): Once subchannel refcounting is simplified, we can get use
1011 // MutexLock instead of ReleasableMutexLock, here.
1012 ReleasableMutexLock lock(&c->mu_);
1013 c->have_retry_alarm_ = false;
1014 if (c->disconnected_) {
1015 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
1016 &error, 1);
1017 } else if (c->retry_immediately_) {
1018 c->retry_immediately_ = false;
1019 error = GRPC_ERROR_NONE;
1020 } else {
1021 GRPC_ERROR_REF(error);
1022 }
1023 if (error == GRPC_ERROR_NONE) {
1024 gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
1025 c->ContinueConnectingLocked();
1026 lock.Unlock();
1027 } else {
1028 lock.Unlock();
1029 GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
1030 }
1031 GRPC_ERROR_UNREF(error);
1032 }
1033
ContinueConnectingLocked()1034 void Subchannel::ContinueConnectingLocked() {
1035 SubchannelConnector::Args args;
1036 args.interested_parties = pollset_set_;
1037 const grpc_millis min_deadline =
1038 min_connect_timeout_ms_ + ExecCtx::Get()->Now();
1039 next_attempt_deadline_ = backoff_.NextAttemptTime();
1040 args.deadline = std::max(next_attempt_deadline_, min_deadline);
1041 args.channel_args = args_;
1042 SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status());
1043 connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
1044 }
1045
OnConnectingFinished(void * arg,grpc_error * error)1046 void Subchannel::OnConnectingFinished(void* arg, grpc_error* error) {
1047 auto* c = static_cast<Subchannel*>(arg);
1048 const grpc_channel_args* delete_channel_args =
1049 c->connecting_result_.channel_args;
1050 GRPC_SUBCHANNEL_WEAK_REF(c, "on_connecting_finished");
1051 {
1052 MutexLock lock(&c->mu_);
1053 c->connecting_ = false;
1054 if (c->connecting_result_.transport != nullptr &&
1055 c->PublishTransportLocked()) {
1056 // Do nothing, transport was published.
1057 } else if (c->disconnected_) {
1058 GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
1059 } else {
1060 gpr_log(GPR_INFO, "Connect failed: %s", grpc_error_string(error));
1061 c->SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
1062 grpc_error_to_absl_status(error));
1063 GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
1064 }
1065 }
1066 GRPC_SUBCHANNEL_WEAK_UNREF(c, "on_connecting_finished");
1067 grpc_channel_args_destroy(delete_channel_args);
1068 }
1069
1070 namespace {
1071
ConnectionDestroy(void * arg,grpc_error *)1072 void ConnectionDestroy(void* arg, grpc_error* /*error*/) {
1073 grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
1074 grpc_channel_stack_destroy(stk);
1075 gpr_free(stk);
1076 }
1077
1078 } // namespace
1079
PublishTransportLocked()1080 bool Subchannel::PublishTransportLocked() {
1081 // Construct channel stack.
1082 grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
1083 grpc_channel_stack_builder_set_channel_arguments(
1084 builder, connecting_result_.channel_args);
1085 grpc_channel_stack_builder_set_transport(builder,
1086 connecting_result_.transport);
1087 if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
1088 grpc_channel_stack_builder_destroy(builder);
1089 return false;
1090 }
1091 grpc_channel_stack* stk;
1092 grpc_error* error = grpc_channel_stack_builder_finish(
1093 builder, 0, 1, ConnectionDestroy, nullptr,
1094 reinterpret_cast<void**>(&stk));
1095 if (error != GRPC_ERROR_NONE) {
1096 grpc_transport_destroy(connecting_result_.transport);
1097 gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
1098 grpc_error_string(error));
1099 GRPC_ERROR_UNREF(error);
1100 return false;
1101 }
1102 RefCountedPtr<channelz::SocketNode> socket =
1103 std::move(connecting_result_.socket_node);
1104 connecting_result_.Reset();
1105 if (disconnected_) {
1106 grpc_channel_stack_destroy(stk);
1107 gpr_free(stk);
1108 return false;
1109 }
1110 // Publish.
1111 connected_subchannel_.reset(
1112 new ConnectedSubchannel(stk, args_, channelz_node_));
1113 gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
1114 connected_subchannel_.get(), this);
1115 if (channelz_node_ != nullptr) {
1116 channelz_node_->SetChildSocket(std::move(socket));
1117 }
1118 // Start watching connected subchannel.
1119 connected_subchannel_->StartWatch(
1120 pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(this));
1121 // Report initial state.
1122 SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status());
1123 return true;
1124 }
1125
Disconnect()1126 void Subchannel::Disconnect() {
1127 // The subchannel_pool is only used once here in this subchannel, so the
1128 // access can be outside of the lock.
1129 if (subchannel_pool_ != nullptr) {
1130 subchannel_pool_->UnregisterSubchannel(key_);
1131 subchannel_pool_.reset();
1132 }
1133 MutexLock lock(&mu_);
1134 GPR_ASSERT(!disconnected_);
1135 disconnected_ = true;
1136 connector_.reset();
1137 connected_subchannel_.reset();
1138 health_watcher_map_.ShutdownLocked();
1139 }
1140
RefMutate(gpr_atm delta,int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS)1141 gpr_atm Subchannel::RefMutate(
1142 gpr_atm delta, int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS) {
1143 gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&ref_pair_, delta)
1144 : gpr_atm_no_barrier_fetch_add(&ref_pair_, delta);
1145 #ifndef NDEBUG
1146 if (grpc_trace_subchannel_refcount.enabled()) {
1147 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
1148 "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", this,
1149 purpose, old_val, old_val + delta, reason);
1150 }
1151 #endif
1152 return old_val;
1153 }
1154
1155 } // namespace grpc_core
1156