/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/method_params.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/inlined_vector.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/service_config.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

using grpc_core::internal::ClientChannelMethodParams;
using grpc_core::internal::ServerRetryThrottleData;

/* Client channel implementation */

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

grpc_core::TraceFlag grpc_client_channel_trace(false, "client_channel");

/*************************************************************************
 * CHANNEL-WIDE FUNCTIONS
 */

struct external_connectivity_watcher;

typedef grpc_core::SliceHashTable<
    grpc_core::RefCountedPtr<ClientChannelMethodParams>>
    MethodParamsTable;

typedef struct client_channel_channel_data {
  grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
  bool started_resolving;
  bool deadline_checking_enabled;
  grpc_client_channel_factory* client_channel_factory;
  bool enable_retries;
  size_t per_rpc_retry_buffer_size;

  /** combiner protecting all variables below in this data structure */
  grpc_combiner* combiner;
  /** currently active load balancer */
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> lb_policy;
  /** retry throttle data */
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  /** maps method names to method_parameters structs */
  grpc_core::RefCountedPtr<MethodParamsTable> method_params_table;
  /** incoming resolver result - set by resolver.next() */
  grpc_channel_args* resolver_result;
  /** a list of closures that are all waiting for resolver result to come in */
  grpc_closure_list waiting_for_resolver_result_closures;
  /** resolver callback */
  grpc_closure on_resolver_result_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  bool exit_idle_when_lb_policy_arrives;
  /** owning stack */
  grpc_channel_stack* owning_stack;
  /** interested parties (owned) */
  grpc_pollset_set* interested_parties;

  /* external_connectivity_watcher_list head is guarded by its own mutex, since
   * counts need to be grabbed immediately without polling on a cq */
  gpr_mu external_connectivity_watcher_list_mu;
  struct external_connectivity_watcher* external_connectivity_watcher_list_head;

  /* the following properties are guarded by a mutex since APIs require them
     to be instantaneously available */
  gpr_mu info_mu;
  grpc_core::UniquePtr<char> info_lb_policy_name;
  /** service config in JSON form */
  grpc_core::UniquePtr<char> info_service_config_json;
} channel_data;
typedef struct {
  channel_data* chand;
  /** used only as an identifier; do not dereference it, since the LB policy
   * may no longer exist by the time the callback is run */
  grpc_core::LoadBalancingPolicy* lb_policy;
  grpc_closure closure;
} reresolution_request_args;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  channel_data* chand;
  grpc_closure on_changed;
  grpc_connectivity_state state;
  grpc_core::LoadBalancingPolicy* lb_policy;
} lb_policy_connectivity_watcher;

static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state);

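// Updates the channel's connectivity state and notifies the state tracker.
// On TRANSIENT_FAILURE, cancels any picks with wait_for_ready=false; on
// SHUTDOWN, cancels all picks.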
static void set_channel_connectivity_state_locked(channel_data* chand,
                                                  grpc_connectivity_state state,
                                                  grpc_error* error,
                                                  const char* reason) {
  /* TODO: Improve failure handling:
   * - Make it possible for policies to return GRPC_CHANNEL_TRANSIENT_FAILURE.
   * - Hand over pending picks from old policies during the switch that happens
   *   when resolver provides an update. */
  if (chand->lb_policy != nullptr) {
    if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
      /* cancel picks with wait_for_ready=false */
      chand->lb_policy->CancelMatchingPicksLocked(
          /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY,
          /* check= */ 0, GRPC_ERROR_REF(error));
    } else if (state == GRPC_CHANNEL_SHUTDOWN) {
      /* cancel all picks */
      chand->lb_policy->CancelMatchingPicksLocked(/* mask= */ 0, /* check= */ 0,
                                                  GRPC_ERROR_REF(error));
    }
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: setting connectivity state to %s", chand,
            grpc_connectivity_state_name(state));
  }
  grpc_connectivity_state_set(&chand->state_tracker, state, error, reason);
}

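// Callback invoked when the LB policy's connectivity state changes.
// If the notification is for the current policy, updates the channel's
// state and (unless the policy has shut down) renews the watch.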
static void on_lb_policy_state_changed_locked(void* arg, grpc_error* error) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(arg);
  /* check if the notification is for the latest policy */
  if (w->lb_policy == w->chand->lb_policy.get()) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: lb_policy=%p state changed to %s", w->chand,
              w->lb_policy, grpc_connectivity_state_name(w->state));
    }
    set_channel_connectivity_state_locked(w->chand, w->state,
                                          GRPC_ERROR_REF(error), "lb_changed");
    if (w->state != GRPC_CHANNEL_SHUTDOWN) {
      watch_lb_policy_locked(w->chand, w->lb_policy, w->state);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack, "watch_lb_policy");
  gpr_free(w);
}

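// Registers a watcher for the next connectivity state change of lb_policy,
// starting from current_state. Holds a ref to the channel stack until the
// watcher fires.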
static void watch_lb_policy_locked(channel_data* chand,
                                   grpc_core::LoadBalancingPolicy* lb_policy,
                                   grpc_connectivity_state current_state) {
  lb_policy_connectivity_watcher* w =
      static_cast<lb_policy_connectivity_watcher*>(gpr_malloc(sizeof(*w)));
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy");
  w->chand = chand;
  GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w,
                    grpc_combiner_scheduler(chand->combiner));
  w->state = current_state;
  w->lb_policy = lb_policy;
  lb_policy->NotifyOnStateChangeLocked(&w->state, &w->on_changed);
}

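// Kicks off name resolution by asking the resolver for its first result.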
static void start_resolving_locked(channel_data* chand) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", chand);
  }
  GPR_ASSERT(!chand->started_resolving);
  chand->started_resolving = true;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver");
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

typedef struct {
  char* server_name;
  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
} service_config_parsing_state;

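// ParseGlobalParams() callback: parses the "retryThrottling" field of the
// service config JSON into parsing_state->retry_throttle_data.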
static void parse_retry_throttle_params(
    const grpc_json* field, service_config_parsing_state* parsing_state) {
  if (strcmp(field->key, "retryThrottling") == 0) {
    if (parsing_state->retry_throttle_data != nullptr) return;  // Duplicate.
    if (field->type != GRPC_JSON_OBJECT) return;
    int max_milli_tokens = 0;
    int milli_token_ratio = 0;
    for (grpc_json* sub_field = field->child; sub_field != nullptr;
         sub_field = sub_field->next) {
      if (sub_field->key == nullptr) return;
      if (strcmp(sub_field->key, "maxTokens") == 0) {
        if (max_milli_tokens != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        max_milli_tokens = gpr_parse_nonnegative_int(sub_field->value);
        if (max_milli_tokens == -1) return;
        max_milli_tokens *= 1000;
      } else if (strcmp(sub_field->key, "tokenRatio") == 0) {
        if (milli_token_ratio != 0) return;  // Duplicate.
        if (sub_field->type != GRPC_JSON_NUMBER) return;
        // We support up to 3 decimal digits.
        size_t whole_len = strlen(sub_field->value);
        uint32_t multiplier = 1;
        uint32_t decimal_value = 0;
        const char* decimal_point = strchr(sub_field->value, '.');
        if (decimal_point != nullptr) {
          whole_len = static_cast<size_t>(decimal_point - sub_field->value);
          multiplier = 1000;
          size_t decimal_len = strlen(decimal_point + 1);
          if (decimal_len > 3) decimal_len = 3;
          if (!gpr_parse_bytes_to_uint32(decimal_point + 1, decimal_len,
                                         &decimal_value)) {
            return;
          }
          uint32_t decimal_multiplier = 1;
          for (size_t i = 0; i < (3 - decimal_len); ++i) {
            decimal_multiplier *= 10;
          }
          decimal_value *= decimal_multiplier;
        }
        uint32_t whole_value;
        if (!gpr_parse_bytes_to_uint32(sub_field->value, whole_len,
                                       &whole_value)) {
          return;
        }
        milli_token_ratio =
            static_cast<int>((whole_value * multiplier) + decimal_value);
        if (milli_token_ratio <= 0) return;
      }
    }
    parsing_state->retry_throttle_data =
        grpc_core::internal::ServerRetryThrottleMap::GetDataForServer(
            parsing_state->server_name, max_milli_tokens, milli_token_ratio);
  }
}

// Invoked from the resolver NextLocked() callback when the resolver
// is shutting down.
static void on_resolver_shutdown_locked(channel_data* chand,
                                        grpc_error* error) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: shutting down", chand);
  }
  if (chand->lb_policy != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
              chand->lb_policy.get());
    }
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  if (chand->resolver != nullptr) {
    // This should never happen; it can only be triggered by a resolver
    // implementation spontaneously deciding to report shutdown without
    // being orphaned. This code is included just to be defensive.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: spontaneous shutdown from resolver %p",
              chand, chand->resolver.get());
    }
    chand->resolver.reset();
    set_channel_connectivity_state_locked(
        chand, GRPC_CHANNEL_SHUTDOWN,
        GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
            "Resolver spontaneous shutdown", &error, 1),
        "resolver_spontaneous_shutdown");
  }
  grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Channel disconnected", &error, 1));
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "resolver");
  grpc_channel_args_destroy(chand->resolver_result);
  chand->resolver_result = nullptr;
  GRPC_ERROR_UNREF(error);
}

// Returns the LB policy name from the resolver result.
static grpc_core::UniquePtr<char>
get_lb_policy_name_from_resolver_result_locked(channel_data* chand) {
  // Find LB policy name in channel args.
  const grpc_arg* channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_POLICY_NAME);
  const char* lb_policy_name = grpc_channel_arg_get_string(channel_arg);
  // Special case: If at least one balancer address is present, we use
  // the grpclb policy, regardless of what the resolver actually specified.
  channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES);
  if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_POINTER) {
    grpc_lb_addresses* addresses =
        static_cast<grpc_lb_addresses*>(channel_arg->value.pointer.p);
    if (grpc_lb_addresses_contains_balancer_address(*addresses)) {
      if (lb_policy_name != nullptr &&
          gpr_stricmp(lb_policy_name, "grpclb") != 0) {
        gpr_log(GPR_INFO,
                "resolver requested LB policy %s but provided at least one "
                "balancer address -- forcing use of grpclb LB policy",
                lb_policy_name);
      }
      lb_policy_name = "grpclb";
    }
  }
  // Use pick_first if nothing was specified and we didn't select grpclb
  // above.
  if (lb_policy_name == nullptr) lb_policy_name = "pick_first";
  return grpc_core::UniquePtr<char>(gpr_strdup(lb_policy_name));
}

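// Re-resolution closure handed to the LB policy; asks the resolver to
// re-resolve unless the requesting policy is stale or the channel is
// shutting down.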
static void request_reresolution_locked(void* arg, grpc_error* error) {
  reresolution_request_args* args =
      static_cast<reresolution_request_args*>(arg);
  channel_data* chand = args->chand;
  // If this invocation is for a stale LB policy, treat it as an LB shutdown
  // signal.
  if (args->lb_policy != chand->lb_policy.get() || error != GRPC_ERROR_NONE ||
      chand->resolver == nullptr) {
    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "re-resolution");
    gpr_free(args);
    return;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand);
  }
  chand->resolver->RequestReresolutionLocked();
  // Give back the closure to the LB policy.
  chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
}

// Creates a new LB policy, replacing any previous one.
// If the new policy is created successfully, sets *connectivity_state and
// *connectivity_error to its initial connectivity state; otherwise,
// leaves them unchanged.
static void create_new_lb_policy_locked(
    channel_data* chand, char* lb_policy_name,
    grpc_connectivity_state* connectivity_state,
    grpc_error** connectivity_error) {
  grpc_core::LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.combiner = chand->combiner;
  lb_policy_args.client_channel_factory = chand->client_channel_factory;
  lb_policy_args.args = chand->resolver_result;
  grpc_core::OrphanablePtr<grpc_core::LoadBalancingPolicy> new_lb_policy =
      grpc_core::LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
          lb_policy_name, lb_policy_args);
  if (GPR_UNLIKELY(new_lb_policy == nullptr)) {
    gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name);
  } else {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: created new LB policy \"%s\" (%p)", chand,
              lb_policy_name, new_lb_policy.get());
    }
    // Swap out the LB policy and update the fds in
    // chand->interested_parties.
    if (chand->lb_policy != nullptr) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", chand,
                chand->lb_policy.get());
      }
      grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                       chand->interested_parties);
      chand->lb_policy->HandOffPendingPicksLocked(new_lb_policy.get());
    }
    chand->lb_policy = std::move(new_lb_policy);
    grpc_pollset_set_add_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    // Set up re-resolution callback.
    reresolution_request_args* args =
        static_cast<reresolution_request_args*>(gpr_zalloc(sizeof(*args)));
    args->chand = chand;
    args->lb_policy = chand->lb_policy.get();
    GRPC_CLOSURE_INIT(&args->closure, request_reresolution_locked, args,
                      grpc_combiner_scheduler(chand->combiner));
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "re-resolution");
    chand->lb_policy->SetReresolutionClosureLocked(&args->closure);
    // Get the new LB policy's initial connectivity state and start a
    // connectivity watch.
    GRPC_ERROR_UNREF(*connectivity_error);
    *connectivity_state =
        chand->lb_policy->CheckConnectivityLocked(connectivity_error);
    if (chand->exit_idle_when_lb_policy_arrives) {
      chand->lb_policy->ExitIdleLocked();
      chand->exit_idle_when_lb_policy_arrives = false;
    }
    watch_lb_policy_locked(chand, chand->lb_policy.get(), *connectivity_state);
  }
}

// Returns the service config (as a JSON string) from the resolver result.
// Also updates state in chand.
static grpc_core::UniquePtr<char>
get_service_config_from_resolver_result_locked(channel_data* chand) {
  const grpc_arg* channel_arg =
      grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG);
  const char* service_config_json = grpc_channel_arg_get_string(channel_arg);
  if (service_config_json != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config: \"%s\"",
              chand, service_config_json);
    }
    grpc_core::UniquePtr<grpc_core::ServiceConfig> service_config =
        grpc_core::ServiceConfig::Create(service_config_json);
    if (service_config != nullptr) {
      if (chand->enable_retries) {
        channel_arg =
            grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVER_URI);
        const char* server_uri = grpc_channel_arg_get_string(channel_arg);
        GPR_ASSERT(server_uri != nullptr);
        grpc_uri* uri = grpc_uri_parse(server_uri, true);
        GPR_ASSERT(uri->path[0] != '\0');
        service_config_parsing_state parsing_state;
        parsing_state.server_name =
            uri->path[0] == '/' ? uri->path + 1 : uri->path;
        service_config->ParseGlobalParams(parse_retry_throttle_params,
                                          &parsing_state);
        grpc_uri_destroy(uri);
        chand->retry_throttle_data =
            std::move(parsing_state.retry_throttle_data);
      }
      chand->method_params_table = service_config->CreateMethodConfigTable(
          ClientChannelMethodParams::CreateFromJson);
    }
  }
  return grpc_core::UniquePtr<char>(gpr_strdup(service_config_json));
}

// Callback invoked when a resolver result is available.
static void on_resolver_result_changed_locked(void* arg, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (grpc_client_channel_trace.enabled()) {
    const char* disposition =
        chand->resolver_result != nullptr
            ? ""
            : (error == GRPC_ERROR_NONE ? " (transient error)"
                                        : " (resolver shutdown)");
    gpr_log(GPR_INFO,
            "chand=%p: got resolver result: resolver_result=%p error=%s%s",
            chand, chand->resolver_result, grpc_error_string(error),
            disposition);
  }
  // Handle shutdown.
  if (error != GRPC_ERROR_NONE || chand->resolver == nullptr) {
    on_resolver_shutdown_locked(chand, GRPC_ERROR_REF(error));
    return;
  }
  // Data used to set the channel's connectivity state.
  bool set_connectivity_state = true;
  grpc_connectivity_state connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  grpc_error* connectivity_error =
      GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy");
  // chand->resolver_result will be null in the case of a transient
  // resolution error. In that case, we don't have any new result to
  // process, which means that we keep using the previous result (if any).
  if (chand->resolver_result == nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p: resolver transient failure", chand);
    }
  } else {
    grpc_core::UniquePtr<char> lb_policy_name =
        get_lb_policy_name_from_resolver_result_locked(chand);
    // Check to see if we're already using the right LB policy.
    // Note: It's safe to use chand->info_lb_policy_name here without
    // taking a lock on chand->info_mu, because this function is the
    // only thing that modifies its value, and it can only be invoked
    // once at any given time.
    bool lb_policy_name_changed = chand->info_lb_policy_name == nullptr ||
                                  gpr_stricmp(chand->info_lb_policy_name.get(),
                                              lb_policy_name.get()) != 0;
    if (chand->lb_policy != nullptr && !lb_policy_name_changed) {
      // Continue using the same LB policy. Update with new addresses.
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p: updating existing LB policy \"%s\" (%p)",
                chand, lb_policy_name.get(), chand->lb_policy.get());
      }
      chand->lb_policy->UpdateLocked(*chand->resolver_result);
      // No need to set the channel's connectivity state; the existing
      // watch on the LB policy will take care of that.
      set_connectivity_state = false;
    } else {
      // Instantiate new LB policy.
      create_new_lb_policy_locked(chand, lb_policy_name.get(),
                                  &connectivity_state, &connectivity_error);
    }
    // Find service config.
    grpc_core::UniquePtr<char> service_config_json =
        get_service_config_from_resolver_result_locked(chand);
    // Swap out the data used by cc_get_channel_info().
    gpr_mu_lock(&chand->info_mu);
    chand->info_lb_policy_name = std::move(lb_policy_name);
    chand->info_service_config_json = std::move(service_config_json);
    gpr_mu_unlock(&chand->info_mu);
    // Clean up.
    grpc_channel_args_destroy(chand->resolver_result);
    chand->resolver_result = nullptr;
  }
  // Set the channel's connectivity state if needed.
  if (set_connectivity_state) {
    set_channel_connectivity_state_locked(
        chand, connectivity_state, connectivity_error, "resolver_result");
  } else {
    GRPC_ERROR_UNREF(connectivity_error);
  }
  // Invoke closures that were waiting for results and renew the watch.
  GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
  chand->resolver->NextLocked(&chand->resolver_result,
                              &chand->on_resolver_result_changed);
}

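// Applies a transport op to the channel under the combiner: connectivity
// state watches, pings, disconnects, and connection backoff resets.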
static void start_transport_op_locked(void* arg, grpc_error* error_ignored) {
  grpc_transport_op* op = static_cast<grpc_transport_op*>(arg);
  grpc_channel_element* elem =
      static_cast<grpc_channel_element*>(op->handler_private.extra_arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  if (op->on_connectivity_state_change != nullptr) {
    grpc_connectivity_state_notify_on_state_change(
        &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    op->on_connectivity_state_change = nullptr;
    op->connectivity_state = nullptr;
  }

  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    if (chand->lb_policy == nullptr) {
      grpc_error* error =
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Ping with no load balancing");
      GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
      GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
    } else {
      grpc_error* error = GRPC_ERROR_NONE;
      grpc_core::LoadBalancingPolicy::PickState pick_state;
      pick_state.initial_metadata = nullptr;
      pick_state.initial_metadata_flags = 0;
      pick_state.on_complete = nullptr;
      memset(&pick_state.subchannel_call_context, 0,
             sizeof(pick_state.subchannel_call_context));
      pick_state.user_data = nullptr;
      // Pick must return synchronously, because pick_state.on_complete is null.
      GPR_ASSERT(chand->lb_policy->PickLocked(&pick_state, &error));
      if (pick_state.connected_subchannel != nullptr) {
        pick_state.connected_subchannel->Ping(op->send_ping.on_initiate,
                                              op->send_ping.on_ack);
      } else {
        if (error == GRPC_ERROR_NONE) {
          error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
              "LB policy dropped call on ping");
        }
        GRPC_CLOSURE_SCHED(op->send_ping.on_initiate, GRPC_ERROR_REF(error));
        GRPC_CLOSURE_SCHED(op->send_ping.on_ack, error);
      }
      op->bind_pollset = nullptr;
    }
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }

  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (chand->resolver != nullptr) {
      set_channel_connectivity_state_locked(
          chand, GRPC_CHANNEL_SHUTDOWN,
          GRPC_ERROR_REF(op->disconnect_with_error), "disconnect");
      chand->resolver.reset();
      if (!chand->started_resolving) {
        grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures,
                                   GRPC_ERROR_REF(op->disconnect_with_error));
        GRPC_CLOSURE_LIST_SCHED(&chand->waiting_for_resolver_result_closures);
      }
      if (chand->lb_policy != nullptr) {
        grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                         chand->interested_parties);
        chand->lb_policy.reset();
      }
    }
    GRPC_ERROR_UNREF(op->disconnect_with_error);
  }

  if (op->reset_connect_backoff) {
    if (chand->resolver != nullptr) {
      chand->resolver->ResetBackoffLocked();
      chand->resolver->RequestReresolutionLocked();
    }
    if (chand->lb_policy != nullptr) {
      chand->lb_policy->ResetBackoffLocked();
    }
  }

  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "start_transport_op");

  GRPC_CLOSURE_SCHED(op->on_consumed, GRPC_ERROR_NONE);
}

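// Entry point for transport ops; bounces the op into the combiner.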
static void cc_start_transport_op(grpc_channel_element* elem,
                                  grpc_transport_op* op) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);

  GPR_ASSERT(op->set_accept_stream == false);
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties, op->bind_pollset);
  }

  op->handler_private.extra_arg = elem;
  GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&op->handler_private.closure, start_transport_op_locked,
                        op, grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

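// Reports the current LB policy name and service config JSON to the
// surface. Guarded by info_mu, since this can be invoked from any thread.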
static void cc_get_channel_info(grpc_channel_element* elem,
                                const grpc_channel_info* info) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  gpr_mu_lock(&chand->info_mu);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json.get());
  }
  gpr_mu_unlock(&chand->info_mu);
}

/* Constructor for channel_data */
static grpc_error* cc_init_channel_elem(grpc_channel_element* elem,
                                        grpc_channel_element_args* args) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  // Initialize data members.
  chand->combiner = grpc_combiner_create();
  gpr_mu_init(&chand->info_mu);
  gpr_mu_init(&chand->external_connectivity_watcher_list_mu);

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  chand->external_connectivity_watcher_list_head = nullptr;
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  chand->owning_stack = args->channel_stack;
  GRPC_CLOSURE_INIT(&chand->on_resolver_result_changed,
                    on_resolver_result_changed_locked, chand,
                    grpc_combiner_scheduler(chand->combiner));
  chand->interested_parties = grpc_pollset_set_create();
  grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
                               "client_channel");
  grpc_client_channel_start_backup_polling(chand->interested_parties);
  // Record max per-RPC retry buffer size.
  const grpc_arg* arg = grpc_channel_args_find(
      args->channel_args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE);
  chand->per_rpc_retry_buffer_size = (size_t)grpc_channel_arg_get_integer(
      arg, {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX});
  // Record enable_retries.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_ENABLE_RETRIES);
  chand->enable_retries = grpc_channel_arg_get_bool(arg, true);
  // Record client channel factory.
  arg = grpc_channel_args_find(args->channel_args,
                               GRPC_ARG_CLIENT_CHANNEL_FACTORY);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_POINTER) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "client channel factory arg must be a pointer");
  }
  grpc_client_channel_factory_ref(
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p));
  chand->client_channel_factory =
      static_cast<grpc_client_channel_factory*>(arg->value.pointer.p);
  // Get server name to resolve, using proxy mapper if needed.
  arg = grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI);
  if (arg == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing server uri in args for client channel filter");
  }
  if (arg->type != GRPC_ARG_STRING) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server uri arg must be a string");
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  grpc_proxy_mappers_map_name(arg->value.string, args->channel_args,
                              &proxy_name, &new_args);
  // Instantiate resolver.
  chand->resolver = grpc_core::ResolverRegistry::CreateResolver(
      proxy_name != nullptr ? proxy_name : arg->value.string,
      new_args != nullptr ? new_args : args->channel_args,
      chand->interested_parties, chand->combiner);
  if (proxy_name != nullptr) gpr_free(proxy_name);
  if (new_args != nullptr) grpc_channel_args_destroy(new_args);
  if (chand->resolver == nullptr) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("resolver creation failed");
  }
  chand->deadline_checking_enabled =
      grpc_deadline_checking_enabled(args->channel_args);
  return GRPC_ERROR_NONE;
}

/* Destructor for channel_data */
static void cc_destroy_channel_elem(grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->resolver != nullptr) {
    // The only way we can get here is if we never started resolving,
    // because we take a ref to the channel stack when we start
    // resolving and do not release it until the resolver callback is
    // invoked after the resolver shuts down.
    chand->resolver.reset();
  }
  if (chand->client_channel_factory != nullptr) {
    grpc_client_channel_factory_unref(chand->client_channel_factory);
  }
  if (chand->lb_policy != nullptr) {
    grpc_pollset_set_del_pollset_set(chand->lb_policy->interested_parties(),
                                     chand->interested_parties);
    chand->lb_policy.reset();
  }
  // TODO(roth): Once we convert the filter API to C++, there will no
  // longer be any need to explicitly reset these smart pointer data members.
  chand->info_lb_policy_name.reset();
  chand->info_service_config_json.reset();
  chand->retry_throttle_data.reset();
  chand->method_params_table.reset();
  grpc_client_channel_stop_backup_polling(chand->interested_parties);
  grpc_connectivity_state_destroy(&chand->state_tracker);
  grpc_pollset_set_destroy(chand->interested_parties);
  GRPC_COMBINER_UNREF(chand->combiner, "client_channel");
  gpr_mu_destroy(&chand->info_mu);
  gpr_mu_destroy(&chand->external_connectivity_watcher_list_mu);
}

/*************************************************************************
 * PER-CALL FUNCTIONS
 */

// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call. When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly. If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses). However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

// State used for starting a retryable batch on a subchannel call.
// This provides its own grpc_transport_stream_op_batch and other data
// structures needed to populate the ops in the batch.
// We allocate one struct on the arena for each attempt at starting a
// batch on a given subchannel call.
typedef struct {
  gpr_refcount refs;
  grpc_call_element* elem;
  grpc_subchannel_call* subchannel_call;  // Holds a ref.
  // The batch to use in the subchannel call.
  // Its payload field points to subchannel_call_retry_state.batch_payload.
  grpc_transport_stream_op_batch batch;
  // For intercepting on_complete.
  grpc_closure on_complete;
} subchannel_batch_data;

// Retry state associated with a subchannel call.
// Stored in the parent_data of the subchannel call object.
typedef struct {
  // subchannel_batch_data.batch.payload points to this.
  grpc_transport_stream_op_batch_payload batch_payload;
  // For send_initial_metadata.
  // Note that we need to make a copy of the initial metadata for each
  // subchannel call instead of just referring to the copy in call_data,
  // because filters in the subchannel stack will probably add entries,
  // so we need to start in a pristine state for each attempt of the call.
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  // For send_message.
  grpc_core::ManualConstructor<grpc_core::ByteStreamCache::CachingByteStream>
      send_message;
  // For send_trailing_metadata.
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
  // For intercepting recv_initial_metadata.
  grpc_metadata_batch recv_initial_metadata;
  grpc_closure recv_initial_metadata_ready;
  bool trailing_metadata_available;
  // For intercepting recv_message.
  grpc_closure recv_message_ready;
  grpc_core::OrphanablePtr<grpc_core::ByteStream> recv_message;
  // For intercepting recv_trailing_metadata.
  grpc_metadata_batch recv_trailing_metadata;
  grpc_transport_stream_stats collect_stats;
  grpc_closure recv_trailing_metadata_ready;
  // These fields indicate which ops have been started and completed on
  // this subchannel call.
  size_t started_send_message_count;
  size_t completed_send_message_count;
  size_t started_recv_message_count;
  size_t completed_recv_message_count;
  bool started_send_initial_metadata : 1;
  bool completed_send_initial_metadata : 1;
  bool started_send_trailing_metadata : 1;
  bool completed_send_trailing_metadata : 1;
  bool started_recv_initial_metadata : 1;
  bool completed_recv_initial_metadata : 1;
  bool started_recv_trailing_metadata : 1;
  bool completed_recv_trailing_metadata : 1;
  // State for callback processing.
  bool retry_dispatched : 1;
  subchannel_batch_data* recv_initial_metadata_ready_deferred_batch;
  grpc_error* recv_initial_metadata_error;
  subchannel_batch_data* recv_message_ready_deferred_batch;
  grpc_error* recv_message_error;
  subchannel_batch_data* recv_trailing_metadata_internal_batch;
} subchannel_call_retry_state;

// Pending batches stored in call data.
typedef struct {
  // The pending batch. If nullptr, this slot is empty.
  grpc_transport_stream_op_batch* batch;
  // Indicates whether payload for send ops has been cached in call data.
  bool send_ops_cached;
} pending_batch;

/** Call data. Holds a pointer to grpc_subchannel_call and the
    associated machinery to create such a pointer.
    Handles queueing of stream ops until a call object is ready, waiting
    for initial metadata before trying to create a call object,
    and handling cancellation gracefully. */
typedef struct client_channel_call_data {
  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner. If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state;

  grpc_slice path;  // Request path.
  gpr_timespec call_start_time;
  grpc_millis deadline;
  gpr_arena* arena;
  grpc_call_stack* owning_call;
  grpc_call_combiner* call_combiner;

  grpc_core::RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  grpc_core::RefCountedPtr<ClientChannelMethodParams> method_params;

  grpc_subchannel_call* subchannel_call;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error;

  grpc_core::LoadBalancingPolicy::PickState pick;
  grpc_closure pick_closure;
  grpc_closure pick_cancel_closure;

  // state needed to support channelz interception of recv trailing metadata.
  grpc_closure recv_trailing_metadata_ready_channelz;
  grpc_closure* original_recv_trailing_metadata;
  grpc_metadata_batch* recv_trailing_metadata;

  grpc_polling_entity* pollent;
  bool pollent_added_to_interested_parties;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  pending_batch pending_batches[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata : 1;
  bool pending_send_message : 1;
  bool pending_send_trailing_metadata : 1;

  // Retry state.
  bool enable_retries : 1;
  bool retry_committed : 1;
  bool last_attempt_got_server_pushback : 1;
  int num_attempts_completed;
  size_t bytes_buffered_for_retry;
  grpc_core::ManualConstructor<grpc_core::BackOff> retry_backoff;
  grpc_timer retry_timer;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata;
  grpc_linked_mdelem* send_initial_metadata_storage;
  grpc_metadata_batch send_initial_metadata;
  uint32_t send_initial_metadata_flags;
  gpr_atm* peer_string;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  grpc_core::ManualConstructor<
      grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>>
      send_messages;
  // send_trailing_metadata
  bool seen_send_trailing_metadata;
  grpc_linked_mdelem* send_trailing_metadata_storage;
  grpc_metadata_batch send_trailing_metadata;
} call_data;

// Forward declarations.
static void retry_commit(grpc_call_element* elem,
                         subchannel_call_retry_state* retry_state);
static void start_internal_recv_trailing_metadata(grpc_call_element* elem);
static void on_complete(void* arg, grpc_error* error);
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored);
static void start_pick_locked(void* arg, grpc_error* ignored);
static void maybe_intercept_recv_trailing_metadata_for_channelz(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch);

//
// send op data caching
//

// Caches data for send ops so that it can be retried later, if not
// already cached.
static void maybe_cache_send_ops_for_batch(call_data* calld,
                                           pending_batch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    calld->seen_send_initial_metadata = true;
    GPR_ASSERT(calld->send_initial_metadata_storage == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    calld->send_initial_metadata_storage = (grpc_linked_mdelem*)gpr_arena_alloc(
        calld->arena,
        sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count);
    grpc_metadata_batch_copy(send_initial_metadata,
                             &calld->send_initial_metadata,
                             calld->send_initial_metadata_storage);
    calld->send_initial_metadata_flags =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    calld->peer_string = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    grpc_core::ByteStreamCache* cache =
        static_cast<grpc_core::ByteStreamCache*>(
            gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache)));
    new (cache) grpc_core::ByteStreamCache(
        std::move(batch->payload->send_message.send_message));
    calld->send_messages->push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    calld->seen_send_trailing_metadata = true;
    GPR_ASSERT(calld->send_trailing_metadata_storage == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    calld->send_trailing_metadata_storage =
        (grpc_linked_mdelem*)gpr_arena_alloc(
            calld->arena,
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count);
    grpc_metadata_batch_copy(send_trailing_metadata,
                             &calld->send_trailing_metadata,
                             calld->send_trailing_metadata_storage);
  }
}

// Frees cached send_initial_metadata.
static void free_cached_send_initial_metadata(channel_data* chand,
                                              call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_initial_metadata", chand,
            calld);
  }
  grpc_metadata_batch_destroy(&calld->send_initial_metadata);
}

// Frees cached send_message at index idx.
static void free_cached_send_message(channel_data* chand, call_data* calld,
                                     size_t idx) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_messages[%" PRIuPTR "]",
            chand, calld, idx);
  }
  (*calld->send_messages)[idx]->Destroy();
}

// Frees cached send_trailing_metadata.
static void free_cached_send_trailing_metadata(channel_data* chand,
                                               call_data* calld) {
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying calld->send_trailing_metadata",
            chand, calld);
  }
  grpc_metadata_batch_destroy(&calld->send_trailing_metadata);
}

// Frees cached send ops that have already been completed after
// committing the call.
static void free_cached_send_op_data_after_commit(
    grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (retry_state->completed_send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    free_cached_send_message(chand, calld, i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

// Frees cached send ops that were completed by the completed batch in
// batch_data. Used when batches are completed after the call is committed.
static void free_cached_send_op_data_for_completed_batch(
    grpc_call_element* elem, subchannel_batch_data* batch_data,
    subchannel_call_retry_state* retry_state) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (batch_data->batch.send_initial_metadata) {
    free_cached_send_initial_metadata(chand, calld);
  }
  if (batch_data->batch.send_message) {
    free_cached_send_message(chand, calld,
                             retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    free_cached_send_trailing_metadata(chand, calld);
  }
}

//
// pending_batches management
//

// Returns the index into calld->pending_batches to be used for batch.
static size_t get_batch_index(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

// This is called via the call combiner, so access to calld is synchronized.
static void pending_batches_add(grpc_call_element* elem,
                                grpc_transport_stream_op_batch* batch) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t idx = get_batch_index(batch);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            calld, idx);
  }
  pending_batch* pending = &calld->pending_batches[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (calld->enable_retries) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      calld->pending_send_initial_metadata = true;
      calld->bytes_buffered_for_retry += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      calld->pending_send_message = true;
      calld->bytes_buffered_for_retry +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      calld->pending_send_trailing_metadata = true;
    }
    if (GPR_UNLIKELY(calld->bytes_buffered_for_retry >
                     chand->per_rpc_retry_buffer_size)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: exceeded retry buffer size, committing",
                chand, calld);
      }
      subchannel_call_retry_state* retry_state =
          calld->subchannel_call == nullptr
              ? nullptr
              : static_cast<subchannel_call_retry_state*>(
                    grpc_connected_subchannel_call_get_parent_data(
                        calld->subchannel_call));
      retry_commit(elem, retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (calld->num_attempts_completed == 0) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: disabling retries before first attempt",
                  chand, calld);
        }
        calld->enable_retries = false;
      }
    }
  }
}

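// Clears a slot in calld->pending_batches and updates the pending-send
// tracking bits.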
pending_batch_clear(call_data * calld,pending_batch * pending)1204 static void pending_batch_clear(call_data* calld, pending_batch* pending) {
1205 if (calld->enable_retries) {
1206 if (pending->batch->send_initial_metadata) {
1207 calld->pending_send_initial_metadata = false;
1208 }
1209 if (pending->batch->send_message) {
1210 calld->pending_send_message = false;
1211 }
1212 if (pending->batch->send_trailing_metadata) {
1213 calld->pending_send_trailing_metadata = false;
1214 }
1215 }
1216 pending->batch = nullptr;
1217 }
1218
1219 // This is called via the call combiner, so access to calld is synchronized.
fail_pending_batch_in_call_combiner(void * arg,grpc_error * error)1220 static void fail_pending_batch_in_call_combiner(void* arg, grpc_error* error) {
1221 grpc_transport_stream_op_batch* batch =
1222 static_cast<grpc_transport_stream_op_batch*>(arg);
1223 call_data* calld = static_cast<call_data*>(batch->handler_private.extra_arg);
1224 // Note: This will release the call combiner.
1225 grpc_transport_stream_op_batch_finish_with_failure(
1226 batch, GRPC_ERROR_REF(error), calld->call_combiner);
1227 }
1228
1229 // This is called via the call combiner, so access to calld is synchronized.
1230 // If yield_call_combiner is true, assumes responsibility for yielding
1231 // the call combiner.
pending_batches_fail(grpc_call_element * elem,grpc_error * error,bool yield_call_combiner)1232 static void pending_batches_fail(grpc_call_element* elem, grpc_error* error,
1233 bool yield_call_combiner) {
1234 GPR_ASSERT(error != GRPC_ERROR_NONE);
1235 call_data* calld = static_cast<call_data*>(elem->call_data);
1236 if (grpc_client_channel_trace.enabled()) {
1237 size_t num_batches = 0;
1238 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1239 if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1240 }
1241 gpr_log(GPR_INFO,
1242 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
1243 elem->channel_data, calld, num_batches, grpc_error_string(error));
1244 }
1245 grpc_core::CallCombinerClosureList closures;
1246 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1247 pending_batch* pending = &calld->pending_batches[i];
1248 grpc_transport_stream_op_batch* batch = pending->batch;
1249 if (batch != nullptr) {
1250 batch->handler_private.extra_arg = calld;
1251 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1252 fail_pending_batch_in_call_combiner, batch,
1253 grpc_schedule_on_exec_ctx);
1254 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
1255 "pending_batches_fail");
1256 pending_batch_clear(calld, pending);
1257 }
1258 }
1259 if (yield_call_combiner) {
1260 closures.RunClosures(calld->call_combiner);
1261 } else {
1262 closures.RunClosuresWithoutYielding(calld->call_combiner);
1263 }
1264 GRPC_ERROR_UNREF(error);
1265 }
1266
1267 // This is called via the call combiner, so access to calld is synchronized.
resume_pending_batch_in_call_combiner(void * arg,grpc_error * ignored)1268 static void resume_pending_batch_in_call_combiner(void* arg,
1269 grpc_error* ignored) {
1270 grpc_transport_stream_op_batch* batch =
1271 static_cast<grpc_transport_stream_op_batch*>(arg);
1272 grpc_subchannel_call* subchannel_call =
1273 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
1274 // Note: This will release the call combiner.
1275 grpc_subchannel_call_process_op(subchannel_call, batch);
1276 }
1277
1278 // This is called via the call combiner, so access to calld is synchronized.
1279 static void pending_batches_resume(grpc_call_element* elem) {
1280 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1281 call_data* calld = static_cast<call_data*>(elem->call_data);
1282 if (calld->enable_retries) {
1283 start_retriable_subchannel_batches(elem, GRPC_ERROR_NONE);
1284 return;
1285 }
1286 // Retries not enabled; send down batches as-is.
1287 if (grpc_client_channel_trace.enabled()) {
1288 size_t num_batches = 0;
1289 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1290 if (calld->pending_batches[i].batch != nullptr) ++num_batches;
1291 }
1292 gpr_log(GPR_INFO,
1293 "chand=%p calld=%p: starting %" PRIuPTR
1294 " pending batches on subchannel_call=%p",
1295 chand, calld, num_batches, calld->subchannel_call);
1296 }
1297 grpc_core::CallCombinerClosureList closures;
1298 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1299 pending_batch* pending = &calld->pending_batches[i];
1300 grpc_transport_stream_op_batch* batch = pending->batch;
1301 if (batch != nullptr) {
1302 maybe_intercept_recv_trailing_metadata_for_channelz(elem, batch);
1303 batch->handler_private.extra_arg = calld->subchannel_call;
1304 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
1305 resume_pending_batch_in_call_combiner, batch,
1306 grpc_schedule_on_exec_ctx);
1307 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
1308 "pending_batches_resume");
1309 pending_batch_clear(calld, pending);
1310 }
1311 }
1312 // Note: This will release the call combiner.
1313 closures.RunClosures(calld->call_combiner);
1314 }
1315
1316 static void maybe_clear_pending_batch(grpc_call_element* elem,
1317 pending_batch* pending) {
1318 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1319 call_data* calld = static_cast<call_data*>(elem->call_data);
1320 grpc_transport_stream_op_batch* batch = pending->batch;
1321 // We clear the pending batch once all of its callbacks have been
1322 // scheduled and their pointers reset to nullptr.
1323 if (batch->on_complete == nullptr &&
1324 (!batch->recv_initial_metadata ||
1325 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
1326 nullptr) &&
1327 (!batch->recv_message ||
1328 batch->payload->recv_message.recv_message_ready == nullptr) &&
1329 (!batch->recv_trailing_metadata ||
1330 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
1331 nullptr)) {
1332 if (grpc_client_channel_trace.enabled()) {
1333 gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand,
1334 calld);
1335 }
1336 pending_batch_clear(calld, pending);
1337 }
1338 }
1339
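// For example, a pending batch whose on_complete has already been scheduled
// but whose recv_message_ready callback is still outstanding is kept alive;
// the slot is cleared only once that callback pointer has also been reset to
// nullptr.
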
1340 // Returns a pointer to the first pending batch for which predicate(batch)
1341 // returns true, or null if not found.
1342 template <typename Predicate>
1343 static pending_batch* pending_batch_find(grpc_call_element* elem,
1344 const char* log_message,
1345 Predicate predicate) {
1346 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1347 call_data* calld = static_cast<call_data*>(elem->call_data);
1348 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1349 pending_batch* pending = &calld->pending_batches[i];
1350 grpc_transport_stream_op_batch* batch = pending->batch;
1351 if (batch != nullptr && predicate(batch)) {
1352 if (grpc_client_channel_trace.enabled()) {
1353 gpr_log(GPR_INFO,
1354 "chand=%p calld=%p: %s pending batch at index %" PRIuPTR, chand,
1355 calld, log_message, i);
1356 }
1357 return pending;
1358 }
1359 }
1360 return nullptr;
1361 }
1362
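// Illustrative sketch, compiled out: an example predicate for
// pending_batch_find(). The helper name is invented for illustration and is
// not used anywhere in this file.
#if 0
static pending_batch* example_find_pending_send_message(
    grpc_call_element* elem) {
  // Matches the first pending batch that carries a send_message op.
  return pending_batch_find(
      elem, "example: looking for send_message in",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->send_message;
      });
}
#endif
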
1363 //
1364 // retry code
1365 //
1366
1367 // Commits the call so that no further retry attempts will be performed.
1368 static void retry_commit(grpc_call_element* elem,
1369 subchannel_call_retry_state* retry_state) {
1370 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1371 call_data* calld = static_cast<call_data*>(elem->call_data);
1372 if (calld->retry_committed) return;
1373 calld->retry_committed = true;
1374 if (grpc_client_channel_trace.enabled()) {
1375 gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand, calld);
1376 }
1377 if (retry_state != nullptr) {
1378 free_cached_send_op_data_after_commit(elem, retry_state);
1379 }
1380 }
1381
1382 // Starts a retry after appropriate back-off.
1383 static void do_retry(grpc_call_element* elem,
1384 subchannel_call_retry_state* retry_state,
1385 grpc_millis server_pushback_ms) {
1386 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1387 call_data* calld = static_cast<call_data*>(elem->call_data);
1388 GPR_ASSERT(calld->method_params != nullptr);
1389 const ClientChannelMethodParams::RetryPolicy* retry_policy =
1390 calld->method_params->retry_policy();
1391 GPR_ASSERT(retry_policy != nullptr);
1392 // Reset subchannel call and connected subchannel.
1393 if (calld->subchannel_call != nullptr) {
1394 GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
1395 "client_channel_call_retry");
1396 calld->subchannel_call = nullptr;
1397 }
1398 if (calld->pick.connected_subchannel != nullptr) {
1399 calld->pick.connected_subchannel.reset();
1400 }
1401 // Compute backoff delay.
1402 grpc_millis next_attempt_time;
1403 if (server_pushback_ms >= 0) {
1404 next_attempt_time = grpc_core::ExecCtx::Get()->Now() + server_pushback_ms;
1405 calld->last_attempt_got_server_pushback = true;
1406 } else {
1407 if (calld->num_attempts_completed == 1 ||
1408 calld->last_attempt_got_server_pushback) {
1409 calld->retry_backoff.Init(
1410 grpc_core::BackOff::Options()
1411 .set_initial_backoff(retry_policy->initial_backoff)
1412 .set_multiplier(retry_policy->backoff_multiplier)
1413 .set_jitter(RETRY_BACKOFF_JITTER)
1414 .set_max_backoff(retry_policy->max_backoff));
1415 calld->last_attempt_got_server_pushback = false;
1416 }
1417 next_attempt_time = calld->retry_backoff->NextAttemptTime();
1418 }
1419 if (grpc_client_channel_trace.enabled()) {
1420 gpr_log(GPR_INFO,
1421 "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand,
1422 calld, next_attempt_time - grpc_core::ExecCtx::Get()->Now());
1423 }
1424 // Schedule retry after computed delay.
1425 GRPC_CLOSURE_INIT(&calld->pick_closure, start_pick_locked, elem,
1426 grpc_combiner_scheduler(chand->combiner));
1427 grpc_timer_init(&calld->retry_timer, next_attempt_time, &calld->pick_closure);
1428 // Update bookkeeping.
1429 if (retry_state != nullptr) retry_state->retry_dispatched = true;
1430 }
1431
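// Illustrative sketch, compiled out: how the BackOff object initialized in
// do_retry() evolves across attempts. The numeric values are invented for
// the example, not taken from any real retry policy.
#if 0
static void example_retry_backoff_progression() {
  grpc_core::BackOff backoff(grpc_core::BackOff::Options()
                                 .set_initial_backoff(100)  // ms
                                 .set_multiplier(2.0)
                                 .set_jitter(RETRY_BACKOFF_JITTER)
                                 .set_max_backoff(1000));  // ms
  // Each call returns Now() plus a delay; the delays trend
  // 100, 200, 400, 800, 1000, 1000, ... ms, each randomized by +/-20%.
  grpc_millis next_time = backoff.NextAttemptTime();
  (void)next_time;
}
#endif
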
1432 // Returns true if the call is being retried.
1433 static bool maybe_retry(grpc_call_element* elem,
1434 subchannel_batch_data* batch_data,
1435 grpc_status_code status,
1436 grpc_mdelem* server_pushback_md) {
1437 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1438 call_data* calld = static_cast<call_data*>(elem->call_data);
1439 // Get retry policy.
1440 if (calld->method_params == nullptr) return false;
1441 const ClientChannelMethodParams::RetryPolicy* retry_policy =
1442 calld->method_params->retry_policy();
1443 if (retry_policy == nullptr) return false;
1444 // If we've already dispatched a retry from this call, return true.
1445 // This catches the case where the batch has multiple callbacks
1446 // (i.e., it includes either recv_message or recv_initial_metadata).
1447 subchannel_call_retry_state* retry_state = nullptr;
1448 if (batch_data != nullptr) {
1449 retry_state = static_cast<subchannel_call_retry_state*>(
1450 grpc_connected_subchannel_call_get_parent_data(
1451 batch_data->subchannel_call));
1452 if (retry_state->retry_dispatched) {
1453 if (grpc_client_channel_trace.enabled()) {
1454 gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched", chand,
1455 calld);
1456 }
1457 return true;
1458 }
1459 }
1460 // Check status.
1461 if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
1462 if (calld->retry_throttle_data != nullptr) {
1463 calld->retry_throttle_data->RecordSuccess();
1464 }
1465 if (grpc_client_channel_trace.enabled()) {
1466 gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", chand, calld);
1467 }
1468 return false;
1469 }
1470 // Status is not OK. Check whether the status is retryable.
1471 if (!retry_policy->retryable_status_codes.Contains(status)) {
1472 if (grpc_client_channel_trace.enabled()) {
1473 gpr_log(GPR_INFO,
1474 "chand=%p calld=%p: status %s not configured as retryable", chand,
1475 calld, grpc_status_code_to_string(status));
1476 }
1477 return false;
1478 }
1479 // Record the failure and check whether retries are throttled.
1480 // Note that it's important for this check to come after the status
1481 // code check above, since we should only record failures whose statuses
1482 // match the configured retryable status codes, so that we don't count
1483 // things like failures due to malformed requests (INVALID_ARGUMENT).
1484 // Conversely, it's important for this to come before the remaining
1485 // checks, so that we don't fail to record failures due to other factors.
1486 if (calld->retry_throttle_data != nullptr &&
1487 !calld->retry_throttle_data->RecordFailure()) {
1488 if (grpc_client_channel_trace.enabled()) {
1489 gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", chand, calld);
1490 }
1491 return false;
1492 }
1493 // Check whether the call is committed.
1494 if (calld->retry_committed) {
1495 if (grpc_client_channel_trace.enabled()) {
1496 gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed", chand,
1497 calld);
1498 }
1499 return false;
1500 }
1501 // Check whether we have retries remaining.
1502 ++calld->num_attempts_completed;
1503 if (calld->num_attempts_completed >= retry_policy->max_attempts) {
1504 if (grpc_client_channel_trace.enabled()) {
1505 gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts", chand,
1506 calld, retry_policy->max_attempts);
1507 }
1508 return false;
1509 }
1510 // If the call was cancelled from the surface, don't retry.
1511 if (calld->cancel_error != GRPC_ERROR_NONE) {
1512 if (grpc_client_channel_trace.enabled()) {
1513 gpr_log(GPR_INFO,
1514 "chand=%p calld=%p: call cancelled from surface, not retrying",
1515 chand, calld);
1516 }
1517 return false;
1518 }
1519 // Check server push-back.
1520 grpc_millis server_pushback_ms = -1;
1521 if (server_pushback_md != nullptr) {
1522 // If the value is "-1" or any other unparseable string, we do not retry.
1523 uint32_t ms;
1524 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
1525 if (grpc_client_channel_trace.enabled()) {
1526 gpr_log(GPR_INFO,
1527 "chand=%p calld=%p: not retrying due to server push-back",
1528 chand, calld);
1529 }
1530 return false;
1531 } else {
1532 if (grpc_client_channel_trace.enabled()) {
1533 gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
1534 chand, calld, ms);
1535 }
1536 server_pushback_ms = (grpc_millis)ms;
1537 }
1538 }
1539 do_retry(elem, retry_state, server_pushback_ms);
1540 return true;
1541 }
1542
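// To summarize maybe_retry(): a failed attempt is retried only if all of the
// following gates pass, in order: (1) the status code is listed in
// retryable_status_codes; (2) recording the failure does not trip the retry
// throttle; (3) the call has not been committed; (4) attempts remain under
// max_attempts; (5) the call was not cancelled from the surface; and (6) any
// server push-back header parses to a non-negative delay.
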
1543 //
1544 // subchannel_batch_data
1545 //
1546
1547 // Creates a subchannel_batch_data object on the call's arena with the
1548 // specified refcount. If set_on_complete is true, the batch's
1549 // on_complete callback will be set to point to on_complete();
1550 // otherwise, the batch's on_complete callback will be null.
1551 static subchannel_batch_data* batch_data_create(grpc_call_element* elem,
1552 int refcount,
1553 bool set_on_complete) {
1554 call_data* calld = static_cast<call_data*>(elem->call_data);
1555 subchannel_call_retry_state* retry_state =
1556 static_cast<subchannel_call_retry_state*>(
1557 grpc_connected_subchannel_call_get_parent_data(
1558 calld->subchannel_call));
1559 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(
1560 gpr_arena_alloc(calld->arena, sizeof(*batch_data)));
1561 batch_data->elem = elem;
1562 batch_data->subchannel_call =
1563 GRPC_SUBCHANNEL_CALL_REF(calld->subchannel_call, "batch_data_create");
1564 batch_data->batch.payload = &retry_state->batch_payload;
1565 gpr_ref_init(&batch_data->refs, refcount);
1566 if (set_on_complete) {
1567 GRPC_CLOSURE_INIT(&batch_data->on_complete, on_complete, batch_data,
1568 grpc_schedule_on_exec_ctx);
1569 batch_data->batch.on_complete = &batch_data->on_complete;
1570 }
1571 GRPC_CALL_STACK_REF(calld->owning_call, "batch_data");
1572 return batch_data;
1573 }
1574
1575 static void batch_data_unref(subchannel_batch_data* batch_data) {
1576 if (gpr_unref(&batch_data->refs)) {
1577 subchannel_call_retry_state* retry_state =
1578 static_cast<subchannel_call_retry_state*>(
1579 grpc_connected_subchannel_call_get_parent_data(
1580 batch_data->subchannel_call));
1581 if (batch_data->batch.send_initial_metadata) {
1582 grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
1583 }
1584 if (batch_data->batch.send_trailing_metadata) {
1585 grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
1586 }
1587 if (batch_data->batch.recv_initial_metadata) {
1588 grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
1589 }
1590 if (batch_data->batch.recv_trailing_metadata) {
1591 grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
1592 }
1593 GRPC_SUBCHANNEL_CALL_UNREF(batch_data->subchannel_call, "batch_data_unref");
1594 call_data* calld = static_cast<call_data*>(batch_data->elem->call_data);
1595 GRPC_CALL_STACK_UNREF(calld->owning_call, "batch_data");
1596 }
1597 }
1598
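// Note on the ref-counting contract between batch_data_create() and
// batch_data_unref(): the refcount equals the number of callbacks that will
// eventually fire for the batch, and each callback drops one ref when it is
// done with the batch_data. For example (hypothetical batch), a batch
// carrying send ops plus recv_initial_metadata is created with
// batch_data_create(elem, 2, true /* set_on_complete */), and on_complete()
// and recv_initial_metadata_ready() each release one ref.
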
1599 //
1600 // recv_initial_metadata callback handling
1601 //
1602
1603 // Invokes recv_initial_metadata_ready for a subchannel batch.
1604 static void invoke_recv_initial_metadata_callback(void* arg,
1605 grpc_error* error) {
1606 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1607 // Find pending batch.
1608 pending_batch* pending = pending_batch_find(
1609 batch_data->elem, "invoking recv_initial_metadata_ready for",
1610 [](grpc_transport_stream_op_batch* batch) {
1611 return batch->recv_initial_metadata &&
1612 batch->payload->recv_initial_metadata
1613 .recv_initial_metadata_ready != nullptr;
1614 });
1615 GPR_ASSERT(pending != nullptr);
1616 // Return metadata.
1617 subchannel_call_retry_state* retry_state =
1618 static_cast<subchannel_call_retry_state*>(
1619 grpc_connected_subchannel_call_get_parent_data(
1620 batch_data->subchannel_call));
1621 grpc_metadata_batch_move(
1622 &retry_state->recv_initial_metadata,
1623 pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
1624 // Update bookkeeping.
1625 // Note: Need to do this before invoking the callback, since invoking
1626 // the callback will result in yielding the call combiner.
1627 grpc_closure* recv_initial_metadata_ready =
1628 pending->batch->payload->recv_initial_metadata
1629 .recv_initial_metadata_ready;
1630 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
1631 nullptr;
1632 maybe_clear_pending_batch(batch_data->elem, pending);
1633 batch_data_unref(batch_data);
1634 // Invoke callback.
1635 GRPC_CLOSURE_RUN(recv_initial_metadata_ready, GRPC_ERROR_REF(error));
1636 }
1637
1638 // Intercepts recv_initial_metadata_ready callback for retries.
1639 // Commits the call and returns the initial metadata up the stack.
1640 static void recv_initial_metadata_ready(void* arg, grpc_error* error) {
1641 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1642 grpc_call_element* elem = batch_data->elem;
1643 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1644 call_data* calld = static_cast<call_data*>(elem->call_data);
1645 if (grpc_client_channel_trace.enabled()) {
1646 gpr_log(GPR_INFO,
1647 "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
1648 chand, calld, grpc_error_string(error));
1649 }
1650 subchannel_call_retry_state* retry_state =
1651 static_cast<subchannel_call_retry_state*>(
1652 grpc_connected_subchannel_call_get_parent_data(
1653 batch_data->subchannel_call));
1654 retry_state->completed_recv_initial_metadata = true;
1655 // If a retry was already dispatched, then we're not going to use the
1656 // result of this recv_initial_metadata op, so do nothing.
1657 if (retry_state->retry_dispatched) {
1658 GRPC_CALL_COMBINER_STOP(
1659 calld->call_combiner,
1660 "recv_initial_metadata_ready after retry dispatched");
1661 return;
1662 }
1663 // If we got an error or a Trailers-Only response and have not yet gotten
1664 // the recv_trailing_metadata_ready callback, then defer propagating this
1665 // callback back to the surface. We can evaluate whether to retry when
1666 // recv_trailing_metadata comes back.
1667 if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
1668 error != GRPC_ERROR_NONE) &&
1669 !retry_state->completed_recv_trailing_metadata)) {
1670 if (grpc_client_channel_trace.enabled()) {
1671 gpr_log(GPR_INFO,
1672 "chand=%p calld=%p: deferring recv_initial_metadata_ready "
1673 "(Trailers-Only)",
1674 chand, calld);
1675 }
1676 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
1677 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
1678 if (!retry_state->started_recv_trailing_metadata) {
1679 // recv_trailing_metadata not yet started by application; start it
1680 // ourselves to get status.
1681 start_internal_recv_trailing_metadata(elem);
1682 } else {
1683 GRPC_CALL_COMBINER_STOP(
1684 calld->call_combiner,
1685 "recv_initial_metadata_ready trailers-only or error");
1686 }
1687 return;
1688 }
1689 // Received valid initial metadata, so commit the call.
1690 retry_commit(elem, retry_state);
1691 // Invoke the callback to return the result to the surface.
1692 // Manually invoking a callback function; it does not take ownership of error.
1693 invoke_recv_initial_metadata_callback(batch_data, error);
1694 }
1695
1696 //
1697 // recv_message callback handling
1698 //
1699
1700 // Invokes recv_message_ready for a subchannel batch.
1701 static void invoke_recv_message_callback(void* arg, grpc_error* error) {
1702 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1703 // Find pending op.
1704 pending_batch* pending = pending_batch_find(
1705 batch_data->elem, "invoking recv_message_ready for",
1706 [](grpc_transport_stream_op_batch* batch) {
1707 return batch->recv_message &&
1708 batch->payload->recv_message.recv_message_ready != nullptr;
1709 });
1710 GPR_ASSERT(pending != nullptr);
1711 // Return payload.
1712 subchannel_call_retry_state* retry_state =
1713 static_cast<subchannel_call_retry_state*>(
1714 grpc_connected_subchannel_call_get_parent_data(
1715 batch_data->subchannel_call));
1716 *pending->batch->payload->recv_message.recv_message =
1717 std::move(retry_state->recv_message);
1718 // Update bookkeeping.
1719 // Note: Need to do this before invoking the callback, since invoking
1720 // the callback will result in yielding the call combiner.
1721 grpc_closure* recv_message_ready =
1722 pending->batch->payload->recv_message.recv_message_ready;
1723 pending->batch->payload->recv_message.recv_message_ready = nullptr;
1724 maybe_clear_pending_batch(batch_data->elem, pending);
1725 batch_data_unref(batch_data);
1726 // Invoke callback.
1727 GRPC_CLOSURE_RUN(recv_message_ready, GRPC_ERROR_REF(error));
1728 }
1729
1730 // Intercepts recv_message_ready callback for retries.
1731 // Commits the call and returns the message up the stack.
1732 static void recv_message_ready(void* arg, grpc_error* error) {
1733 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1734 grpc_call_element* elem = batch_data->elem;
1735 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1736 call_data* calld = static_cast<call_data*>(elem->call_data);
1737 if (grpc_client_channel_trace.enabled()) {
1738 gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
1739 chand, calld, grpc_error_string(error));
1740 }
1741 subchannel_call_retry_state* retry_state =
1742 static_cast<subchannel_call_retry_state*>(
1743 grpc_connected_subchannel_call_get_parent_data(
1744 batch_data->subchannel_call));
1745 ++retry_state->completed_recv_message_count;
1746 // If a retry was already dispatched, then we're not going to use the
1747 // result of this recv_message op, so do nothing.
1748 if (retry_state->retry_dispatched) {
1749 GRPC_CALL_COMBINER_STOP(calld->call_combiner,
1750 "recv_message_ready after retry dispatched");
1751 return;
1752 }
1753 // If we got an error or the payload was nullptr and we have not yet gotten
1754 // the recv_trailing_metadata_ready callback, then defer propagating this
1755 // callback back to the surface. We can evaluate whether to retry when
1756 // recv_trailing_metadata comes back.
1757 if (GPR_UNLIKELY(
1758 (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
1759 !retry_state->completed_recv_trailing_metadata)) {
1760 if (grpc_client_channel_trace.enabled()) {
1761 gpr_log(GPR_INFO,
1762 "chand=%p calld=%p: deferring recv_message_ready (nullptr "
1763 "message and recv_trailing_metadata pending)",
1764 chand, calld);
1765 }
1766 retry_state->recv_message_ready_deferred_batch = batch_data;
1767 retry_state->recv_message_error = GRPC_ERROR_REF(error);
1768 if (!retry_state->started_recv_trailing_metadata) {
1769 // recv_trailing_metadata not yet started by application; start it
1770 // ourselves to get status.
1771 start_internal_recv_trailing_metadata(elem);
1772 } else {
1773 GRPC_CALL_COMBINER_STOP(calld->call_combiner, "recv_message_ready null");
1774 }
1775 return;
1776 }
1777 // Received a valid message, so commit the call.
1778 retry_commit(elem, retry_state);
1779 // Invoke the callback to return the result to the surface.
1780 // Manually invoking a callback function; it does not take ownership of error.
1781 invoke_recv_message_callback(batch_data, error);
1782 }
1783
1784 //
1785 // recv_trailing_metadata handling
1786 //
1787
1788 // Sets *status and *server_pushback_md based on md_batch and error.
1789 // Only sets *server_pushback_md if server_pushback_md != nullptr.
1790 static void get_call_status(grpc_call_element* elem,
1791 grpc_metadata_batch* md_batch, grpc_error* error,
1792 grpc_status_code* status,
1793 grpc_mdelem** server_pushback_md) {
1794 call_data* calld = static_cast<call_data*>(elem->call_data);
1795 if (error != GRPC_ERROR_NONE) {
1796 grpc_error_get_status(error, calld->deadline, status, nullptr, nullptr,
1797 nullptr);
1798 } else {
1799 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
1800 *status =
1801 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
1802 if (server_pushback_md != nullptr &&
1803 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
1804 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
1805 }
1806 }
1807 GRPC_ERROR_UNREF(error);
1808 }
1809
1810 // Adds recv_trailing_metadata_ready closure to closures.
1811 static void add_closure_for_recv_trailing_metadata_ready(
1812 grpc_call_element* elem, subchannel_batch_data* batch_data,
1813 grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1814 // Find pending batch.
1815 pending_batch* pending = pending_batch_find(
1816 elem, "invoking recv_trailing_metadata for",
1817 [](grpc_transport_stream_op_batch* batch) {
1818 return batch->recv_trailing_metadata &&
1819 batch->payload->recv_trailing_metadata
1820 .recv_trailing_metadata_ready != nullptr;
1821 });
1822 // If we generated the recv_trailing_metadata op internally via
1823 // start_internal_recv_trailing_metadata(), then there will be no
1824 // pending batch.
1825 if (pending == nullptr) {
1826 GRPC_ERROR_UNREF(error);
1827 return;
1828 }
1829 // Return metadata.
1830 subchannel_call_retry_state* retry_state =
1831 static_cast<subchannel_call_retry_state*>(
1832 grpc_connected_subchannel_call_get_parent_data(
1833 batch_data->subchannel_call));
1834 grpc_metadata_batch_move(
1835 &retry_state->recv_trailing_metadata,
1836 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
1837 // Add closure.
1838 closures->Add(pending->batch->payload->recv_trailing_metadata
1839 .recv_trailing_metadata_ready,
1840 error, "recv_trailing_metadata_ready for pending batch");
1841 // Update bookkeeping.
1842 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
1843 nullptr;
1844 maybe_clear_pending_batch(elem, pending);
1845 }
1846
1847 // Adds any necessary closures for deferred recv_initial_metadata and
1848 // recv_message callbacks to closures.
1849 static void add_closures_for_deferred_recv_callbacks(
1850 subchannel_batch_data* batch_data, subchannel_call_retry_state* retry_state,
1851 grpc_core::CallCombinerClosureList* closures) {
1852 if (batch_data->batch.recv_trailing_metadata) {
1853 // Add closure for deferred recv_initial_metadata_ready.
1854 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
1855 nullptr)) {
1856 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
1857 invoke_recv_initial_metadata_callback,
1858 retry_state->recv_initial_metadata_ready_deferred_batch,
1859 grpc_schedule_on_exec_ctx);
1860 closures->Add(&retry_state->recv_initial_metadata_ready,
1861 retry_state->recv_initial_metadata_error,
1862 "resuming recv_initial_metadata_ready");
1863 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
1864 }
1865 // Add closure for deferred recv_message_ready.
1866 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
1867 nullptr)) {
1868 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
1869 invoke_recv_message_callback,
1870 retry_state->recv_message_ready_deferred_batch,
1871 grpc_schedule_on_exec_ctx);
1872 closures->Add(&retry_state->recv_message_ready,
1873 retry_state->recv_message_error,
1874 "resuming recv_message_ready");
1875 retry_state->recv_message_ready_deferred_batch = nullptr;
1876 }
1877 }
1878 }
1879
1880 // Returns true if any op in the batch was not yet started.
1881 // Only looks at send ops, since recv ops are always started immediately.
1882 static bool pending_batch_is_unstarted(
1883 pending_batch* pending, call_data* calld,
1884 subchannel_call_retry_state* retry_state) {
1885 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
1886 return false;
1887 }
1888 if (pending->batch->send_initial_metadata &&
1889 !retry_state->started_send_initial_metadata) {
1890 return true;
1891 }
1892 if (pending->batch->send_message &&
1893 retry_state->started_send_message_count < calld->send_messages->size()) {
1894 return true;
1895 }
1896 if (pending->batch->send_trailing_metadata &&
1897 !retry_state->started_send_trailing_metadata) {
1898 return true;
1899 }
1900 return false;
1901 }
1902
1903 // For any pending batch containing an op that has not yet been started,
1904 // adds the pending batch's completion closures to closures.
1905 static void add_closures_to_fail_unstarted_pending_batches(
1906 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
1907 grpc_error* error, grpc_core::CallCombinerClosureList* closures) {
1908 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1909 call_data* calld = static_cast<call_data*>(elem->call_data);
1910 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
1911 pending_batch* pending = &calld->pending_batches[i];
1912 if (pending_batch_is_unstarted(pending, calld, retry_state)) {
1913 if (grpc_client_channel_trace.enabled()) {
1914 gpr_log(GPR_INFO,
1915 "chand=%p calld=%p: failing unstarted pending batch at index "
1916 "%" PRIuPTR,
1917 chand, calld, i);
1918 }
1919 closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
1920 "failing on_complete for pending batch");
1921 pending->batch->on_complete = nullptr;
1922 maybe_clear_pending_batch(elem, pending);
1923 }
1924 }
1925 GRPC_ERROR_UNREF(error);
1926 }
1927
1928 // Runs necessary closures upon completion of a call attempt.
1929 static void run_closures_for_completed_call(subchannel_batch_data* batch_data,
1930 grpc_error* error) {
1931 grpc_call_element* elem = batch_data->elem;
1932 call_data* calld = static_cast<call_data*>(elem->call_data);
1933 subchannel_call_retry_state* retry_state =
1934 static_cast<subchannel_call_retry_state*>(
1935 grpc_connected_subchannel_call_get_parent_data(
1936 batch_data->subchannel_call));
1937 // Construct list of closures to execute.
1938 grpc_core::CallCombinerClosureList closures;
1939 // First, add closure for recv_trailing_metadata_ready.
1940 add_closure_for_recv_trailing_metadata_ready(
1941 elem, batch_data, GRPC_ERROR_REF(error), &closures);
1942 // If there are deferred recv_initial_metadata_ready or recv_message_ready
1943 // callbacks, add them to closures.
1944 add_closures_for_deferred_recv_callbacks(batch_data, retry_state, &closures);
1945 // Add closures to fail any pending batches that have not yet been started.
1946 add_closures_to_fail_unstarted_pending_batches(
1947 elem, retry_state, GRPC_ERROR_REF(error), &closures);
1948 // Don't need batch_data anymore.
1949 batch_data_unref(batch_data);
1950 // Schedule all of the closures identified above.
1951 // Note: This will release the call combiner.
1952 closures.RunClosures(calld->call_combiner);
1953 GRPC_ERROR_UNREF(error);
1954 }
1955
1956 // Intercepts recv_trailing_metadata_ready callback for retries.
1957 // Commits the call and returns the trailing metadata up the stack.
1958 static void recv_trailing_metadata_ready(void* arg, grpc_error* error) {
1959 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
1960 grpc_call_element* elem = batch_data->elem;
1961 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
1962 call_data* calld = static_cast<call_data*>(elem->call_data);
1963 if (grpc_client_channel_trace.enabled()) {
1964 gpr_log(GPR_INFO,
1965 "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
1966 chand, calld, grpc_error_string(error));
1967 }
1968 subchannel_call_retry_state* retry_state =
1969 static_cast<subchannel_call_retry_state*>(
1970 grpc_connected_subchannel_call_get_parent_data(
1971 batch_data->subchannel_call));
1972 retry_state->completed_recv_trailing_metadata = true;
1973 // Get the call's status and check for server pushback metadata.
1974 grpc_status_code status = GRPC_STATUS_OK;
1975 grpc_mdelem* server_pushback_md = nullptr;
1976 grpc_metadata_batch* md_batch =
1977 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
1978 get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status,
1979 &server_pushback_md);
1980 grpc_core::channelz::SubchannelNode* channelz_subchannel =
1981 calld->pick.connected_subchannel->channelz_subchannel();
1982 if (channelz_subchannel != nullptr) {
1983 if (status == GRPC_STATUS_OK) {
1984 channelz_subchannel->RecordCallSucceeded();
1985 } else {
1986 channelz_subchannel->RecordCallFailed();
1987 }
1988 }
1989 if (grpc_client_channel_trace.enabled()) {
1990 gpr_log(GPR_INFO, "chand=%p calld=%p: call finished, status=%s", chand,
1991 calld, grpc_status_code_to_string(status));
1992 }
1993 // Check if we should retry.
1994 if (maybe_retry(elem, batch_data, status, server_pushback_md)) {
1995 // Unref batch_data for deferred recv_initial_metadata_ready or
1996 // recv_message_ready callbacks, if any.
1997 if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
1998 batch_data_unref(batch_data);
1999 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
2000 }
2001 if (retry_state->recv_message_ready_deferred_batch != nullptr) {
2002 batch_data_unref(batch_data);
2003 GRPC_ERROR_UNREF(retry_state->recv_message_error);
2004 }
2005 batch_data_unref(batch_data);
2006 return;
2007 }
2008 // Not retrying, so commit the call.
2009 retry_commit(elem, retry_state);
2010 // Run any necessary closures.
2011 run_closures_for_completed_call(batch_data, GRPC_ERROR_REF(error));
2012 }
2013
2014 //
2015 // on_complete callback handling
2016 //
2017
2018 // Adds the on_complete closure for the pending batch completed in
2019 // batch_data to closures.
2020 static void add_closure_for_completed_pending_batch(
2021 grpc_call_element* elem, subchannel_batch_data* batch_data,
2022 subchannel_call_retry_state* retry_state, grpc_error* error,
2023 grpc_core::CallCombinerClosureList* closures) {
2024 pending_batch* pending = pending_batch_find(
2025 elem, "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
2026 // Match the pending batch with the same set of send ops as the
2027 // subchannel batch we've just completed.
2028 return batch->on_complete != nullptr &&
2029 batch_data->batch.send_initial_metadata ==
2030 batch->send_initial_metadata &&
2031 batch_data->batch.send_message == batch->send_message &&
2032 batch_data->batch.send_trailing_metadata ==
2033 batch->send_trailing_metadata;
2034 });
2035 // If batch_data is a replay batch, then there will be no pending
2036 // batch to complete.
2037 if (pending == nullptr) {
2038 GRPC_ERROR_UNREF(error);
2039 return;
2040 }
2041 // Add closure.
2042 closures->Add(pending->batch->on_complete, error,
2043 "on_complete for pending batch");
2044 pending->batch->on_complete = nullptr;
2045 maybe_clear_pending_batch(elem, pending);
2046 }
2047
2048 // If there are any cached ops to replay or pending ops to start on the
2049 // subchannel call, adds a closure to closures to invoke
2050 // start_retriable_subchannel_batches().
2051 static void add_closures_for_replay_or_pending_send_ops(
2052 grpc_call_element* elem, subchannel_batch_data* batch_data,
2053 subchannel_call_retry_state* retry_state,
2054 grpc_core::CallCombinerClosureList* closures) {
2055 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2056 call_data* calld = static_cast<call_data*>(elem->call_data);
2057 bool have_pending_send_message_ops =
2058 retry_state->started_send_message_count < calld->send_messages->size();
2059 bool have_pending_send_trailing_metadata_op =
2060 calld->seen_send_trailing_metadata &&
2061 !retry_state->started_send_trailing_metadata;
2062 if (!have_pending_send_message_ops &&
2063 !have_pending_send_trailing_metadata_op) {
2064 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2065 pending_batch* pending = &calld->pending_batches[i];
2066 grpc_transport_stream_op_batch* batch = pending->batch;
2067 if (batch == nullptr || pending->send_ops_cached) continue;
2068 if (batch->send_message) have_pending_send_message_ops = true;
2069 if (batch->send_trailing_metadata) {
2070 have_pending_send_trailing_metadata_op = true;
2071 }
2072 }
2073 }
2074 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
2075 if (grpc_client_channel_trace.enabled()) {
2076 gpr_log(GPR_INFO,
2077 "chand=%p calld=%p: starting next batch for pending send op(s)",
2078 chand, calld);
2079 }
2080 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
2081 start_retriable_subchannel_batches, elem,
2082 grpc_schedule_on_exec_ctx);
2083 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
2084 "starting next batch for send_* op(s)");
2085 }
2086 }
2087
2088 // Callback used to intercept on_complete from subchannel calls.
2089 // Called only when retries are enabled.
2090 static void on_complete(void* arg, grpc_error* error) {
2091 subchannel_batch_data* batch_data = static_cast<subchannel_batch_data*>(arg);
2092 grpc_call_element* elem = batch_data->elem;
2093 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2094 call_data* calld = static_cast<call_data*>(elem->call_data);
2095 if (grpc_client_channel_trace.enabled()) {
2096 char* batch_str = grpc_transport_stream_op_batch_string(&batch_data->batch);
2097 gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
2098 chand, calld, grpc_error_string(error), batch_str);
2099 gpr_free(batch_str);
2100 }
2101 subchannel_call_retry_state* retry_state =
2102 static_cast<subchannel_call_retry_state*>(
2103 grpc_connected_subchannel_call_get_parent_data(
2104 batch_data->subchannel_call));
2105 // Update bookkeeping in retry_state.
2106 if (batch_data->batch.send_initial_metadata) {
2107 retry_state->completed_send_initial_metadata = true;
2108 }
2109 if (batch_data->batch.send_message) {
2110 ++retry_state->completed_send_message_count;
2111 }
2112 if (batch_data->batch.send_trailing_metadata) {
2113 retry_state->completed_send_trailing_metadata = true;
2114 }
2115 // If the call is committed, free cached data for send ops that we've just
2116 // completed.
2117 if (calld->retry_committed) {
2118 free_cached_send_op_data_for_completed_batch(elem, batch_data, retry_state);
2119 }
2120 // Construct list of closures to execute.
2121 grpc_core::CallCombinerClosureList closures;
2122 // If a retry was already dispatched, that means we saw
2123 // recv_trailing_metadata before this, so we do nothing here.
2124 // Otherwise, invoke the callback to return the result to the surface.
2125 if (!retry_state->retry_dispatched) {
2126 // Add closure for the completed pending batch, if any.
2127 add_closure_for_completed_pending_batch(elem, batch_data, retry_state,
2128 GRPC_ERROR_REF(error), &closures);
2129 // If needed, add a callback to start any replay or pending send ops on
2130 // the subchannel call.
2131 if (!retry_state->completed_recv_trailing_metadata) {
2132 add_closures_for_replay_or_pending_send_ops(elem, batch_data, retry_state,
2133 &closures);
2134 }
2135 }
2136 // Track number of pending subchannel send batches and determine if this
2137 // was the last one.
2138 --calld->num_pending_retriable_subchannel_send_batches;
2139 const bool last_send_batch_complete =
2140 calld->num_pending_retriable_subchannel_send_batches == 0;
2141 // Don't need batch_data anymore.
2142 batch_data_unref(batch_data);
2143 // Schedule all of the closures identified above.
2144 // Note: This yields the call combiner.
2145 closures.RunClosures(calld->call_combiner);
2146 // If this was the last subchannel send batch, unref the call stack.
2147 if (last_send_batch_complete) {
2148 GRPC_CALL_STACK_UNREF(calld->owning_call, "subchannel_send_batches");
2149 }
2150 }
2151
2152 //
2153 // subchannel batch construction
2154 //
2155
2156 // Helper function used to start a subchannel batch in the call combiner.
2157 static void start_batch_in_call_combiner(void* arg, grpc_error* ignored) {
2158 grpc_transport_stream_op_batch* batch =
2159 static_cast<grpc_transport_stream_op_batch*>(arg);
2160 grpc_subchannel_call* subchannel_call =
2161 static_cast<grpc_subchannel_call*>(batch->handler_private.extra_arg);
2162 // Note: This will release the call combiner.
2163 grpc_subchannel_call_process_op(subchannel_call, batch);
2164 }
2165
2166 // Adds a closure to closures that will execute batch in the call combiner.
2167 static void add_closure_for_subchannel_batch(
2168 grpc_call_element* elem, grpc_transport_stream_op_batch* batch,
2169 grpc_core::CallCombinerClosureList* closures) {
2170 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2171 call_data* calld = static_cast<call_data*>(elem->call_data);
2172 batch->handler_private.extra_arg = calld->subchannel_call;
2173 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2174 start_batch_in_call_combiner, batch,
2175 grpc_schedule_on_exec_ctx);
2176 if (grpc_client_channel_trace.enabled()) {
2177 char* batch_str = grpc_transport_stream_op_batch_string(batch);
2178 gpr_log(GPR_INFO, "chand=%p calld=%p: starting subchannel batch: %s", chand,
2179 calld, batch_str);
2180 gpr_free(batch_str);
2181 }
2182 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2183 "start_subchannel_batch");
2184 }
2185
2186 // Adds retriable send_initial_metadata op to batch_data.
2187 static void add_retriable_send_initial_metadata_op(
2188 call_data* calld, subchannel_call_retry_state* retry_state,
2189 subchannel_batch_data* batch_data) {
2190 // Maps the number of retries to the corresponding metadata value slice.
2191 static const grpc_slice* retry_count_strings[] = {
2192 &GRPC_MDSTR_1, &GRPC_MDSTR_2, &GRPC_MDSTR_3, &GRPC_MDSTR_4};
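// For example, on the second attempt (num_attempts_completed == 1), the code
// below attaches "grpc-previous-rpc-attempts: 1" to the copied metadata.
// Four entries suffice because the retry spec caps max_attempts at 5, so at
// most 4 prior attempts are ever reported.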
2193 // We need to make a copy of the metadata batch for each attempt, since
2194 // the filters in the subchannel stack may modify this batch, and we don't
2195 // want those modifications to be passed forward to subsequent attempts.
2196 //
2197 // If we've already completed one or more attempts, add the
2198 // grpc-retry-attempts header.
2199 retry_state->send_initial_metadata_storage =
2200 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2201 calld->arena, sizeof(grpc_linked_mdelem) *
2202 (calld->send_initial_metadata.list.count +
2203 (calld->num_attempts_completed > 0))));
2204 grpc_metadata_batch_copy(&calld->send_initial_metadata,
2205 &retry_state->send_initial_metadata,
2206 retry_state->send_initial_metadata_storage);
2207 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
2208 .grpc_previous_rpc_attempts != nullptr)) {
2209 grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
2210 retry_state->send_initial_metadata.idx.named
2211 .grpc_previous_rpc_attempts);
2212 }
2213 if (GPR_UNLIKELY(calld->num_attempts_completed > 0)) {
2214 grpc_mdelem retry_md = grpc_mdelem_from_slices(
2215 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
2216 *retry_count_strings[calld->num_attempts_completed - 1]);
2217 grpc_error* error = grpc_metadata_batch_add_tail(
2218 &retry_state->send_initial_metadata,
2219 &retry_state->send_initial_metadata_storage[calld->send_initial_metadata
2220 .list.count],
2221 retry_md);
2222 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2223 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
2224 grpc_error_string(error));
2225 GPR_ASSERT(false);
2226 }
2227 }
2228 retry_state->started_send_initial_metadata = true;
2229 batch_data->batch.send_initial_metadata = true;
2230 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
2231 &retry_state->send_initial_metadata;
2232 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
2233 calld->send_initial_metadata_flags;
2234 batch_data->batch.payload->send_initial_metadata.peer_string =
2235 calld->peer_string;
2236 }
2237
2238 // Adds retriable send_message op to batch_data.
2239 static void add_retriable_send_message_op(
2240 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2241 subchannel_batch_data* batch_data) {
2242 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2243 call_data* calld = static_cast<call_data*>(elem->call_data);
2244 if (grpc_client_channel_trace.enabled()) {
2245 gpr_log(GPR_INFO,
2246 "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
2247 chand, calld, retry_state->started_send_message_count);
2248 }
2249 grpc_core::ByteStreamCache* cache =
2250 (*calld->send_messages)[retry_state->started_send_message_count];
2251 ++retry_state->started_send_message_count;
2252 retry_state->send_message.Init(cache);
2253 batch_data->batch.send_message = true;
2254 batch_data->batch.payload->send_message.send_message.reset(
2255 retry_state->send_message.get());
2256 }
2257
2258 // Adds retriable send_trailing_metadata op to batch_data.
2259 static void add_retriable_send_trailing_metadata_op(
2260 call_data* calld, subchannel_call_retry_state* retry_state,
2261 subchannel_batch_data* batch_data) {
2262 // We need to make a copy of the metadata batch for each attempt, since
2263 // the filters in the subchannel stack may modify this batch, and we don't
2264 // want those modifications to be passed forward to subsequent attempts.
2265 retry_state->send_trailing_metadata_storage =
2266 static_cast<grpc_linked_mdelem*>(gpr_arena_alloc(
2267 calld->arena, sizeof(grpc_linked_mdelem) *
2268 calld->send_trailing_metadata.list.count));
2269 grpc_metadata_batch_copy(&calld->send_trailing_metadata,
2270 &retry_state->send_trailing_metadata,
2271 retry_state->send_trailing_metadata_storage);
2272 retry_state->started_send_trailing_metadata = true;
2273 batch_data->batch.send_trailing_metadata = true;
2274 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
2275 &retry_state->send_trailing_metadata;
2276 }
2277
2278 // Adds retriable recv_initial_metadata op to batch_data.
2279 static void add_retriable_recv_initial_metadata_op(
2280 call_data* calld, subchannel_call_retry_state* retry_state,
2281 subchannel_batch_data* batch_data) {
2282 retry_state->started_recv_initial_metadata = true;
2283 batch_data->batch.recv_initial_metadata = true;
2284 grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
2285 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
2286 &retry_state->recv_initial_metadata;
2287 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
2288 &retry_state->trailing_metadata_available;
2289 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
2290 recv_initial_metadata_ready, batch_data,
2291 grpc_schedule_on_exec_ctx);
2292 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
2293 &retry_state->recv_initial_metadata_ready;
2294 }
2295
2296 // Adds retriable recv_message op to batch_data.
2297 static void add_retriable_recv_message_op(
2298 call_data* calld, subchannel_call_retry_state* retry_state,
2299 subchannel_batch_data* batch_data) {
2300 ++retry_state->started_recv_message_count;
2301 batch_data->batch.recv_message = true;
2302 batch_data->batch.payload->recv_message.recv_message =
2303 &retry_state->recv_message;
2304 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, recv_message_ready,
2305 batch_data, grpc_schedule_on_exec_ctx);
2306 batch_data->batch.payload->recv_message.recv_message_ready =
2307 &retry_state->recv_message_ready;
2308 }
2309
2310 // Adds retriable recv_trailing_metadata op to batch_data.
2311 static void add_retriable_recv_trailing_metadata_op(
2312 call_data* calld, subchannel_call_retry_state* retry_state,
2313 subchannel_batch_data* batch_data) {
2314 retry_state->started_recv_trailing_metadata = true;
2315 batch_data->batch.recv_trailing_metadata = true;
2316 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
2317 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
2318 &retry_state->recv_trailing_metadata;
2319 batch_data->batch.payload->recv_trailing_metadata.collect_stats =
2320 &retry_state->collect_stats;
2321 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
2322 recv_trailing_metadata_ready, batch_data,
2323 grpc_schedule_on_exec_ctx);
2324 batch_data->batch.payload->recv_trailing_metadata
2325 .recv_trailing_metadata_ready =
2326 &retry_state->recv_trailing_metadata_ready;
2327 }
2328
2329 // Helper function used to start a recv_trailing_metadata batch. This
2330 // is used when a recv_initial_metadata or recv_message op fails in a
2331 // way that tells us the call is over, but the application has not yet
2332 // started its own recv_trailing_metadata op.
2333 static void start_internal_recv_trailing_metadata(grpc_call_element* elem) {
2334 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2335 call_data* calld = static_cast<call_data*>(elem->call_data);
2336 if (grpc_client_channel_trace.enabled()) {
2337 gpr_log(GPR_INFO,
2338 "chand=%p calld=%p: call failed but recv_trailing_metadata not "
2339 "started; starting it internally",
2340 chand, calld);
2341 }
2342 subchannel_call_retry_state* retry_state =
2343 static_cast<subchannel_call_retry_state*>(
2344 grpc_connected_subchannel_call_get_parent_data(
2345 calld->subchannel_call));
2346 // Create batch_data with 2 refs, since this batch will be unreffed twice:
2347 // once for the recv_trailing_metadata_ready callback when the subchannel
2348 // batch returns, and again when we actually get a recv_trailing_metadata
2349 // op from the surface.
2350 subchannel_batch_data* batch_data =
2351 batch_data_create(elem, 2, false /* set_on_complete */);
2352 add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
2353 retry_state->recv_trailing_metadata_internal_batch = batch_data;
2354 // Note: This will release the call combiner.
2355 grpc_subchannel_call_process_op(calld->subchannel_call, &batch_data->batch);
2356 }
2357
2358 // If there are any cached send ops that need to be replayed on the
2359 // current subchannel call, creates and returns a new subchannel batch
2360 // to replay those ops. Otherwise, returns nullptr.
2361 static subchannel_batch_data* maybe_create_subchannel_batch_for_replay(
2362 grpc_call_element* elem, subchannel_call_retry_state* retry_state) {
2363 channel_data* chand = static_cast<channel_data*>(elem->channel_data);
2364 call_data* calld = static_cast<call_data*>(elem->call_data);
2365 subchannel_batch_data* replay_batch_data = nullptr;
2366 // send_initial_metadata.
2367 if (calld->seen_send_initial_metadata &&
2368 !retry_state->started_send_initial_metadata &&
2369 !calld->pending_send_initial_metadata) {
2370 if (grpc_client_channel_trace.enabled()) {
2371 gpr_log(GPR_INFO,
2372 "chand=%p calld=%p: replaying previously completed "
2373 "send_initial_metadata op",
2374 chand, calld);
2375 }
2376 replay_batch_data = batch_data_create(elem, 1, true /* set_on_complete */);
2377 add_retriable_send_initial_metadata_op(calld, retry_state,
2378 replay_batch_data);
2379 }
2380 // send_message.
2381 // Note that we can only have one send_message op in flight at a time.
2382 if (retry_state->started_send_message_count < calld->send_messages->size() &&
2383 retry_state->started_send_message_count ==
2384 retry_state->completed_send_message_count &&
2385 !calld->pending_send_message) {
2386 if (grpc_client_channel_trace.enabled()) {
2387 gpr_log(GPR_INFO,
2388 "chand=%p calld=%p: replaying previously completed "
2389 "send_message op",
2390 chand, calld);
2391 }
2392 if (replay_batch_data == nullptr) {
2393 replay_batch_data =
2394 batch_data_create(elem, 1, true /* set_on_complete */);
2395 }
2396 add_retriable_send_message_op(elem, retry_state, replay_batch_data);
2397 }
2398 // send_trailing_metadata.
2399 // Note that we only add this op if we have no more send_message ops
2400 // to start, since we can't send down any more send_message ops after
2401 // send_trailing_metadata.
2402 if (calld->seen_send_trailing_metadata &&
2403 retry_state->started_send_message_count == calld->send_messages->size() &&
2404 !retry_state->started_send_trailing_metadata &&
2405 !calld->pending_send_trailing_metadata) {
2406 if (grpc_client_channel_trace.enabled()) {
2407 gpr_log(GPR_INFO,
2408 "chand=%p calld=%p: replaying previously completed "
2409 "send_trailing_metadata op",
2410 chand, calld);
2411 }
2412 if (replay_batch_data == nullptr) {
2413 replay_batch_data =
2414 batch_data_create(elem, 1, true /* set_on_complete */);
2415 }
2416 add_retriable_send_trailing_metadata_op(calld, retry_state,
2417 replay_batch_data);
2418 }
2419 return replay_batch_data;
2420 }
2421
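// Replay example: suppose attempt 1 completed send_initial_metadata and then
// failed, and the surface has no new batches pending. When attempt 2 starts,
// the function above creates a replay batch that re-sends the cached initial
// metadata, so the new subchannel call sees the same op sequence the
// application originally issued.
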
2422 // For each pending batch that can be started on the current subchannel
2423 // call, creates a subchannel batch and adds its start closure to closures.
2424 static void add_subchannel_batches_for_pending_batches(
2425 grpc_call_element* elem, subchannel_call_retry_state* retry_state,
2426 grpc_core::CallCombinerClosureList* closures) {
2427 call_data* calld = static_cast<call_data*>(elem->call_data);
2428 for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
2429 pending_batch* pending = &calld->pending_batches[i];
2430 grpc_transport_stream_op_batch* batch = pending->batch;
2431 if (batch == nullptr) continue;
2432 // Skip any batch that either (a) has already been started on this
2433 // subchannel call or (b) we can't start yet because we're still
2434 // replaying send ops that need to be completed first.
2435 // TODO(roth): Note that if any one op in the batch can't be sent
2436 // yet due to ops that we're replaying, we don't start any of the ops
2437 // in the batch. This is probably okay, but it could conceivably
2438 // lead to increased latency in some cases -- e.g., we could delay
2439 // starting a recv op due to it being in the same batch with a send
2440 // op. If/when we revamp the callback protocol in
2441 // transport_stream_op_batch, we may be able to fix this.
2442 if (batch->send_initial_metadata &&
2443 retry_state->started_send_initial_metadata) {
2444 continue;
2445 }
2446 if (batch->send_message && retry_state->completed_send_message_count <
2447 retry_state->started_send_message_count) {
2448 continue;
2449 }
2450 // Note that we only start send_trailing_metadata if we have no more
2451 // send_message ops to start, since we can't send down any more
2452 // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata &&
        (retry_state->started_send_message_count + batch->send_message <
             calld->send_messages->size() ||
         retry_state->started_send_trailing_metadata)) {
      continue;
    }
    if (batch->recv_initial_metadata &&
        retry_state->started_recv_initial_metadata) {
      continue;
    }
    if (batch->recv_message && retry_state->completed_recv_message_count <
                                   retry_state->started_recv_message_count) {
      continue;
    }
    if (batch->recv_trailing_metadata &&
        retry_state->started_recv_trailing_metadata) {
      // If we previously completed a recv_trailing_metadata op
      // initiated by start_internal_recv_trailing_metadata(), use the
      // result of that instead of trying to re-start this op.
      if (GPR_UNLIKELY(retry_state->recv_trailing_metadata_internal_batch !=
                       nullptr)) {
        // If the batch completed, then trigger the completion callback
        // directly, so that we return the previously returned results to
        // the application.  Otherwise, just unref the internally
        // started subchannel batch, since we'll propagate the
        // completion when it completes.
        if (retry_state->completed_recv_trailing_metadata) {
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        } else {
          batch_data_unref(retry_state->recv_trailing_metadata_internal_batch);
        }
        retry_state->recv_trailing_metadata_internal_batch = nullptr;
      }
      continue;
    }
    // If we're not retrying, just send the batch as-is.
    if (calld->method_params == nullptr ||
        calld->method_params->retry_policy() == nullptr ||
        calld->retry_committed) {
      add_closure_for_subchannel_batch(elem, batch, closures);
      pending_batch_clear(calld, pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    const bool has_send_ops = batch->send_initial_metadata ||
                              batch->send_message ||
                              batch->send_trailing_metadata;
    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
                              batch->recv_message +
                              batch->recv_trailing_metadata;
    subchannel_batch_data* batch_data = batch_data_create(
        elem, num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    maybe_cache_send_ops_for_batch(calld, pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      add_retriable_send_initial_metadata_op(calld, retry_state, batch_data);
    }
    // send_message.
    if (batch->send_message) {
      add_retriable_send_message_op(elem, retry_state, batch_data);
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      add_retriable_send_trailing_metadata_op(calld, retry_state, batch_data);
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      add_retriable_recv_initial_metadata_op(calld, retry_state, batch_data);
    }
    // recv_message.
    if (batch->recv_message) {
      add_retriable_recv_message_op(calld, retry_state, batch_data);
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      add_retriable_recv_trailing_metadata_op(calld, retry_state, batch_data);
    }
    add_closure_for_subchannel_batch(elem, &batch_data->batch, closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (batch->send_initial_metadata || batch->send_message ||
        batch->send_trailing_metadata) {
      if (calld->num_pending_retriable_subchannel_send_batches == 0) {
        GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
      }
      ++calld->num_pending_retriable_subchannel_send_batches;
    }
  }
}

// Constructs and starts whatever subchannel batches are needed on the
// subchannel call.
static void start_retriable_subchannel_batches(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
            chand, calld);
  }
  subchannel_call_retry_state* retry_state =
      static_cast<subchannel_call_retry_state*>(
          grpc_connected_subchannel_call_get_parent_data(
              calld->subchannel_call));
  // Construct list of closures to execute, one for each pending batch.
  grpc_core::CallCombinerClosureList closures;
  // Replay previously-returned send_* ops if needed.
  subchannel_batch_data* replay_batch_data =
      maybe_create_subchannel_batch_for_replay(elem, retry_state);
  if (replay_batch_data != nullptr) {
    add_closure_for_subchannel_batch(elem, &replay_batch_data->batch,
                                     &closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (calld->num_pending_retriable_subchannel_send_batches == 0) {
      GRPC_CALL_STACK_REF(calld->owning_call, "subchannel_send_batches");
    }
    ++calld->num_pending_retriable_subchannel_send_batches;
  }
  // Now add pending batches.
  add_subchannel_batches_for_pending_batches(elem, retry_state, &closures);
  // Start batches on subchannel call.
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on subchannel_call=%p",
            chand, calld, closures.size(), calld->subchannel_call);
  }
  // Note: This will yield the call combiner.
  closures.RunClosures(calld->call_combiner);
}

//
// Channelz
//

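// Intercepted recv_trailing_metadata_ready callback used when channelz is
// enabled: records call success or failure on the subchannel's channelz
// node, then chains to the original callback.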
static void recv_trailing_metadata_ready_channelz(void* arg,
                                                  grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready_channelz, "
            "error=%s",
            chand, calld, grpc_error_string(error));
  }
  GPR_ASSERT(calld->recv_trailing_metadata != nullptr);
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_metadata_batch* md_batch = calld->recv_trailing_metadata;
  get_call_status(elem, md_batch, GRPC_ERROR_REF(error), &status, nullptr);
  grpc_core::channelz::SubchannelNode* channelz_subchannel =
      calld->pick.connected_subchannel->channelz_subchannel();
  GPR_ASSERT(channelz_subchannel != nullptr);
  if (status == GRPC_STATUS_OK) {
    channelz_subchannel->RecordCallSucceeded();
  } else {
    channelz_subchannel->RecordCallFailed();
  }
  calld->recv_trailing_metadata = nullptr;
  GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata, error);
}

// If channelz is enabled, intercepts recv_trailing_metadata so that we may
// check the final status and associate it with a subchannel.
static void maybe_intercept_recv_trailing_metadata_for_channelz(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Only intercept batches containing recv_trailing_metadata.
  if (!batch->recv_trailing_metadata) {
    return;
  }
  // Only add the interceptor if channelz is enabled.
  if (calld->pick.connected_subchannel->channelz_subchannel() == nullptr) {
    return;
  }
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO,
            "calld=%p batch=%p: intercepting recv trailing for channelz",
            calld, batch);
  }
  GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_channelz,
                    recv_trailing_metadata_ready_channelz, elem,
                    grpc_schedule_on_exec_ctx);
  // Save some state needed for the interception callback.
  GPR_ASSERT(calld->recv_trailing_metadata == nullptr);
  calld->recv_trailing_metadata =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata;
  calld->original_recv_trailing_metadata =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &calld->recv_trailing_metadata_ready_channelz;
}

//
// LB pick
//

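// Creates a call on the connected subchannel chosen by the LB pick,
// allocating extra parent data for retry state when retries are enabled.
// Takes ownership of a ref to error.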
static void create_subchannel_call(grpc_call_element* elem, grpc_error* error) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  const size_t parent_data_size =
      calld->enable_retries ? sizeof(subchannel_call_retry_state) : 0;
  const grpc_core::ConnectedSubchannel::CallArgs call_args = {
      calld->pollent,                       // pollent
      calld->path,                          // path
      calld->call_start_time,               // start_time
      calld->deadline,                      // deadline
      calld->arena,                         // arena
      calld->pick.subchannel_call_context,  // context
      calld->call_combiner,                 // call_combiner
      parent_data_size                      // parent_data_size
  };
  grpc_error* new_error = calld->pick.connected_subchannel->CreateCall(
      call_args, &calld->subchannel_call);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: create subchannel_call=%p: error=%s",
            chand, calld, calld->subchannel_call, grpc_error_string(new_error));
  }
  if (GPR_UNLIKELY(new_error != GRPC_ERROR_NONE)) {
    new_error = grpc_error_add_child(new_error, error);
    pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
  } else {
    grpc_core::channelz::SubchannelNode* channelz_subchannel =
        calld->pick.connected_subchannel->channelz_subchannel();
    if (channelz_subchannel != nullptr) {
      channelz_subchannel->RecordCallStarted();
    }
    if (parent_data_size > 0) {
      subchannel_call_retry_state* retry_state =
          static_cast<subchannel_call_retry_state*>(
              grpc_connected_subchannel_call_get_parent_data(
                  calld->subchannel_call));
      retry_state->batch_payload.context = calld->pick.subchannel_call_context;
    }
    pending_batches_resume(elem);
  }
  GRPC_ERROR_UNREF(error);
}

// Invoked when a pick is completed, on both success and failure.
static void pick_done(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (GPR_UNLIKELY(calld->pick.connected_subchannel == nullptr)) {
    // Failed to create subchannel.
    // If there was no error, this is an LB policy drop, in which case
    // we return an error; otherwise, we may retry.
    grpc_status_code status = GRPC_STATUS_OK;
    grpc_error_get_status(error, calld->deadline, &status, nullptr, nullptr,
                          nullptr);
    if (error == GRPC_ERROR_NONE || !calld->enable_retries ||
        !maybe_retry(elem, nullptr /* batch_data */, status,
                     nullptr /* server_pushback_md */)) {
      grpc_error* new_error =
          error == GRPC_ERROR_NONE
              ? GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                    "Call dropped by load balancing policy")
              : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                    "Failed to create subchannel", &error, 1);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: failed to create subchannel: error=%s",
                chand, calld, grpc_error_string(new_error));
      }
      pending_batches_fail(elem, new_error, true /* yield_call_combiner */);
    }
  } else {
    /* Create call on subchannel. */
    create_subchannel_call(elem, GRPC_ERROR_REF(error));
  }
}

static void maybe_add_call_to_channel_interested_parties_locked(
    grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (!calld->pollent_added_to_interested_parties) {
    calld->pollent_added_to_interested_parties = true;
    grpc_polling_entity_add_to_pollset_set(calld->pollent,
                                           chand->interested_parties);
  }
}

static void maybe_del_call_from_channel_interested_parties_locked(
    grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (calld->pollent_added_to_interested_parties) {
    calld->pollent_added_to_interested_parties = false;
    grpc_polling_entity_del_from_pollset_set(calld->pollent,
                                             chand->interested_parties);
  }
}

// Invoked when a pick is completed to leave the client_channel combiner
// and continue processing in the call combiner.
// If needed, removes the call's polling entity from chand->interested_parties.
static void pick_done_locked(grpc_call_element* elem, grpc_error* error) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  maybe_del_call_from_channel_interested_parties_locked(elem);
  GRPC_CLOSURE_INIT(&calld->pick_closure, pick_done, elem,
                    grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&calld->pick_closure, error);
}

namespace grpc_core {

// Performs subchannel pick via LB policy.
class LbPicker {
 public:
  // Starts a pick on chand->lb_policy.
  static void StartLocked(grpc_call_element* elem) {
    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    call_data* calld = static_cast<call_data*>(elem->call_data);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting pick on lb_policy=%p",
              chand, calld, chand->lb_policy.get());
    }
    // If this is a retry, use the send_initial_metadata payload that
    // we've cached; otherwise, use the pending batch.  The
    // send_initial_metadata batch will be the first pending batch in the
    // list, as set by get_batch_index() above.
    calld->pick.initial_metadata =
        calld->seen_send_initial_metadata
            ? &calld->send_initial_metadata
            : calld->pending_batches[0]
                  .batch->payload->send_initial_metadata.send_initial_metadata;
    calld->pick.initial_metadata_flags =
        calld->seen_send_initial_metadata
            ? calld->send_initial_metadata_flags
            : calld->pending_batches[0]
                  .batch->payload->send_initial_metadata
                  .send_initial_metadata_flags;
    GRPC_CLOSURE_INIT(&calld->pick_closure, &LbPicker::DoneLocked, elem,
                      grpc_combiner_scheduler(chand->combiner));
    calld->pick.on_complete = &calld->pick_closure;
    GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback");
    grpc_error* error = GRPC_ERROR_NONE;
    const bool pick_done = chand->lb_policy->PickLocked(&calld->pick, &error);
    if (GPR_LIKELY(pick_done)) {
      // Pick completed synchronously.
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed synchronously",
                chand, calld);
      }
      pick_done_locked(elem, error);
      GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
    } else {
      // Pick will be returned asynchronously.
      // Add the polling entity from call_data to the channel_data's
      // interested_parties, so that the I/O of the LB policy can be done
      // under it.  It will be removed in pick_done_locked().
      maybe_add_call_to_channel_interested_parties_locked(elem);
      // Request notification on call cancellation.
      GRPC_CALL_STACK_REF(calld->owning_call, "pick_callback_cancel");
      grpc_call_combiner_set_notify_on_cancel(
          calld->call_combiner,
          GRPC_CLOSURE_INIT(&calld->pick_cancel_closure,
                            &LbPicker::CancelLocked, elem,
                            grpc_combiner_scheduler(chand->combiner)));
    }
  }

 private:
  // Callback invoked by LoadBalancingPolicy::PickLocked() for async picks.
  // Invokes pick_done_locked() and drops the "pick_callback" ref to the
  // call stack.
  static void DoneLocked(void* arg, grpc_error* error) {
    grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    call_data* calld = static_cast<call_data*>(elem->call_data);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: pick completed asynchronously",
              chand, calld);
    }
    pick_done_locked(elem, GRPC_ERROR_REF(error));
    GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback");
  }

  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    call_data* calld = static_cast<call_data*>(elem->call_data);
    // Note: chand->lb_policy may have changed since we started our pick,
    // in which case we will be cancelling the pick on a policy other than
    // the one we started it on.  However, this will just be a no-op.
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE && chand->lb_policy != nullptr)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: cancelling pick from LB policy %p", chand,
                calld, chand->lb_policy.get());
      }
      chand->lb_policy->CancelPickLocked(&calld->pick, GRPC_ERROR_REF(error));
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call, "pick_callback_cancel");
  }
};

}  // namespace grpc_core

// Applies service config to the call.  Must be invoked once we know
// that the resolver has returned results to the channel.
static void apply_service_config_to_call_locked(grpc_call_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  if (grpc_client_channel_trace.enabled()) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, calld);
  }
  if (chand->retry_throttle_data != nullptr) {
    calld->retry_throttle_data = chand->retry_throttle_data->Ref();
  }
  if (chand->method_params_table != nullptr) {
    calld->method_params = grpc_core::ServiceConfig::MethodConfigTableLookup(
        *chand->method_params_table, calld->path);
    if (calld->method_params != nullptr) {
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
      if (chand->deadline_checking_enabled &&
          calld->method_params->timeout() != 0) {
        const grpc_millis per_method_deadline =
            grpc_timespec_to_millis_round_up(calld->call_start_time) +
            calld->method_params->timeout();
        if (per_method_deadline < calld->deadline) {
          calld->deadline = per_method_deadline;
          grpc_deadline_state_reset(elem, calld->deadline);
        }
      }
      // If the service config set wait_for_ready and the application
      // did not explicitly set it, use the value from the service config.
      uint32_t* send_initial_metadata_flags =
          &calld->pending_batches[0]
               .batch->payload->send_initial_metadata
               .send_initial_metadata_flags;
      if (GPR_UNLIKELY(
              calld->method_params->wait_for_ready() !=
                  ClientChannelMethodParams::WAIT_FOR_READY_UNSET &&
              !(*send_initial_metadata_flags &
                GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET))) {
        if (calld->method_params->wait_for_ready() ==
            ClientChannelMethodParams::WAIT_FOR_READY_TRUE) {
          *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        } else {
          *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        }
      }
    }
  }
  // If no retry policy, disable retries.
  // TODO(roth): Remove this when adding support for transparent retries.
  if (calld->method_params == nullptr ||
      calld->method_params->retry_policy() == nullptr) {
    calld->enable_retries = false;
  }
}

// Invoked once resolver results are available.
static void process_service_config_and_start_lb_pick_locked(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  // Only get service config data on the first attempt.
  if (GPR_LIKELY(calld->num_attempts_completed == 0)) {
    apply_service_config_to_call_locked(elem);
  }
  // Start LB pick.
  grpc_core::LbPicker::StartLocked(elem);
}

namespace grpc_core {

// Handles waiting for a resolver result.
// Used only for the first call on an idle channel.
class ResolverResultWaiter {
 public:
  explicit ResolverResultWaiter(grpc_call_element* elem) : elem_(elem) {
    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    call_data* calld = static_cast<call_data*>(elem->call_data);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: deferring pick pending resolver result",
              chand, calld);
    }
    // Add closure to be run when a resolver result is available.
    GRPC_CLOSURE_INIT(&done_closure_, &ResolverResultWaiter::DoneLocked, this,
                      grpc_combiner_scheduler(chand->combiner));
    AddToWaitingList();
    // Set cancellation closure, so that we abort if the call is cancelled.
    GRPC_CLOSURE_INIT(&cancel_closure_, &ResolverResultWaiter::CancelLocked,
                      this, grpc_combiner_scheduler(chand->combiner));
    grpc_call_combiner_set_notify_on_cancel(calld->call_combiner,
                                            &cancel_closure_);
  }

 private:
  // Adds done_closure_ to chand->waiting_for_resolver_result_closures.
  void AddToWaitingList() {
    channel_data* chand = static_cast<channel_data*>(elem_->channel_data);
    grpc_closure_list_append(&chand->waiting_for_resolver_result_closures,
                             &done_closure_, GRPC_ERROR_NONE);
  }

  // Invoked when a resolver result is available.
  static void DoneLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    // If CancelLocked() has already run, delete ourselves without doing
    // anything.  Note that the call stack may have already been destroyed,
    // so it's not safe to access anything in elem_.
    if (GPR_UNLIKELY(self->finished_)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "call cancelled before resolver result");
      }
      Delete(self);
      return;
    }
    // Otherwise, process the resolver result.
    grpc_call_element* elem = self->elem_;
    channel_data* chand = static_cast<channel_data*>(elem->channel_data);
    call_data* calld = static_cast<call_data*>(elem->call_data);
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolver failed to return data",
                chand, calld);
      }
      pick_done_locked(elem, GRPC_ERROR_REF(error));
    } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
      // Shutting down.
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolver disconnected", chand,
                calld);
      }
      pick_done_locked(elem,
                       GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
    } else if (GPR_UNLIKELY(chand->lb_policy == nullptr)) {
      // Transient resolver failure.
      // If call has wait_for_ready=true, try again; otherwise, fail.
      uint32_t send_initial_metadata_flags =
          calld->seen_send_initial_metadata
              ? calld->send_initial_metadata_flags
              : calld->pending_batches[0]
                    .batch->payload->send_initial_metadata
                    .send_initial_metadata_flags;
      if (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: resolver returned but no LB policy; "
                  "wait_for_ready=true; trying again",
                  chand, calld);
        }
        // Re-add ourselves to the waiting list.
        self->AddToWaitingList();
        // Return early so that we don't set finished_ to true below.
        return;
      } else {
        if (grpc_client_channel_trace.enabled()) {
          gpr_log(GPR_INFO,
                  "chand=%p calld=%p: resolver returned but no LB policy; "
                  "wait_for_ready=false; failing",
                  chand, calld);
        }
        pick_done_locked(
            elem,
            grpc_error_set_int(
                GRPC_ERROR_CREATE_FROM_STATIC_STRING("Name resolution failure"),
                GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
      }
    } else {
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: resolver returned, doing LB pick",
                chand, calld);
      }
      process_service_config_and_start_lb_pick_locked(elem);
    }
    self->finished_ = true;
  }

  // Invoked when the call is cancelled.
  // Note: This runs under the client_channel combiner, but will NOT be
  // holding the call combiner.
  static void CancelLocked(void* arg, grpc_error* error) {
    ResolverResultWaiter* self = static_cast<ResolverResultWaiter*>(arg);
    // If DoneLocked() has already run, delete ourselves without doing
    // anything.
    if (GPR_LIKELY(self->finished_)) {
      Delete(self);
      return;
    }
    // If we are being cancelled, immediately invoke pick_done_locked()
    // to propagate the error back to the caller.
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      grpc_call_element* elem = self->elem_;
      channel_data* chand = static_cast<channel_data*>(elem->channel_data);
      call_data* calld = static_cast<call_data*>(elem->call_data);
      if (grpc_client_channel_trace.enabled()) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: cancelling call waiting for name "
                "resolution",
                chand, calld);
      }
      // Note: Although we are not in the call combiner here, we are
      // basically stealing the call combiner from the pending pick, so
      // it's safe to call pick_done_locked() here -- we are essentially
      // calling it here instead of calling it in DoneLocked().
      pick_done_locked(elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                                 "Pick cancelled", &error, 1));
    }
    self->finished_ = true;
  }

  grpc_call_element* elem_;
  grpc_closure done_closure_;
  grpc_closure cancel_closure_;
  bool finished_ = false;
};

}  // namespace grpc_core

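// Combiner callback that kicks off the pick for a call: starts an LB pick
// immediately if resolver results are already available, otherwise defers
// the pick until the resolver returns (or fails it if the channel has
// already shut down).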
static void start_pick_locked(void* arg, grpc_error* ignored) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  GPR_ASSERT(calld->pick.connected_subchannel == nullptr);
  GPR_ASSERT(calld->subchannel_call == nullptr);
  if (GPR_LIKELY(chand->lb_policy != nullptr)) {
    // We already have resolver results, so process the service config
    // and start an LB pick.
    process_service_config_and_start_lb_pick_locked(elem);
  } else if (GPR_UNLIKELY(chand->resolver == nullptr)) {
    pick_done_locked(elem,
                     GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected"));
  } else {
    // We do not yet have an LB policy, so wait for a resolver result.
    if (GPR_UNLIKELY(!chand->started_resolving)) {
      start_resolving_locked(chand);
    }
    // Create a new waiter, which will delete itself when done.
    grpc_core::New<grpc_core::ResolverResultWaiter>(elem);
    // Add the polling entity from call_data to the channel_data's
    // interested_parties, so that the I/O of the resolver can be done
    // under it.  It will be removed in pick_done_locked().
    maybe_add_call_to_channel_interested_parties_locked(elem);
  }
}

//
// filter call vtable functions
//

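// Entry point for all transport stream op batches passed down from the
// surface: handles deadline bookkeeping and cancellation, queues the batch
// as pending, and either resumes it on an existing subchannel call or
// enters the channel combiner to start a pick.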
static void cc_start_transport_stream_op_batch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error != GRPC_ERROR_NONE)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error);
    calld->cancel_error =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (calld->subchannel_call == nullptr) {
      pending_batches_fail(elem, GRPC_ERROR_REF(calld->cancel_error),
                           false /* yield_call_combiner */);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error), calld->call_combiner);
    } else {
      // Note: This will release the call combiner.
      grpc_subchannel_call_process_op(calld->subchannel_call, batch);
    }
    return;
  }
  // Add the batch to the pending list.
  pending_batches_add(elem, batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have completed the pick, we do not need to enter
  // the channel combiner, which is more efficient (especially for
  // streaming calls).
  if (calld->subchannel_call != nullptr) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting batch on subchannel_call=%p", chand,
              calld, calld->subchannel_call);
    }
    pending_batches_resume(elem);
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, enter the channel
  // combiner to start a pick.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: entering client_channel combiner",
              chand, calld);
    }
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_INIT(&batch->handler_private.closure, start_pick_locked,
                          elem, grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (grpc_client_channel_trace.enabled()) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner,
                            "batch does not include send_initial_metadata");
  }
}

/* Constructor for call_data */
static grpc_error* cc_init_call_elem(grpc_call_element* elem,
                                     const grpc_call_element_args* args) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  // Initialize data members.
  calld->path = grpc_slice_ref_internal(args->path);
  calld->call_start_time = args->start_time;
  calld->deadline = args->deadline;
  calld->arena = args->arena;
  calld->owning_call = args->call_stack;
  calld->call_combiner = args->call_combiner;
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_init(elem, args->call_stack, args->call_combiner,
                             calld->deadline);
  }
  calld->enable_retries = chand->enable_retries;
  calld->send_messages.Init();
  return GRPC_ERROR_NONE;
}

/* Destructor for call_data */
static void cc_destroy_call_elem(grpc_call_element* elem,
                                 const grpc_call_final_info* final_info,
                                 grpc_closure* then_schedule_closure) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled)) {
    grpc_deadline_state_destroy(elem);
  }
  grpc_slice_unref_internal(calld->path);
  calld->retry_throttle_data.reset();
  calld->method_params.reset();
  GRPC_ERROR_UNREF(calld->cancel_error);
  if (GPR_LIKELY(calld->subchannel_call != nullptr)) {
    grpc_subchannel_call_set_cleanup_closure(calld->subchannel_call,
                                             then_schedule_closure);
    then_schedule_closure = nullptr;
    GRPC_SUBCHANNEL_CALL_UNREF(calld->subchannel_call,
                               "client_channel_destroy_call");
  }
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches); ++i) {
    GPR_ASSERT(calld->pending_batches[i].batch == nullptr);
  }
  if (GPR_LIKELY(calld->pick.connected_subchannel != nullptr)) {
    calld->pick.connected_subchannel.reset();
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) {
    if (calld->pick.subchannel_call_context[i].value != nullptr) {
      calld->pick.subchannel_call_context[i].destroy(
          calld->pick.subchannel_call_context[i].value);
    }
  }
  calld->send_messages.Destroy();
  GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE);
}

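// Records the call's polling entity so that it can later be added to the
// channel's interested_parties while a pick or resolution is in flight.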
static void cc_set_pollset_or_pollset_set(grpc_call_element* elem,
                                          grpc_polling_entity* pollent) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  calld->pollent = pollent;
}

/*************************************************************************
 * EXPORTED SYMBOLS
 */

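// vtable for the client_channel filter.  The entries correspond to the
// grpc_channel_filter members, in declaration order.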
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op_batch,
    cc_start_transport_op,
    sizeof(call_data),
    cc_init_call_elem,
    cc_set_pollset_or_pollset_set,
    cc_destroy_call_elem,
    sizeof(channel_data),
    cc_init_channel_elem,
    cc_destroy_channel_elem,
    cc_get_channel_info,
    "client-channel",
};

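// Combiner callback for grpc_client_channel_check_connectivity_state():
// exits idle on the LB policy if we have one; otherwise arranges to exit
// idle once the LB policy arrives, starting resolution if needed.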
static void try_to_connect_locked(void* arg, grpc_error* error_ignored) {
  channel_data* chand = static_cast<channel_data*>(arg);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->ExitIdleLocked();
  } else {
    chand->exit_idle_when_lb_policy_arrives = true;
    if (!chand->started_resolving && chand->resolver != nullptr) {
      start_resolving_locked(chand);
    }
  }
  GRPC_CHANNEL_STACK_UNREF(chand->owning_stack, "try_to_connect");
}

void grpc_client_channel_populate_child_refs(
    grpc_channel_element* elem, grpc_core::ChildRefsList* child_subchannels,
    grpc_core::ChildRefsList* child_channels) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  if (chand->lb_policy != nullptr) {
    chand->lb_policy->FillChildRefsForChannelz(child_subchannels,
                                               child_channels);
  }
}

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  grpc_connectivity_state out =
      grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(try_to_connect_locked, chand,
                            grpc_combiner_scheduler(chand->combiner)),
        GRPC_ERROR_NONE);
  }
  return out;
}

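// Tracks one external connectivity-state watch.  Instances are linked into
// chand->external_connectivity_watcher_list_head, protected by
// chand->external_connectivity_watcher_list_mu.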
typedef struct external_connectivity_watcher {
  channel_data* chand;
  grpc_polling_entity pollent;
  grpc_closure* on_complete;
  grpc_closure* watcher_timer_init;
  grpc_connectivity_state* state;
  grpc_closure my_closure;
  struct external_connectivity_watcher* next;
} external_connectivity_watcher;

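// Returns the watcher in chand's list whose on_complete closure matches,
// or nullptr if there is none.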
static external_connectivity_watcher* lookup_external_connectivity_watcher(
    channel_data* chand, grpc_closure* on_complete) {
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr && w->on_complete != on_complete) {
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
  return w;
}

static void external_connectivity_watcher_list_append(
    channel_data* chand, external_connectivity_watcher* w) {
  GPR_ASSERT(!lookup_external_connectivity_watcher(chand, w->on_complete));

  gpr_mu_lock(&w->chand->external_connectivity_watcher_list_mu);
  GPR_ASSERT(!w->next);
  w->next = chand->external_connectivity_watcher_list_head;
  chand->external_connectivity_watcher_list_head = w;
  gpr_mu_unlock(&w->chand->external_connectivity_watcher_list_mu);
}

static void external_connectivity_watcher_list_remove(
    channel_data* chand, external_connectivity_watcher* to_remove) {
  GPR_ASSERT(
      lookup_external_connectivity_watcher(chand, to_remove->on_complete));
  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  if (to_remove == chand->external_connectivity_watcher_list_head) {
    chand->external_connectivity_watcher_list_head = to_remove->next;
    gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
    return;
  }
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    if (w->next == to_remove) {
      w->next = w->next->next;
      gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);
      return;
    }
    w = w->next;
  }
  GPR_UNREACHABLE_CODE(return);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  int count = 0;

  gpr_mu_lock(&chand->external_connectivity_watcher_list_mu);
  external_connectivity_watcher* w =
      chand->external_connectivity_watcher_list_head;
  while (w != nullptr) {
    count++;
    w = w->next;
  }
  gpr_mu_unlock(&chand->external_connectivity_watcher_list_mu);

  return count;
}

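// Invoked when the watched connectivity state changes (or the watch is
// cancelled): cleans up the watcher and schedules the user's on_complete
// closure.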
static void on_external_watch_complete_locked(void* arg, grpc_error* error) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  grpc_closure* follow_up = w->on_complete;
  grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                           w->chand->interested_parties);
  GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                           "external_connectivity_watcher");
  external_connectivity_watcher_list_remove(w->chand, w);
  gpr_free(w);
  GRPC_CLOSURE_SCHED(follow_up, GRPC_ERROR_REF(error));
}

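// Combiner callback that either registers a new watch (w->state != nullptr)
// or cancels an existing one (w->state == nullptr).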
static void watch_connectivity_state_locked(void* arg,
                                            grpc_error* error_ignored) {
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(arg);
  external_connectivity_watcher* found = nullptr;
  if (w->state != nullptr) {
    external_connectivity_watcher_list_append(w->chand, w);
    // This assumes that the closure is scheduled on the exec_ctx scheduler,
    // so that GRPC_CLOSURE_RUN runs it immediately.
    GRPC_CLOSURE_RUN(w->watcher_timer_init, GRPC_ERROR_NONE);
    GRPC_CLOSURE_INIT(&w->my_closure, on_external_watch_complete_locked, w,
                      grpc_combiner_scheduler(w->chand->combiner));
    grpc_connectivity_state_notify_on_state_change(&w->chand->state_tracker,
                                                   w->state, &w->my_closure);
  } else {
    GPR_ASSERT(w->watcher_timer_init == nullptr);
    found = lookup_external_connectivity_watcher(w->chand, w->on_complete);
    if (found) {
      GPR_ASSERT(found->on_complete == w->on_complete);
      grpc_connectivity_state_notify_on_state_change(
          &found->chand->state_tracker, nullptr, &found->my_closure);
    }
    grpc_polling_entity_del_from_pollset_set(&w->pollent,
                                             w->chand->interested_parties);
    GRPC_CHANNEL_STACK_UNREF(w->chand->owning_stack,
                             "external_connectivity_watcher");
    gpr_free(w);
  }
}

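// Starts (state != nullptr) or cancels (state == nullptr) an external
// connectivity watch; the actual work happens in the combiner via
// watch_connectivity_state_locked().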
void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* closure,
    grpc_closure* watcher_timer_init) {
  channel_data* chand = static_cast<channel_data*>(elem->channel_data);
  external_connectivity_watcher* w =
      static_cast<external_connectivity_watcher*>(gpr_zalloc(sizeof(*w)));
  w->chand = chand;
  w->pollent = pollent;
  w->on_complete = closure;
  w->state = state;
  w->watcher_timer_init = watcher_timer_init;
  grpc_polling_entity_add_to_pollset_set(&w->pollent,
                                         chand->interested_parties);
  GRPC_CHANNEL_STACK_REF(w->chand->owning_stack,
                         "external_connectivity_watcher");
  GRPC_CLOSURE_SCHED(
      GRPC_CLOSURE_INIT(&w->my_closure, watch_connectivity_state_locked, w,
                        grpc_combiner_scheduler(chand->combiner)),
      GRPC_ERROR_NONE);
}

grpc_subchannel_call* grpc_client_channel_get_subchannel_call(
    grpc_call_element* elem) {
  call_data* calld = static_cast<call_data*>(elem->call_data);
  return calld->subchannel_call;
}