1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/subchannel.h"
22
23 #include <inttypes.h>
24 #include <limits.h>
25
26 #include <algorithm>
27 #include <cstring>
28
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/string_util.h>
31
32 #include "src/core/ext/filters/client_channel/client_channel.h"
33 #include "src/core/ext/filters/client_channel/parse_address.h"
34 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
35 #include "src/core/ext/filters/client_channel/subchannel_index.h"
36 #include "src/core/ext/filters/client_channel/uri_parser.h"
37 #include "src/core/lib/backoff/backoff.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/channel/connected_channel.h"
40 #include "src/core/lib/debug/stats.h"
41 #include "src/core/lib/gpr/alloc.h"
42 #include "src/core/lib/gprpp/debug_location.h"
43 #include "src/core/lib/gprpp/manual_constructor.h"
44 #include "src/core/lib/gprpp/ref_counted_ptr.h"
45 #include "src/core/lib/iomgr/sockaddr_utils.h"
46 #include "src/core/lib/iomgr/timer.h"
47 #include "src/core/lib/profiling/timers.h"
48 #include "src/core/lib/slice/slice_internal.h"
49 #include "src/core/lib/surface/channel.h"
50 #include "src/core/lib/surface/channel_init.h"
51 #include "src/core/lib/transport/connectivity_state.h"
52
// Layout of grpc_subchannel::ref_pair: the low INTERNAL_REF_BITS bits count
// internal (weak) references, the remaining upper bits count public (strong)
// references.
#define INTERNAL_REF_BITS 16
// Mask that selects only the strong-reference portion of ref_pair.
#define STRONG_REF_MASK (~(gpr_atm)((1 << INTERNAL_REF_BITS) - 1))

// Default reconnect backoff parameters. parse_args_for_backoff_values() starts
// from these and lets channel args override the initial backoff, the minimum
// connect timeout and the maximum backoff.
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS 20
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
61
namespace {
// Bookkeeping used by a subchannel to watch the connectivity state of its
// connected subchannel. Allocated in publish_transport_locked() and consumed
// by on_connected_subchannel_connectivity_changed().
struct state_watcher {
  // Closure bound to on_connected_subchannel_connectivity_changed(), with this
  // watcher as its argument.
  grpc_closure closure;
  // Subchannel on whose behalf the connected subchannel is being watched.
  grpc_subchannel* subchannel;
  // State slot handed to ConnectedSubchannel::NotifyOnStateChange().
  grpc_connectivity_state connectivity_state;
};
}  // namespace
69
70 typedef struct external_state_watcher {
71 grpc_subchannel* subchannel;
72 grpc_pollset_set* pollset_set;
73 grpc_closure* notify;
74 grpc_closure closure;
75 struct external_state_watcher* next;
76 struct external_state_watcher* prev;
77 } external_state_watcher;
78
// A subchannel: owns a connector, tracks connectivity state, drives the
// (re)connection/backoff loop and publishes a ConnectedSubchannel once a
// transport has been established. Instances are zero-allocated in
// grpc_subchannel_create() and released in subchannel_destroy().
struct grpc_subchannel {
  /** connector used to establish transports; a ref is taken in
      grpc_subchannel_create() and dropped in subchannel_destroy() */
  grpc_connector* connector;

  /** refcount
      - lower INTERNAL_REF_BITS bits are for internal references:
        these do not keep the subchannel open.
      - upper remaining bits are for public references: these do
        keep the subchannel open */
  gpr_atm ref_pair;

  /** non-transport related channel filters (array copied from the creation
      args; nullptr when num_filters is 0) */
  const grpc_channel_filter** filters;
  size_t num_filters;
  /** channel arguments (owned copy, with the subchannel address arg
      rewritten at creation time) */
  grpc_channel_args* args;

  /** key under which this subchannel is registered in the subchannel index */
  grpc_subchannel_key* key;

  /** set during connection */
  grpc_connect_out_args connecting_result;

  /** callback for connection finishing (bound to on_subchannel_connected) */
  grpc_closure on_connected;

  /** callback for our alarm (bound to on_alarm) */
  grpc_closure on_alarm;

  /** pollset_set tracking who's interested in a connection
      being setup */
  grpc_pollset_set* pollset_set;

  /** mutex protecting remaining elements */
  gpr_mu mu;

  /** active connection, or null; of type grpc_core::ConnectedSubchannel
   */
  grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel> connected_subchannel;

  /** have we seen a disconnection? (set once, in disconnect()) */
  bool disconnected;
  /** are we connecting */
  bool connecting;
  /** connectivity state tracking */
  grpc_connectivity_state_tracker state_tracker;

  /** sentinel node of the circular list of pending external watchers */
  external_state_watcher root_external_state_watcher;

  /** backoff state */
  grpc_core::ManualConstructor<grpc_core::BackOff> backoff;
  /** time of the next connection attempt, as reported by the backoff */
  grpc_millis next_attempt_deadline;
  /** lower bound on the time allowed for a single connection attempt */
  grpc_millis min_connect_timeout_ms;

  /** do we have an active alarm? */
  bool have_alarm;
  /** have we started the backoff loop */
  bool backoff_begun;
  // reset_backoff() was called while alarm was pending
  bool deferred_reset_backoff;
  /** our alarm */
  grpc_timer alarm;

  /** channelz node for this subchannel; null unless channelz is enabled via
      channel args */
  grpc_core::RefCountedPtr<grpc_core::channelz::SubchannelNode>
      channelz_subchannel;
};
143
// Header of a call on a connected subchannel. The call stack is laid out in
// the same allocation, after this header (see SUBCHANNEL_CALL_TO_CALL_STACK).
struct grpc_subchannel_call {
  // Connected subchannel the call runs on; a ref is taken in
  // ConnectedSubchannel::CreateCall() and dropped in subchannel_call_destroy().
  grpc_core::ConnectedSubchannel* connection;
  // Optional closure passed to grpc_call_stack_destroy() when the call is
  // destroyed; set via grpc_subchannel_call_set_cleanup_closure().
  grpc_closure* schedule_closure_after_destroy;
};
148
// Maps a grpc_subchannel_call* to the grpc_call_stack that lives in the same
// allocation, immediately after the alignment-padded call header.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call)                             \
  ((grpc_call_stack*)((char*)(call) + GPR_ROUND_UP_TO_ALIGNMENT_SIZE(   \
                                          sizeof(grpc_subchannel_call))))
// Inverse of SUBCHANNEL_CALL_TO_CALL_STACK: maps a grpc_call_stack* back to
// the grpc_subchannel_call header that precedes it. The expansion uses the
// macro argument itself (it previously referred to an unrelated identifier
// named `call_stack`, silently ignoring its argument).
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack)               \
  ((grpc_subchannel_call*)(((char*)(callstack)) -             \
                           GPR_ROUND_UP_TO_ALIGNMENT_SIZE(    \
                               sizeof(grpc_subchannel_call))))
156
// Forward declaration: grpc_subchannel_create() binds this function to the
// subchannel's on_connected closure before its definition appears.
static void on_subchannel_connected(void* subchannel, grpc_error* error);

// Helpers for threading ref-count debugging information through the ref/unref
// functions. In debug builds (NDEBUG not defined) ref_mutate() receives the
// caller's file, line and reason plus a purpose string; in release builds the
// extra arguments compile away and REF_REASON is an empty string.
#ifndef NDEBUG
#define REF_REASON reason
#define REF_MUTATE_EXTRA_ARGS \
  GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose
#define REF_MUTATE_PURPOSE(x) , file, line, reason, x
#else
#define REF_REASON ""
#define REF_MUTATE_EXTRA_ARGS
#define REF_MUTATE_PURPOSE(x)
#endif
169
170 /*
171 * connection implementation
172 */
173
connection_destroy(void * arg,grpc_error * error)174 static void connection_destroy(void* arg, grpc_error* error) {
175 grpc_channel_stack* stk = static_cast<grpc_channel_stack*>(arg);
176 grpc_channel_stack_destroy(stk);
177 gpr_free(stk);
178 }
179
180 /*
181 * grpc_subchannel implementation
182 */
183
subchannel_destroy(void * arg,grpc_error * error)184 static void subchannel_destroy(void* arg, grpc_error* error) {
185 grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
186 if (c->channelz_subchannel != nullptr) {
187 c->channelz_subchannel->AddTraceEvent(
188 grpc_core::channelz::ChannelTrace::Severity::Info,
189 grpc_slice_from_static_string("Subchannel destroyed"));
190 c->channelz_subchannel->MarkSubchannelDestroyed();
191 c->channelz_subchannel.reset();
192 }
193 gpr_free((void*)c->filters);
194 grpc_channel_args_destroy(c->args);
195 grpc_connectivity_state_destroy(&c->state_tracker);
196 grpc_connector_unref(c->connector);
197 grpc_pollset_set_destroy(c->pollset_set);
198 grpc_subchannel_key_destroy(c->key);
199 gpr_mu_destroy(&c->mu);
200 gpr_free(c);
201 }
202
ref_mutate(grpc_subchannel * c,gpr_atm delta,int barrier REF_MUTATE_EXTRA_ARGS)203 static gpr_atm ref_mutate(grpc_subchannel* c, gpr_atm delta,
204 int barrier REF_MUTATE_EXTRA_ARGS) {
205 gpr_atm old_val = barrier ? gpr_atm_full_fetch_add(&c->ref_pair, delta)
206 : gpr_atm_no_barrier_fetch_add(&c->ref_pair, delta);
207 #ifndef NDEBUG
208 if (grpc_trace_stream_refcount.enabled()) {
209 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
210 "SUBCHANNEL: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c,
211 purpose, old_val, old_val + delta, reason);
212 }
213 #endif
214 return old_val;
215 }
216
grpc_subchannel_ref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)217 grpc_subchannel* grpc_subchannel_ref(
218 grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
219 gpr_atm old_refs;
220 old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS),
221 0 REF_MUTATE_PURPOSE("STRONG_REF"));
222 GPR_ASSERT((old_refs & STRONG_REF_MASK) != 0);
223 return c;
224 }
225
grpc_subchannel_weak_ref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)226 grpc_subchannel* grpc_subchannel_weak_ref(
227 grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
228 gpr_atm old_refs;
229 old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF"));
230 GPR_ASSERT(old_refs != 0);
231 return c;
232 }
233
grpc_subchannel_ref_from_weak_ref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)234 grpc_subchannel* grpc_subchannel_ref_from_weak_ref(
235 grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
236 if (!c) return nullptr;
237 for (;;) {
238 gpr_atm old_refs = gpr_atm_acq_load(&c->ref_pair);
239 if (old_refs >= (1 << INTERNAL_REF_BITS)) {
240 gpr_atm new_refs = old_refs + (1 << INTERNAL_REF_BITS);
241 if (gpr_atm_rel_cas(&c->ref_pair, old_refs, new_refs)) {
242 return c;
243 }
244 } else {
245 return nullptr;
246 }
247 }
248 }
249
disconnect(grpc_subchannel * c)250 static void disconnect(grpc_subchannel* c) {
251 grpc_subchannel_index_unregister(c->key, c);
252 gpr_mu_lock(&c->mu);
253 GPR_ASSERT(!c->disconnected);
254 c->disconnected = true;
255 grpc_connector_shutdown(c->connector, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
256 "Subchannel disconnected"));
257 c->connected_subchannel.reset();
258 gpr_mu_unlock(&c->mu);
259 }
260
grpc_subchannel_unref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)261 void grpc_subchannel_unref(grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
262 gpr_atm old_refs;
263 // add a weak ref and subtract a strong ref (atomically)
264 old_refs = ref_mutate(
265 c, static_cast<gpr_atm>(1) - static_cast<gpr_atm>(1 << INTERNAL_REF_BITS),
266 1 REF_MUTATE_PURPOSE("STRONG_UNREF"));
267 if ((old_refs & STRONG_REF_MASK) == (1 << INTERNAL_REF_BITS)) {
268 disconnect(c);
269 }
270 GRPC_SUBCHANNEL_WEAK_UNREF(c, "strong-unref");
271 }
272
grpc_subchannel_weak_unref(grpc_subchannel * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)273 void grpc_subchannel_weak_unref(
274 grpc_subchannel* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
275 gpr_atm old_refs;
276 old_refs = ref_mutate(c, -static_cast<gpr_atm>(1),
277 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
278 if (old_refs == 1) {
279 GRPC_CLOSURE_SCHED(
280 GRPC_CLOSURE_CREATE(subchannel_destroy, c, grpc_schedule_on_exec_ctx),
281 GRPC_ERROR_NONE);
282 }
283 }
284
parse_args_for_backoff_values(const grpc_channel_args * args,grpc_core::BackOff::Options * backoff_options,grpc_millis * min_connect_timeout_ms)285 static void parse_args_for_backoff_values(
286 const grpc_channel_args* args, grpc_core::BackOff::Options* backoff_options,
287 grpc_millis* min_connect_timeout_ms) {
288 grpc_millis initial_backoff_ms =
289 GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000;
290 *min_connect_timeout_ms =
291 GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS * 1000;
292 grpc_millis max_backoff_ms =
293 GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
294 bool fixed_reconnect_backoff = false;
295 if (args != nullptr) {
296 for (size_t i = 0; i < args->num_args; i++) {
297 if (0 == strcmp(args->args[i].key,
298 "grpc.testing.fixed_reconnect_backoff_ms")) {
299 fixed_reconnect_backoff = true;
300 initial_backoff_ms = *min_connect_timeout_ms = max_backoff_ms =
301 grpc_channel_arg_get_integer(
302 &args->args[i],
303 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
304 } else if (0 ==
305 strcmp(args->args[i].key, GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)) {
306 fixed_reconnect_backoff = false;
307 *min_connect_timeout_ms = grpc_channel_arg_get_integer(
308 &args->args[i],
309 {static_cast<int>(*min_connect_timeout_ms), 100, INT_MAX});
310 } else if (0 ==
311 strcmp(args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) {
312 fixed_reconnect_backoff = false;
313 max_backoff_ms = grpc_channel_arg_get_integer(
314 &args->args[i], {static_cast<int>(max_backoff_ms), 100, INT_MAX});
315 } else if (0 == strcmp(args->args[i].key,
316 GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)) {
317 fixed_reconnect_backoff = false;
318 initial_backoff_ms = grpc_channel_arg_get_integer(
319 &args->args[i],
320 {static_cast<int>(initial_backoff_ms), 100, INT_MAX});
321 }
322 }
323 }
324 backoff_options->set_initial_backoff(initial_backoff_ms)
325 .set_multiplier(fixed_reconnect_backoff
326 ? 1.0
327 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
328 .set_jitter(fixed_reconnect_backoff ? 0.0
329 : GRPC_SUBCHANNEL_RECONNECT_JITTER)
330 .set_max_backoff(max_backoff_ms);
331 }
332
grpc_subchannel_create(grpc_connector * connector,const grpc_subchannel_args * args)333 grpc_subchannel* grpc_subchannel_create(grpc_connector* connector,
334 const grpc_subchannel_args* args) {
335 grpc_subchannel_key* key = grpc_subchannel_key_create(args);
336 grpc_subchannel* c = grpc_subchannel_index_find(key);
337 if (c) {
338 grpc_subchannel_key_destroy(key);
339 return c;
340 }
341
342 GRPC_STATS_INC_CLIENT_SUBCHANNELS_CREATED();
343 c = static_cast<grpc_subchannel*>(gpr_zalloc(sizeof(*c)));
344 c->key = key;
345 gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS);
346 c->connector = connector;
347 grpc_connector_ref(c->connector);
348 c->num_filters = args->filter_count;
349 if (c->num_filters > 0) {
350 c->filters = static_cast<const grpc_channel_filter**>(
351 gpr_malloc(sizeof(grpc_channel_filter*) * c->num_filters));
352 memcpy((void*)c->filters, args->filters,
353 sizeof(grpc_channel_filter*) * c->num_filters);
354 } else {
355 c->filters = nullptr;
356 }
357 c->pollset_set = grpc_pollset_set_create();
358 grpc_resolved_address* addr =
359 static_cast<grpc_resolved_address*>(gpr_malloc(sizeof(*addr)));
360 grpc_get_subchannel_address_arg(args->args, addr);
361 grpc_resolved_address* new_address = nullptr;
362 grpc_channel_args* new_args = nullptr;
363 if (grpc_proxy_mappers_map_address(addr, args->args, &new_address,
364 &new_args)) {
365 GPR_ASSERT(new_address != nullptr);
366 gpr_free(addr);
367 addr = new_address;
368 }
369 static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS};
370 grpc_arg new_arg = grpc_create_subchannel_address_arg(addr);
371 gpr_free(addr);
372 c->args = grpc_channel_args_copy_and_add_and_remove(
373 new_args != nullptr ? new_args : args->args, keys_to_remove,
374 GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1);
375 gpr_free(new_arg.value.string);
376 if (new_args != nullptr) grpc_channel_args_destroy(new_args);
377 c->root_external_state_watcher.next = c->root_external_state_watcher.prev =
378 &c->root_external_state_watcher;
379 GRPC_CLOSURE_INIT(&c->on_connected, on_subchannel_connected, c,
380 grpc_schedule_on_exec_ctx);
381 grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
382 "subchannel");
383 grpc_core::BackOff::Options backoff_options;
384 parse_args_for_backoff_values(args->args, &backoff_options,
385 &c->min_connect_timeout_ms);
386 c->backoff.Init(backoff_options);
387 gpr_mu_init(&c->mu);
388
389 const grpc_arg* arg =
390 grpc_channel_args_find(c->args, GRPC_ARG_ENABLE_CHANNELZ);
391 bool channelz_enabled = grpc_channel_arg_get_bool(arg, false);
392 arg = grpc_channel_args_find(c->args,
393 GRPC_ARG_MAX_CHANNEL_TRACE_EVENTS_PER_NODE);
394 const grpc_integer_options options = {0, 0, INT_MAX};
395 size_t channel_tracer_max_nodes =
396 (size_t)grpc_channel_arg_get_integer(arg, options);
397 if (channelz_enabled) {
398 c->channelz_subchannel =
399 grpc_core::MakeRefCounted<grpc_core::channelz::SubchannelNode>(
400 c, channel_tracer_max_nodes);
401 c->channelz_subchannel->AddTraceEvent(
402 grpc_core::channelz::ChannelTrace::Severity::Info,
403 grpc_slice_from_static_string("Subchannel created"));
404 }
405
406 return grpc_subchannel_index_register(key, c);
407 }
408
grpc_subchannel_get_channelz_node(grpc_subchannel * subchannel)409 grpc_core::channelz::SubchannelNode* grpc_subchannel_get_channelz_node(
410 grpc_subchannel* subchannel) {
411 return subchannel->channelz_subchannel.get();
412 }
413
continue_connect_locked(grpc_subchannel * c)414 static void continue_connect_locked(grpc_subchannel* c) {
415 grpc_connect_in_args args;
416 args.interested_parties = c->pollset_set;
417 const grpc_millis min_deadline =
418 c->min_connect_timeout_ms + grpc_core::ExecCtx::Get()->Now();
419 c->next_attempt_deadline = c->backoff->NextAttemptTime();
420 args.deadline = std::max(c->next_attempt_deadline, min_deadline);
421 args.channel_args = c->args;
422 grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_CONNECTING,
423 GRPC_ERROR_NONE, "connecting");
424 grpc_connector_connect(c->connector, &args, &c->connecting_result,
425 &c->on_connected);
426 }
427
grpc_subchannel_check_connectivity(grpc_subchannel * c,grpc_error ** error)428 grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel* c,
429 grpc_error** error) {
430 grpc_connectivity_state state;
431 gpr_mu_lock(&c->mu);
432 state = grpc_connectivity_state_get(&c->state_tracker, error);
433 gpr_mu_unlock(&c->mu);
434 return state;
435 }
436
on_external_state_watcher_done(void * arg,grpc_error * error)437 static void on_external_state_watcher_done(void* arg, grpc_error* error) {
438 external_state_watcher* w = static_cast<external_state_watcher*>(arg);
439 grpc_closure* follow_up = w->notify;
440 if (w->pollset_set != nullptr) {
441 grpc_pollset_set_del_pollset_set(w->subchannel->pollset_set,
442 w->pollset_set);
443 }
444 gpr_mu_lock(&w->subchannel->mu);
445 w->next->prev = w->prev;
446 w->prev->next = w->next;
447 gpr_mu_unlock(&w->subchannel->mu);
448 GRPC_SUBCHANNEL_WEAK_UNREF(w->subchannel, "external_state_watcher");
449 gpr_free(w);
450 GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
451 }
452
on_alarm(void * arg,grpc_error * error)453 static void on_alarm(void* arg, grpc_error* error) {
454 grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
455 gpr_mu_lock(&c->mu);
456 c->have_alarm = false;
457 if (c->disconnected) {
458 error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING("Disconnected",
459 &error, 1);
460 } else if (c->deferred_reset_backoff) {
461 c->deferred_reset_backoff = false;
462 error = GRPC_ERROR_NONE;
463 } else {
464 GRPC_ERROR_REF(error);
465 }
466 if (error == GRPC_ERROR_NONE) {
467 gpr_log(GPR_INFO, "Failed to connect to channel, retrying");
468 continue_connect_locked(c);
469 gpr_mu_unlock(&c->mu);
470 } else {
471 gpr_mu_unlock(&c->mu);
472 GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
473 }
474 GRPC_ERROR_UNREF(error);
475 }
476
maybe_start_connecting_locked(grpc_subchannel * c)477 static void maybe_start_connecting_locked(grpc_subchannel* c) {
478 if (c->disconnected) {
479 /* Don't try to connect if we're already disconnected */
480 return;
481 }
482 if (c->connecting) {
483 /* Already connecting: don't restart */
484 return;
485 }
486 if (c->connected_subchannel != nullptr) {
487 /* Already connected: don't restart */
488 return;
489 }
490 if (!grpc_connectivity_state_has_watchers(&c->state_tracker)) {
491 /* Nobody is interested in connecting: so don't just yet */
492 return;
493 }
494 c->connecting = true;
495 GRPC_SUBCHANNEL_WEAK_REF(c, "connecting");
496 if (!c->backoff_begun) {
497 c->backoff_begun = true;
498 continue_connect_locked(c);
499 } else {
500 GPR_ASSERT(!c->have_alarm);
501 c->have_alarm = true;
502 const grpc_millis time_til_next =
503 c->next_attempt_deadline - grpc_core::ExecCtx::Get()->Now();
504 if (time_til_next <= 0) {
505 gpr_log(GPR_INFO, "Subchannel %p: Retry immediately", c);
506 } else {
507 gpr_log(GPR_INFO, "Subchannel %p: Retry in %" PRId64 " milliseconds", c,
508 time_til_next);
509 }
510 GRPC_CLOSURE_INIT(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx);
511 grpc_timer_init(&c->alarm, c->next_attempt_deadline, &c->on_alarm);
512 }
513 }
514
grpc_subchannel_notify_on_state_change(grpc_subchannel * c,grpc_pollset_set * interested_parties,grpc_connectivity_state * state,grpc_closure * notify)515 void grpc_subchannel_notify_on_state_change(
516 grpc_subchannel* c, grpc_pollset_set* interested_parties,
517 grpc_connectivity_state* state, grpc_closure* notify) {
518 external_state_watcher* w;
519
520 if (state == nullptr) {
521 gpr_mu_lock(&c->mu);
522 for (w = c->root_external_state_watcher.next;
523 w != &c->root_external_state_watcher; w = w->next) {
524 if (w->notify == notify) {
525 grpc_connectivity_state_notify_on_state_change(&c->state_tracker,
526 nullptr, &w->closure);
527 }
528 }
529 gpr_mu_unlock(&c->mu);
530 } else {
531 w = static_cast<external_state_watcher*>(gpr_malloc(sizeof(*w)));
532 w->subchannel = c;
533 w->pollset_set = interested_parties;
534 w->notify = notify;
535 GRPC_CLOSURE_INIT(&w->closure, on_external_state_watcher_done, w,
536 grpc_schedule_on_exec_ctx);
537 if (interested_parties != nullptr) {
538 grpc_pollset_set_add_pollset_set(c->pollset_set, interested_parties);
539 }
540 GRPC_SUBCHANNEL_WEAK_REF(c, "external_state_watcher");
541 gpr_mu_lock(&c->mu);
542 w->next = &c->root_external_state_watcher;
543 w->prev = w->next->prev;
544 w->next->prev = w->prev->next = w;
545 grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
546 &w->closure);
547 maybe_start_connecting_locked(c);
548 gpr_mu_unlock(&c->mu);
549 }
550 }
551
on_connected_subchannel_connectivity_changed(void * p,grpc_error * error)552 static void on_connected_subchannel_connectivity_changed(void* p,
553 grpc_error* error) {
554 state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(p);
555 grpc_subchannel* c = connected_subchannel_watcher->subchannel;
556 gpr_mu* mu = &c->mu;
557
558 gpr_mu_lock(mu);
559
560 switch (connected_subchannel_watcher->connectivity_state) {
561 case GRPC_CHANNEL_TRANSIENT_FAILURE:
562 case GRPC_CHANNEL_SHUTDOWN: {
563 if (!c->disconnected && c->connected_subchannel != nullptr) {
564 if (grpc_trace_stream_refcount.enabled()) {
565 gpr_log(GPR_INFO,
566 "Connected subchannel %p of subchannel %p has gone into %s. "
567 "Attempting to reconnect.",
568 c->connected_subchannel.get(), c,
569 grpc_connectivity_state_name(
570 connected_subchannel_watcher->connectivity_state));
571 }
572 c->connected_subchannel.reset();
573 grpc_connectivity_state_set(&c->state_tracker,
574 GRPC_CHANNEL_TRANSIENT_FAILURE,
575 GRPC_ERROR_REF(error), "reflect_child");
576 c->backoff_begun = false;
577 c->backoff->Reset();
578 maybe_start_connecting_locked(c);
579 } else {
580 connected_subchannel_watcher->connectivity_state =
581 GRPC_CHANNEL_SHUTDOWN;
582 }
583 break;
584 }
585 default: {
586 grpc_connectivity_state_set(
587 &c->state_tracker, connected_subchannel_watcher->connectivity_state,
588 GRPC_ERROR_REF(error), "reflect_child");
589 GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
590 c->connected_subchannel->NotifyOnStateChange(
591 nullptr, &connected_subchannel_watcher->connectivity_state,
592 &connected_subchannel_watcher->closure);
593 connected_subchannel_watcher = nullptr;
594 }
595 }
596 gpr_mu_unlock(mu);
597 GRPC_SUBCHANNEL_WEAK_UNREF(c, "state_watcher");
598 gpr_free(connected_subchannel_watcher);
599 }
600
publish_transport_locked(grpc_subchannel * c)601 static bool publish_transport_locked(grpc_subchannel* c) {
602 /* construct channel stack */
603 grpc_channel_stack_builder* builder = grpc_channel_stack_builder_create();
604 grpc_channel_stack_builder_set_channel_arguments(
605 builder, c->connecting_result.channel_args);
606 grpc_channel_stack_builder_set_transport(builder,
607 c->connecting_result.transport);
608
609 if (!grpc_channel_init_create_stack(builder, GRPC_CLIENT_SUBCHANNEL)) {
610 grpc_channel_stack_builder_destroy(builder);
611 return false;
612 }
613 grpc_channel_stack* stk;
614 grpc_error* error = grpc_channel_stack_builder_finish(
615 builder, 0, 1, connection_destroy, nullptr,
616 reinterpret_cast<void**>(&stk));
617 if (error != GRPC_ERROR_NONE) {
618 grpc_transport_destroy(c->connecting_result.transport);
619 gpr_log(GPR_ERROR, "error initializing subchannel stack: %s",
620 grpc_error_string(error));
621 GRPC_ERROR_UNREF(error);
622 return false;
623 }
624 memset(&c->connecting_result, 0, sizeof(c->connecting_result));
625
626 /* initialize state watcher */
627 state_watcher* connected_subchannel_watcher = static_cast<state_watcher*>(
628 gpr_zalloc(sizeof(*connected_subchannel_watcher)));
629 connected_subchannel_watcher->subchannel = c;
630 connected_subchannel_watcher->connectivity_state = GRPC_CHANNEL_READY;
631 GRPC_CLOSURE_INIT(&connected_subchannel_watcher->closure,
632 on_connected_subchannel_connectivity_changed,
633 connected_subchannel_watcher, grpc_schedule_on_exec_ctx);
634
635 if (c->disconnected) {
636 gpr_free(connected_subchannel_watcher);
637 grpc_channel_stack_destroy(stk);
638 gpr_free(stk);
639 return false;
640 }
641
642 /* publish */
643 c->connected_subchannel.reset(grpc_core::New<grpc_core::ConnectedSubchannel>(
644 stk, c->channelz_subchannel.get()));
645 gpr_log(GPR_INFO, "New connected subchannel at %p for subchannel %p",
646 c->connected_subchannel.get(), c);
647
648 /* setup subchannel watching connected subchannel for changes; subchannel
649 ref for connecting is donated to the state watcher */
650 GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher");
651 GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
652 c->connected_subchannel->NotifyOnStateChange(
653 c->pollset_set, &connected_subchannel_watcher->connectivity_state,
654 &connected_subchannel_watcher->closure);
655
656 /* signal completion */
657 grpc_connectivity_state_set(&c->state_tracker, GRPC_CHANNEL_READY,
658 GRPC_ERROR_NONE, "connected");
659 return true;
660 }
661
on_subchannel_connected(void * arg,grpc_error * error)662 static void on_subchannel_connected(void* arg, grpc_error* error) {
663 grpc_subchannel* c = static_cast<grpc_subchannel*>(arg);
664 grpc_channel_args* delete_channel_args = c->connecting_result.channel_args;
665
666 GRPC_SUBCHANNEL_WEAK_REF(c, "on_subchannel_connected");
667 gpr_mu_lock(&c->mu);
668 c->connecting = false;
669 if (c->connecting_result.transport != nullptr &&
670 publish_transport_locked(c)) {
671 /* do nothing, transport was published */
672 } else if (c->disconnected) {
673 GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
674 } else {
675 grpc_connectivity_state_set(
676 &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
677 grpc_error_set_int(GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
678 "Connect Failed", &error, 1),
679 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE),
680 "connect_failed");
681
682 const char* errmsg = grpc_error_string(error);
683 gpr_log(GPR_INFO, "Connect failed: %s", errmsg);
684
685 maybe_start_connecting_locked(c);
686 GRPC_SUBCHANNEL_WEAK_UNREF(c, "connecting");
687 }
688 gpr_mu_unlock(&c->mu);
689 GRPC_SUBCHANNEL_WEAK_UNREF(c, "connected");
690 grpc_channel_args_destroy(delete_channel_args);
691 }
692
grpc_subchannel_reset_backoff(grpc_subchannel * subchannel)693 void grpc_subchannel_reset_backoff(grpc_subchannel* subchannel) {
694 gpr_mu_lock(&subchannel->mu);
695 if (subchannel->have_alarm) {
696 subchannel->deferred_reset_backoff = true;
697 grpc_timer_cancel(&subchannel->alarm);
698 } else {
699 subchannel->backoff_begun = false;
700 subchannel->backoff->Reset();
701 maybe_start_connecting_locked(subchannel);
702 }
703 gpr_mu_unlock(&subchannel->mu);
704 }
705
706 /*
707 * grpc_subchannel_call implementation
708 */
709
subchannel_call_destroy(void * call,grpc_error * error)710 static void subchannel_call_destroy(void* call, grpc_error* error) {
711 GPR_TIMER_SCOPE("grpc_subchannel_call_unref.destroy", 0);
712 grpc_subchannel_call* c = static_cast<grpc_subchannel_call*>(call);
713 grpc_core::ConnectedSubchannel* connection = c->connection;
714 grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), nullptr,
715 c->schedule_closure_after_destroy);
716 connection->Unref(DEBUG_LOCATION, "subchannel_call");
717 }
718
grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call * call,grpc_closure * closure)719 void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call* call,
720 grpc_closure* closure) {
721 GPR_ASSERT(call->schedule_closure_after_destroy == nullptr);
722 GPR_ASSERT(closure != nullptr);
723 call->schedule_closure_after_destroy = closure;
724 }
725
grpc_subchannel_call_ref(grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)726 grpc_subchannel_call* grpc_subchannel_call_ref(
727 grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
728 GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
729 return c;
730 }
731
grpc_subchannel_call_unref(grpc_subchannel_call * c GRPC_SUBCHANNEL_REF_EXTRA_ARGS)732 void grpc_subchannel_call_unref(
733 grpc_subchannel_call* c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
734 GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON);
735 }
736
grpc_subchannel_call_process_op(grpc_subchannel_call * call,grpc_transport_stream_op_batch * batch)737 void grpc_subchannel_call_process_op(grpc_subchannel_call* call,
738 grpc_transport_stream_op_batch* batch) {
739 GPR_TIMER_SCOPE("grpc_subchannel_call_process_op", 0);
740 grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
741 grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
742 GRPC_CALL_LOG_OP(GPR_INFO, top_elem, batch);
743 top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
744 }
745
746 grpc_core::RefCountedPtr<grpc_core::ConnectedSubchannel>
grpc_subchannel_get_connected_subchannel(grpc_subchannel * c)747 grpc_subchannel_get_connected_subchannel(grpc_subchannel* c) {
748 gpr_mu_lock(&c->mu);
749 auto copy = c->connected_subchannel;
750 gpr_mu_unlock(&c->mu);
751 return copy;
752 }
753
grpc_subchannel_get_key(const grpc_subchannel * subchannel)754 const grpc_subchannel_key* grpc_subchannel_get_key(
755 const grpc_subchannel* subchannel) {
756 return subchannel->key;
757 }
758
grpc_connected_subchannel_call_get_parent_data(grpc_subchannel_call * subchannel_call)759 void* grpc_connected_subchannel_call_get_parent_data(
760 grpc_subchannel_call* subchannel_call) {
761 grpc_channel_stack* chanstk = subchannel_call->connection->channel_stack();
762 return (char*)subchannel_call + sizeof(grpc_subchannel_call) +
763 chanstk->call_stack_size;
764 }
765
grpc_subchannel_call_get_call_stack(grpc_subchannel_call * subchannel_call)766 grpc_call_stack* grpc_subchannel_call_get_call_stack(
767 grpc_subchannel_call* subchannel_call) {
768 return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
769 }
770
grpc_uri_to_sockaddr(const char * uri_str,grpc_resolved_address * addr)771 static void grpc_uri_to_sockaddr(const char* uri_str,
772 grpc_resolved_address* addr) {
773 grpc_uri* uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
774 GPR_ASSERT(uri != nullptr);
775 if (!grpc_parse_uri(uri, addr)) memset(addr, 0, sizeof(*addr));
776 grpc_uri_destroy(uri);
777 }
778
grpc_get_subchannel_address_arg(const grpc_channel_args * args,grpc_resolved_address * addr)779 void grpc_get_subchannel_address_arg(const grpc_channel_args* args,
780 grpc_resolved_address* addr) {
781 const char* addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
782 memset(addr, 0, sizeof(*addr));
783 if (*addr_uri_str != '\0') {
784 grpc_uri_to_sockaddr(addr_uri_str, addr);
785 }
786 }
787
grpc_subchannel_get_target(grpc_subchannel * subchannel)788 const char* grpc_subchannel_get_target(grpc_subchannel* subchannel) {
789 const grpc_arg* addr_arg =
790 grpc_channel_args_find(subchannel->args, GRPC_ARG_SUBCHANNEL_ADDRESS);
791 const char* addr_str = grpc_channel_arg_get_string(addr_arg);
792 GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
793 return addr_str;
794 }
795
grpc_get_subchannel_address_uri_arg(const grpc_channel_args * args)796 const char* grpc_get_subchannel_address_uri_arg(const grpc_channel_args* args) {
797 const grpc_arg* addr_arg =
798 grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS);
799 const char* addr_str = grpc_channel_arg_get_string(addr_arg);
800 GPR_ASSERT(addr_str != nullptr); // Should have been set by LB policy.
801 return addr_str;
802 }
803
grpc_create_subchannel_address_arg(const grpc_resolved_address * addr)804 grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address* addr) {
805 return grpc_channel_arg_string_create(
806 (char*)GRPC_ARG_SUBCHANNEL_ADDRESS,
807 addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""));
808 }
809
810 namespace grpc_core {
811
ConnectedSubchannel(grpc_channel_stack * channel_stack,channelz::SubchannelNode * channelz_subchannel)812 ConnectedSubchannel::ConnectedSubchannel(
813 grpc_channel_stack* channel_stack,
814 channelz::SubchannelNode* channelz_subchannel)
815 : RefCountedWithTracing<ConnectedSubchannel>(&grpc_trace_stream_refcount),
816 channel_stack_(channel_stack),
817 channelz_subchannel_(channelz_subchannel) {}
818
~ConnectedSubchannel()819 ConnectedSubchannel::~ConnectedSubchannel() {
820 GRPC_CHANNEL_STACK_UNREF(channel_stack_, "connected_subchannel_dtor");
821 }
822
NotifyOnStateChange(grpc_pollset_set * interested_parties,grpc_connectivity_state * state,grpc_closure * closure)823 void ConnectedSubchannel::NotifyOnStateChange(
824 grpc_pollset_set* interested_parties, grpc_connectivity_state* state,
825 grpc_closure* closure) {
826 grpc_transport_op* op = grpc_make_transport_op(nullptr);
827 grpc_channel_element* elem;
828 op->connectivity_state = state;
829 op->on_connectivity_state_change = closure;
830 op->bind_pollset_set = interested_parties;
831 elem = grpc_channel_stack_element(channel_stack_, 0);
832 elem->filter->start_transport_op(elem, op);
833 }
834
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)835 void ConnectedSubchannel::Ping(grpc_closure* on_initiate,
836 grpc_closure* on_ack) {
837 grpc_transport_op* op = grpc_make_transport_op(nullptr);
838 grpc_channel_element* elem;
839 op->send_ping.on_initiate = on_initiate;
840 op->send_ping.on_ack = on_ack;
841 elem = grpc_channel_stack_element(channel_stack_, 0);
842 elem->filter->start_transport_op(elem, op);
843 }
844
CreateCall(const CallArgs & args,grpc_subchannel_call ** call)845 grpc_error* ConnectedSubchannel::CreateCall(const CallArgs& args,
846 grpc_subchannel_call** call) {
847 size_t allocation_size =
848 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(grpc_subchannel_call));
849 if (args.parent_data_size > 0) {
850 allocation_size +=
851 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(channel_stack_->call_stack_size) +
852 args.parent_data_size;
853 } else {
854 allocation_size += channel_stack_->call_stack_size;
855 }
856 *call = static_cast<grpc_subchannel_call*>(
857 gpr_arena_alloc(args.arena, allocation_size));
858 grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
859 RefCountedPtr<ConnectedSubchannel> connection =
860 Ref(DEBUG_LOCATION, "subchannel_call");
861 connection.release(); // Ref is passed to the grpc_subchannel_call object.
862 (*call)->connection = this;
863 const grpc_call_element_args call_args = {
864 callstk, /* call_stack */
865 nullptr, /* server_transport_data */
866 args.context, /* context */
867 args.path, /* path */
868 args.start_time, /* start_time */
869 args.deadline, /* deadline */
870 args.arena, /* arena */
871 args.call_combiner /* call_combiner */
872 };
873 grpc_error* error = grpc_call_stack_init(
874 channel_stack_, 1, subchannel_call_destroy, *call, &call_args);
875 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
876 const char* error_string = grpc_error_string(error);
877 gpr_log(GPR_ERROR, "error: %s", error_string);
878 return error;
879 }
880 grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
881 return GRPC_ERROR_NONE;
882 }
883
884 } // namespace grpc_core
885