//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <set>

#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "absl/container/inlined_vector.h"
#include "absl/types/optional.h"

#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/service_config_call_data.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

//
// Client channel filter
//

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
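// Note: if caching a send op would push bytes_buffered_for_retry_ (see
// RetryingCall below) past this limit, the call is committed and no
// further retries will be attempted for it.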

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
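// With jitter J, BackOff draws each retry delay approximately uniformly
// from [backoff * (1 - J), backoff * (1 + J)], where backoff itself grows
// exponentially between attempts (see src/core/lib/backoff/backoff.h).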

// Max number of batches that can be pending on a call at any given
// time. This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Channel arg containing a pointer to the ChannelData object.
#define GRPC_ARG_CLIENT_CHANNEL_DATA "grpc.internal.client_channel_data"

// Channel arg containing a pointer to the RetryThrottleData object.
#define GRPC_ARG_RETRY_THROTTLE_DATA "grpc.internal.retry_throttle_data"
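// These internal args are how the client channel plumbs pointers into the
// dynamically constructed filter stack: DynamicTerminationFilterChannelData
// (defined below) reads them back out of the channel args at filter-init
// time.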

namespace grpc_core {

using internal::ClientChannelGlobalParsedConfig;
using internal::ClientChannelMethodParsedConfig;
using internal::ClientChannelServiceConfigParser;
using internal::ServerRetryThrottleData;

TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");

namespace {

//
// ChannelData definition
//

class LoadBalancedCall;

class ChannelData {
 public:
  struct ResolverQueuedCall {
    grpc_call_element* elem;
    ResolverQueuedCall* next = nullptr;
  };
  struct LbQueuedCall {
    LoadBalancedCall* lb_call;
    LbQueuedCall* next = nullptr;
  };

  static grpc_error* Init(grpc_channel_element* elem,
                          grpc_channel_element_args* args);
  static void Destroy(grpc_channel_element* elem);
  static void StartTransportOp(grpc_channel_element* elem,
                               grpc_transport_op* op);
  static void GetChannelInfo(grpc_channel_element* elem,
                             const grpc_channel_info* info);

  bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
  bool enable_retries() const { return enable_retries_; }
  size_t per_rpc_retry_buffer_size() const {
    return per_rpc_retry_buffer_size_;
  }
  grpc_channel_stack* owning_stack() const { return owning_stack_; }

  // Note: Does NOT return a new ref.
  grpc_error* disconnect_error() const {
    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
  }

  Mutex* resolution_mu() const { return &resolution_mu_; }
  // These methods all require holding resolution_mu_.
  void AddResolverQueuedCall(ResolverQueuedCall* call,
                             grpc_polling_entity* pollent);
  void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
                                grpc_polling_entity* pollent);
  bool received_service_config_data() const {
    return received_service_config_data_;
  }
  grpc_error* resolver_transient_failure_error() const {
    return resolver_transient_failure_error_;
  }
  RefCountedPtr<ServiceConfig> service_config() const {
    return service_config_;
  }
  ConfigSelector* config_selector() const { return config_selector_.get(); }
  RefCountedPtr<DynamicFilters> dynamic_filters() const {
    return dynamic_filters_;
  }

  Mutex* data_plane_mu() const { return &data_plane_mu_; }
  // These methods all require holding data_plane_mu_.
  LoadBalancingPolicy::SubchannelPicker* picker() const {
    return picker_.get();
  }
  void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent);
  void RemoveLbQueuedCall(LbQueuedCall* to_remove,
                          grpc_polling_entity* pollent);
  RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
      SubchannelInterface* subchannel) const;

  WorkSerializer* work_serializer() const { return work_serializer_.get(); }

  grpc_connectivity_state CheckConnectivityState(bool try_to_connect);

  void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
                                      grpc_connectivity_state* state,
                                      grpc_closure* on_complete,
                                      grpc_closure* watcher_timer_init) {
    new ExternalConnectivityWatcher(this, pollent, state, on_complete,
                                    watcher_timer_init);
  }

  void RemoveExternalConnectivityWatcher(grpc_closure* on_complete,
                                         bool cancel) {
    ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
        this, on_complete, cancel);
  }

  int NumExternalConnectivityWatchers() const {
    MutexLock lock(&external_watchers_mu_);
    return static_cast<int>(external_watchers_.size());
  }

  void AddConnectivityWatcher(
      grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
  void RemoveConnectivityWatcher(
      AsyncConnectivityStateWatcherInterface* watcher);
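
  // Illustrative call sequence (a sketch, not exhaustive): the surface API
  // grpc_client_channel_watch_connectivity_state() invokes
  // AddExternalConnectivityWatcher(), which constructs a self-registering
  // ExternalConnectivityWatcher; the watcher signals on_complete once the
  // connectivity state changes (or the watch is cancelled) and is then
  // removed from external_watchers_.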

 private:
  class SubchannelWrapper;
  class ClientChannelControlHelper;
  class ConnectivityWatcherAdder;
  class ConnectivityWatcherRemover;

  // Represents a pending connectivity callback from an external caller
  // via grpc_client_channel_watch_connectivity_state().
  class ExternalConnectivityWatcher
      : public ConnectivityStateWatcherInterface {
   public:
    ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
                                grpc_connectivity_state* state,
                                grpc_closure* on_complete,
                                grpc_closure* watcher_timer_init);

    ~ExternalConnectivityWatcher() override;

    // Removes the watcher from the external_watchers_ map.
    static void RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
                                                     grpc_closure* on_complete,
                                                     bool cancel);

    void Notify(grpc_connectivity_state state,
                const absl::Status& /* status */) override;

    void Cancel();

   private:
    // Adds the watcher to state_tracker_. Consumes the ref that is passed
    // to it from Start().
    void AddWatcherLocked();
    void RemoveWatcherLocked();

    ChannelData* chand_;
    grpc_polling_entity pollent_;
    grpc_connectivity_state initial_state_;
    grpc_connectivity_state* state_;
    grpc_closure* on_complete_;
    grpc_closure* watcher_timer_init_;
    Atomic<bool> done_{false};
  };

  class ResolverResultHandler : public Resolver::ResultHandler {
   public:
    explicit ResolverResultHandler(ChannelData* chand) : chand_(chand) {
      GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
    }

    ~ResolverResultHandler() override {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
      }
      GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
    }

    void ReturnResult(Resolver::Result result) override {
      chand_->OnResolverResultChangedLocked(std::move(result));
    }

    void ReturnError(grpc_error* error) override {
      chand_->OnResolverErrorLocked(error);
    }

   private:
    ChannelData* chand_;
  };

  ChannelData(grpc_channel_element_args* args, grpc_error** error);
  ~ChannelData();

  // Note: All methods with "Locked" suffix must be invoked from within
  // work_serializer_.

  void OnResolverResultChangedLocked(Resolver::Result result);
  void OnResolverErrorLocked(grpc_error* error);

  void CreateOrUpdateLbPolicyLocked(
      RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
      Resolver::Result result);
  OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
      const grpc_channel_args& args);

  void UpdateStateAndPickerLocked(
      grpc_connectivity_state state, const absl::Status& status,
      const char* reason,
      std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker);

  void UpdateServiceConfigInControlPlaneLocked(
      RefCountedPtr<ServiceConfig> service_config,
      RefCountedPtr<ConfigSelector> config_selector,
      const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
      const char* lb_policy_name);

  void UpdateServiceConfigInDataPlaneLocked();

  void CreateResolverLocked();
  void DestroyResolverAndLbPolicyLocked();

  grpc_error* DoPingLocked(grpc_transport_op* op);

  void StartTransportOpLocked(grpc_transport_op* op);

  void TryToConnectLocked();

  //
  // Fields set at construction and never modified.
  //
  const bool deadline_checking_enabled_;
  const bool enable_retries_;
  const size_t per_rpc_retry_buffer_size_;
  grpc_channel_stack* owning_stack_;
  ClientChannelFactory* client_channel_factory_;
  const grpc_channel_args* channel_args_;
  RefCountedPtr<ServiceConfig> default_service_config_;
  std::string server_name_;
  UniquePtr<char> target_uri_;
  channelz::ChannelNode* channelz_node_;

  //
  // Fields related to name resolution. Guarded by resolution_mu_.
  //
  mutable Mutex resolution_mu_;
  // Linked list of calls queued waiting for resolver result.
  ResolverQueuedCall* resolver_queued_calls_ = nullptr;
  // Data from service config.
  grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
  bool received_service_config_data_ = false;
  RefCountedPtr<ServiceConfig> service_config_;
  RefCountedPtr<ConfigSelector> config_selector_;
  RefCountedPtr<DynamicFilters> dynamic_filters_;

  //
  // Fields used in the data plane. Guarded by data_plane_mu_.
  //
  mutable Mutex data_plane_mu_;
  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_;
  // Linked list of calls queued waiting for LB pick.
  LbQueuedCall* lb_queued_calls_ = nullptr;

  //
  // Fields used in the control plane. Guarded by work_serializer.
  //
  std::shared_ptr<WorkSerializer> work_serializer_;
  grpc_pollset_set* interested_parties_;
  ConnectivityStateTracker state_tracker_;
  OrphanablePtr<Resolver> resolver_;
  bool previous_resolution_contained_addresses_ = false;
  RefCountedPtr<ServiceConfig> saved_service_config_;
  RefCountedPtr<ConfigSelector> saved_config_selector_;
  absl::optional<std::string> health_check_service_name_;
  OrphanablePtr<LoadBalancingPolicy> lb_policy_;
  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
  // The number of SubchannelWrapper instances referencing a given Subchannel.
  std::map<Subchannel*, int> subchannel_refcount_map_;
  // The set of SubchannelWrappers that currently exist.
  // No need to hold a ref, since the map is updated in the control-plane
  // work_serializer when the SubchannelWrappers are created and destroyed.
  std::set<SubchannelWrapper*> subchannel_wrappers_;
  // Pending ConnectedSubchannel updates for each SubchannelWrapper.
  // Updates are queued here in the control plane work_serializer and then
  // applied in the data plane mutex when the picker is updated.
  std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
      pending_subchannel_updates_;
  int keepalive_time_ = -1;

  //
  // Fields accessed from both data plane mutex and control plane
  // work_serializer.
  //
  Atomic<grpc_error*> disconnect_error_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via get_channel_info().
  //
  gpr_mu info_mu_;
  UniquePtr<char> info_lb_policy_name_;
  UniquePtr<char> info_service_config_json_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via grpc_channel_num_external_connectivity_watchers().
  //
  mutable Mutex external_watchers_mu_;
  std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
      external_watchers_;
};

//
// CallData definition
//

class CallData {
 public:
  static grpc_error* Init(grpc_call_element* elem,
                          const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem,
                         grpc_polling_entity* pollent);

  // Invoked by channel for queued calls when name resolution is completed.
  static void CheckResolution(void* arg, grpc_error* error);
  // Helper function for applying the service config to a call while
  // holding ChannelData::resolution_mu_.
  // Returns true if the service config has been applied to the call, in
  // which case the caller must invoke ResolutionDone() or
  // AsyncResolutionDone() with the returned error.
  bool CheckResolutionLocked(grpc_call_element* elem, grpc_error** error);
  // Schedules a callback to continue processing the call once
  // resolution is complete. The callback will not run until after this
  // method returns.
  void AsyncResolutionDone(grpc_call_element* elem, grpc_error* error);

 private:
  class ResolverQueuedCallCanceller;

  CallData(grpc_call_element* elem, const ChannelData& chand,
           const grpc_call_element_args& args);
  ~CallData();

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_call_element* elem,
                         grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
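  // The predicate passed to PendingBatchesFail() decides whether that call
  // assumes responsibility for yielding the call combiner:
  //   YieldCallCombiner                      - always yield
  //   NoYieldCallCombiner                    - never yield
  //   YieldCallCombinerIfPendingBatchesFound - yield only if at least one
  //                                            pending batch was failed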
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility
  // for yielding the call combiner.
  void PendingBatchesFail(
      grpc_call_element* elem, grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on lb_call_.
  void PendingBatchesResume(grpc_call_element* elem);

  // Applies service config to the call. Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error* ApplyServiceConfigToCallLocked(
      grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
  // Invoked when the resolver result is applied to the caller, on either
  // success or failure.
  static void ResolutionDone(void* arg, grpc_error* error);
  // Removes the call (if present) from the channel's list of calls queued
  // for name resolution.
  void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem);
  // Adds the call (if not already present) to the channel's list of
  // calls queued for name resolution.
  void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem);

  static void RecvInitialMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error* error);
  void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
      grpc_transport_stream_op_batch* batch);

  void CreateDynamicCall(grpc_call_element* elem);

  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner. If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  grpc_polling_entity* pollent_ = nullptr;

  grpc_closure pick_closure_;

  // Accessed while holding ChannelData::resolution_mu_.
  bool service_config_applied_ = false;
  bool queued_pending_resolver_result_ = false;
  ChannelData::ResolverQueuedCall resolver_queued_call_;
  ResolverQueuedCallCanceller* resolver_call_canceller_ = nullptr;

  std::function<void()> on_call_committed_;

  grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
  grpc_closure recv_initial_metadata_ready_;

  RefCountedPtr<DynamicFilters> dynamic_filters_;
  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;
};

//
// RetryingCall definition
//

class RetryingCall {
 public:
  RetryingCall(
      ChannelData* chand, const grpc_call_element_args& args,
      grpc_polling_entity* pollent,
      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
      const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy);
  ~RetryingCall();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  RefCountedPtr<SubchannelCall> subchannel_call() const;

 private:
  // State used for starting a retryable batch on a subchannel call.
  // This provides its own grpc_transport_stream_op_batch and other data
  // structures needed to populate the ops in the batch.
  // We allocate one struct on the arena for each attempt at starting a
  // batch on a given subchannel call.
  struct SubchannelCallBatchData {
    // Creates a SubchannelCallBatchData object on the call's arena with the
    // specified refcount. If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    static SubchannelCallBatchData* Create(RetryingCall* call, int refcount,
                                           bool set_on_complete);

    void Unref() {
      if (gpr_unref(&refs)) Destroy();
    }

    SubchannelCallBatchData(RetryingCall* call, int refcount,
                            bool set_on_complete);
    // All dtor code must be added in `Destroy()`. This is because we may
    // call closures in `SubchannelCallBatchData` after they are unrefed by
    // `Unref()`, and msan would complain about accessing this class
    // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
    // TODO(soheil): We should try to call the dtor in `Unref()`.
    ~SubchannelCallBatchData() { Destroy(); }
    void Destroy();

    gpr_refcount refs;
    grpc_call_element* elem;
    RetryingCall* call;
    RefCountedPtr<LoadBalancedCall> lb_call;
    // The batch to use in the subchannel call.
    // Its payload field points to SubchannelCallRetryState::batch_payload.
    grpc_transport_stream_op_batch batch;
    // For intercepting on_complete.
    grpc_closure on_complete;
  };

  // Retry state associated with a subchannel call.
  // Stored in the parent_data of the subchannel call object.
  struct SubchannelCallRetryState {
    explicit SubchannelCallRetryState(grpc_call_context_element* context)
        : batch_payload(context),
          started_send_initial_metadata(false),
          completed_send_initial_metadata(false),
          started_send_trailing_metadata(false),
          completed_send_trailing_metadata(false),
          started_recv_initial_metadata(false),
          completed_recv_initial_metadata(false),
          started_recv_trailing_metadata(false),
          completed_recv_trailing_metadata(false),
          retry_dispatched(false) {}

    // SubchannelCallBatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload;
    // For send_initial_metadata.
    // Note that we need to make a copy of the initial metadata for each
    // subchannel call instead of just referring to the copy in call_data,
    // because filters in the subchannel stack will probably add entries,
    // so we need to start in a pristine state for each attempt of the call.
    grpc_linked_mdelem* send_initial_metadata_storage;
    grpc_metadata_batch send_initial_metadata;
    // For send_message.
    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
    ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
    // For send_trailing_metadata.
    grpc_linked_mdelem* send_trailing_metadata_storage;
    grpc_metadata_batch send_trailing_metadata;
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata;
    grpc_closure recv_initial_metadata_ready;
    bool trailing_metadata_available = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready;
    OrphanablePtr<ByteStream> recv_message;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata;
    grpc_transport_stream_stats collect_stats;
    grpc_closure recv_trailing_metadata_ready;
    // These fields indicate which ops have been started and completed on
    // this subchannel call.
    size_t started_send_message_count = 0;
    size_t completed_send_message_count = 0;
    size_t started_recv_message_count = 0;
    size_t completed_recv_message_count = 0;
    bool started_send_initial_metadata : 1;
    bool completed_send_initial_metadata : 1;
    bool started_send_trailing_metadata : 1;
    bool completed_send_trailing_metadata : 1;
    bool started_recv_initial_metadata : 1;
    bool completed_recv_initial_metadata : 1;
    bool started_recv_trailing_metadata : 1;
    bool completed_recv_trailing_metadata : 1;
    // State for callback processing.
    SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
        nullptr;
    grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
    grpc_error* recv_message_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
    // NOTE: Do not move this next to the metadata bitfields above. That
    // would save space but would also result in a data race, because the
    // compiler would generate a 2-byte store that overwrites the metadata
    // bitfields when setting this field.
    bool retry_dispatched : 1;
  };

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch. If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch = nullptr;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached = false;
  };

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata();
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(size_t idx);
  void FreeCachedSendTrailingMetadata();
  // Frees cached send ops that have already been completed after
  // committing the call.
  void FreeCachedSendOpDataAfterCommit(SubchannelCallRetryState* retry_state);
  // Frees cached send ops that were completed by the completed batch in
  // batch_data. Used when batches are completed after the call is committed.
  void FreeCachedSendOpDataForCompletedBatch(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility
  // for yielding the call combiner.
  void PendingBatchesFail(
      grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on lb_call_.
  void PendingBatchesResume();
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(SubchannelCallRetryState* retry_state);
  // Starts a retry after appropriate back-off.
  void DoRetry(SubchannelCallRetryState* retry_state,
               grpc_millis server_pushback_ms);
  // Returns true if the call is being retried.
  bool MaybeRetry(SubchannelCallBatchData* batch_data, grpc_status_code status,
                  grpc_mdelem* server_pushback_md);

  // Invokes recv_initial_metadata_ready for a subchannel batch.
  static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
  // Intercepts recv_initial_metadata_ready callback for retries.
  // Commits the call and returns the initial metadata up the stack.
  static void RecvInitialMetadataReady(void* arg, grpc_error* error);

  // Invokes recv_message_ready for a subchannel batch.
  static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
  // Intercepts recv_message_ready callback for retries.
  // Commits the call and returns the message up the stack.
  static void RecvMessageReady(void* arg, grpc_error* error);

  // Sets *status and *server_pushback_md based on md_batch and error.
  // Only sets *server_pushback_md if server_pushback_md != nullptr.
  void GetCallStatus(grpc_metadata_batch* md_batch, grpc_error* error,
                     grpc_status_code* status,
                     grpc_mdelem** server_pushback_md);
  // Adds recv_trailing_metadata_ready closure to closures.
  void AddClosureForRecvTrailingMetadataReady(
      SubchannelCallBatchData* batch_data, grpc_error* error,
      CallCombinerClosureList* closures);
  // Adds any necessary closures for deferred recv_initial_metadata and
  // recv_message callbacks to closures.
  static void AddClosuresForDeferredRecvCallbacks(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
  // Returns true if any op in the batch was not yet started.
  // Only looks at send ops, since recv ops are always started immediately.
  bool PendingBatchIsUnstarted(PendingBatch* pending,
                               SubchannelCallRetryState* retry_state);
  // For any pending batch containing an op that has not yet been started,
  // adds the pending batch's completion closures to closures.
  void AddClosuresToFailUnstartedPendingBatches(
      SubchannelCallRetryState* retry_state, grpc_error* error,
      CallCombinerClosureList* closures);
  // Runs necessary closures upon completion of a call attempt.
  void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
                                   grpc_error* error);
  // Intercepts recv_trailing_metadata_ready callback for retries.
  // Commits the call and returns the trailing metadata up the stack.
  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);

  // Adds the on_complete closure for the pending batch completed in
  // batch_data to closures.
  void AddClosuresForCompletedPendingBatch(SubchannelCallBatchData* batch_data,
                                           grpc_error* error,
                                           CallCombinerClosureList* closures);

  // If there are any cached ops to replay or pending ops to start on the
  // subchannel call, adds a closure to closures to invoke
  // StartRetriableSubchannelBatches().
  void AddClosuresForReplayOrPendingSendOps(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);

  // Callback used to intercept on_complete from subchannel calls.
  // Called only when retries are enabled.
  static void OnComplete(void* arg, grpc_error* error);

  static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Adds a closure to closures that will execute batch in the call combiner.
  void AddClosureForSubchannelBatch(grpc_transport_stream_op_batch* batch,
                                    CallCombinerClosureList* closures);
  // Adds retriable send_initial_metadata op to batch_data.
  void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable send_message op to batch_data.
  void AddRetriableSendMessageOp(SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable send_trailing_metadata op to batch_data.
  void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Adds retriable recv_initial_metadata op to batch_data.
  void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable recv_message op to batch_data.
  void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable recv_trailing_metadata op to batch_data.
  void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Helper function used to start a recv_trailing_metadata batch. This
  // is used in the case where a recv_initial_metadata or recv_message
  // op fails in a way that we know the call is over but the application
  // has not yet started its own recv_trailing_metadata op.
  void StartInternalRecvTrailingMetadata();
  // If there are any cached send ops that need to be replayed on the
  // current subchannel call, creates and returns a new subchannel batch
  // to replay those ops. Otherwise, returns nullptr.
  SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
      SubchannelCallRetryState* retry_state);
  // Adds subchannel batches for pending batches to closures.
  void AddSubchannelBatchesForPendingBatches(
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
  // Constructs and starts whatever subchannel batches are needed on the
  // subchannel call.
  static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);

  static void CreateLbCall(void* arg, grpc_error* error);

  ChannelData* chand_;
  grpc_polling_entity* pollent_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy_ = nullptr;
  BackOff retry_backoff_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  grpc_closure retry_closure_;

  RefCountedPtr<LoadBalancedCall> lb_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  // TODO(roth): Now that the retry code is split out into its own call
  // object, revamp this to work in a cleaner way, since we no longer need
  // for batches to ever wait for name resolution or LB picks.
  PendingBatch pending_batches_[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;

  // Retry state.
  bool enable_retries_ : 1;
  bool retry_committed_ : 1;
  bool last_attempt_got_server_pushback_ : 1;
  int num_attempts_completed_ = 0;
  size_t bytes_buffered_for_retry_ = 0;
  grpc_timer retry_timer_;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches_ = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
  grpc_metadata_batch send_initial_metadata_;
  uint32_t send_initial_metadata_flags_;
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  absl::InlinedVector<ByteStreamCache*, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
  grpc_metadata_batch send_trailing_metadata_;
};
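
// A rough sketch of the retry lifecycle implemented by RetryingCall
// (method names refer to the declarations above; the definitions live
// further down in this file):
//   1. A batch arrives via StartTransportStreamOpBatch() and is recorded
//      in pending_batches_; send-op payloads are cached via
//      MaybeCacheSendOpsForBatch() so they can be replayed later.
//   2. The batch is started on the current LB call, with the recv
//      callbacks and on_complete intercepted for retry processing.
//   3. When an attempt fails, MaybeRetry() consults the retry policy (and
//      any server pushback metadata); if a retry is permitted, DoRetry()
//      schedules retry_timer_ using retry_backoff_.
//   4. Once retries are exhausted or the call is committed via
//      RetryCommit(), cached send-op data is freed and results are
//      returned up the stack.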

//
// LoadBalancedCall definition
//

// This object is ref-counted, but it cannot inherit from RefCounted<>,
// because it is allocated on the arena and can't free its memory when
// its refcount goes to zero. So instead, it manually implements the
// same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
class LoadBalancedCall {
 public:
  static RefCountedPtr<LoadBalancedCall> Create(
      ChannelData* chand, const grpc_call_element_args& args,
      grpc_polling_entity* pollent, size_t parent_data_size);

  LoadBalancedCall(ChannelData* chand, const grpc_call_element_args& args,
                   grpc_polling_entity* pollent);
  ~LoadBalancedCall();

  // Interface of RefCounted<>.
  RefCountedPtr<LoadBalancedCall> Ref() GRPC_MUST_USE_RESULT;
  RefCountedPtr<LoadBalancedCall> Ref(const DebugLocation& location,
                                      const char* reason) GRPC_MUST_USE_RESULT;
  // When refcount drops to 0, destroys itself and the associated call stack,
  // but does NOT free the memory because it's in the call arena.
  void Unref();
  void Unref(const DebugLocation& location, const char* reason);

  void* GetParentData();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  // Invoked by channel for queued LB picks when the picker is updated.
  static void PickSubchannel(void* arg, grpc_error* error);
  // Helper function for performing an LB pick while holding the data plane
  // mutex. Returns true if the pick is complete, in which case the caller
  // must invoke PickDone() or AsyncPickDone() with the returned error.
  bool PickSubchannelLocked(grpc_error** error);
  // Schedules a callback to process the completed pick. The callback
  // will not run until after this method returns.
  void AsyncPickDone(grpc_error* error);

  RefCountedPtr<SubchannelCall> subchannel_call() const {
    return subchannel_call_;
  }

 private:
  // Allow RefCountedPtr<> to access IncrementRefCount().
  template <typename T>
  friend class ::grpc_core::RefCountedPtr;

  class LbQueuedCallCanceller;
  class Metadata;
  class LbCallState;

  // Interface of RefCounted<>.
  void IncrementRefCount();
  void IncrementRefCount(const DebugLocation& location, const char* reason);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility
  // for yielding the call combiner.
  void PendingBatchesFail(
      grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on subchannel_call_.
  void PendingBatchesResume();

  static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
      void* arg, grpc_error* error);
  void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      grpc_transport_stream_op_batch* batch);

  void CreateSubchannelCall();
  // Invoked when a pick is completed, on either success or failure.
  static void PickDone(void* arg, grpc_error* error);
  // Removes the call from the channel's list of queued picks if present.
  void MaybeRemoveCallFromLbQueuedCallsLocked();
  // Adds the call to the channel's list of queued picks if not already
  // present.
  void MaybeAddCallToLbQueuedCallsLocked();

  RefCount refs_;

  ChannelData* chand_;

  // TODO(roth): Instead of duplicating these fields in every filter
  // that uses any one of them, we should store them in the call
  // context. This will save per-call memory overhead.
  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;

  grpc_polling_entity* pollent_ = nullptr;

  grpc_closure pick_closure_;

  // Accessed while holding ChannelData::data_plane_mu_.
  ChannelData::LbQueuedCall queued_call_;
  bool queued_pending_lb_pick_ = false;
  const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  std::function<void(grpc_error*, LoadBalancingPolicy::MetadataInterface*,
                     LoadBalancingPolicy::CallState*)>
      lb_recv_trailing_metadata_ready_;
  LbQueuedCallCanceller* lb_call_canceller_ = nullptr;

  RefCountedPtr<SubchannelCall> subchannel_call_;

  // For intercepting recv_trailing_metadata_ready for the LB policy.
  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;
  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
};
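
// A sketch of the LB pick flow (method names refer to the class above):
// StartTransportStreamOpBatch() triggers PickSubchannelLocked() under
// ChannelData::data_plane_mu_. If the picker has no result yet, the call
// is parked via MaybeAddCallToLbQueuedCallsLocked() and re-attempted by
// PickSubchannel() whenever the picker is updated. Once a pick completes,
// PickDone() runs, CreateSubchannelCall() builds the subchannel call, and
// PendingBatchesResume() replays the buffered batches onto it.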

//
// dynamic termination filter
//

// Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL_DATA.
void* ChannelDataArgCopy(void* p) { return p; }
void ChannelDataArgDestroy(void* /*p*/) {}
int ChannelDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
const grpc_arg_pointer_vtable kChannelDataArgPointerVtable = {
    ChannelDataArgCopy, ChannelDataArgDestroy, ChannelDataArgCmp};
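
// Illustrative usage (a sketch; the actual arg is created where the dynamic
// filter stack is built): since the ChannelData outlives the dynamic
// filters, this vtable can pass the raw pointer through without
// ref-counting:
//   grpc_arg arg = grpc_channel_arg_pointer_create(
//       const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL_DATA), this,
//       &kChannelDataArgPointerVtable);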

// Channel arg pointer vtable for GRPC_ARG_RETRY_THROTTLE_DATA.
void* RetryThrottleDataArgCopy(void* p) {
  auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
  retry_throttle_data->Ref().release();
  return p;
}
void RetryThrottleDataArgDestroy(void* p) {
  auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
  retry_throttle_data->Unref();
}
int RetryThrottleDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
const grpc_arg_pointer_vtable kRetryThrottleDataArgPointerVtable = {
    RetryThrottleDataArgCopy, RetryThrottleDataArgDestroy,
    RetryThrottleDataArgCmp};
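
// Unlike the ChannelData vtable above, this one must ref-count: copying the
// arg takes a ref on the ServerRetryThrottleData (Ref().release()) and
// destroying it drops that ref, because the throttle data is shared across
// channel-args copies with independent lifetimes.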

class DynamicTerminationFilterChannelData {
 public:
  static grpc_error* Init(grpc_channel_element* elem,
                          grpc_channel_element_args* args);

  static void Destroy(grpc_channel_element* elem) {
    auto* chand =
        static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
    chand->~DynamicTerminationFilterChannelData();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

  ChannelData* chand() const { return chand_; }
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
    return retry_throttle_data_;
  }

 private:
  static RefCountedPtr<ServerRetryThrottleData> GetRetryThrottleDataFromArgs(
      const grpc_channel_args* args) {
    auto* retry_throttle_data =
        grpc_channel_args_find_pointer<ServerRetryThrottleData>(
            args, GRPC_ARG_RETRY_THROTTLE_DATA);
    if (retry_throttle_data == nullptr) return nullptr;
    return retry_throttle_data->Ref();
  }

  explicit DynamicTerminationFilterChannelData(const grpc_channel_args* args)
      : chand_(grpc_channel_args_find_pointer<ChannelData>(
            args, GRPC_ARG_CLIENT_CHANNEL_DATA)),
        retry_throttle_data_(GetRetryThrottleDataFromArgs(args)) {}

  ChannelData* chand_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
};

class DynamicTerminationFilterCallData {
 public:
  static grpc_error* Init(grpc_call_element* elem,
                          const grpc_call_element_args* args) {
    new (elem->call_data) DynamicTerminationFilterCallData(*args);
    return GRPC_ERROR_NONE;
  }

  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* /*final_info*/,
                      grpc_closure* then_schedule_closure) {
    auto* calld =
        static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
    auto* chand =
        static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
    RefCountedPtr<SubchannelCall> subchannel_call;
    if (chand->chand()->enable_retries()) {
      if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
        subchannel_call = calld->retrying_call_->subchannel_call();
        calld->retrying_call_->~RetryingCall();
      }
    } else {
      if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
        subchannel_call = calld->lb_call_->subchannel_call();
      }
    }
    calld->~DynamicTerminationFilterCallData();
    if (GPR_LIKELY(subchannel_call != nullptr)) {
      subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
    } else {
      // TODO(yashkt): This can potentially be a Closure::Run.
      ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
    }
  }

  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
    auto* calld =
        static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
    auto* chand =
        static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
    if (chand->chand()->enable_retries()) {
      calld->retrying_call_->StartTransportStreamOpBatch(batch);
    } else {
      calld->lb_call_->StartTransportStreamOpBatch(batch);
    }
  }

  static void SetPollent(grpc_call_element* elem,
                         grpc_polling_entity* pollent) {
    auto* calld =
        static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
    auto* chand =
        static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
    ChannelData* client_channel = chand->chand();
    grpc_call_element_args args = {
        calld->owning_call_,     nullptr,
        calld->call_context_,    calld->path_,
        calld->call_start_time_, calld->deadline_,
        calld->arena_,           calld->call_combiner_};
    if (client_channel->enable_retries()) {
      // Get retry settings from service config.
      auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
          calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
      GPR_ASSERT(svc_cfg_call_data != nullptr);
      auto* method_config = static_cast<const ClientChannelMethodParsedConfig*>(
          svc_cfg_call_data->GetMethodParsedConfig(
              ClientChannelServiceConfigParser::ParserIndex()));
      // Create retrying call.
      calld->retrying_call_ = calld->arena_->New<RetryingCall>(
          client_channel, args, pollent, chand->retry_throttle_data(),
          method_config == nullptr ? nullptr : method_config->retry_policy());
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(
            GPR_INFO,
            "chand=%p dynamic_termination_calld=%p: create retrying_call=%p",
            client_channel, calld, calld->retrying_call_);
      }
    } else {
      calld->lb_call_ =
          LoadBalancedCall::Create(client_channel, args, pollent, 0);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p dynamic_termination_calld=%p: create lb_call=%p",
                client_channel, calld, calld->lb_call_.get());
      }
    }
  }

 private:
  explicit DynamicTerminationFilterCallData(const grpc_call_element_args& args)
      : path_(grpc_slice_ref_internal(args.path)),
        call_start_time_(args.start_time),
        deadline_(args.deadline),
        arena_(args.arena),
        owning_call_(args.call_stack),
        call_combiner_(args.call_combiner),
        call_context_(args.context) {}

  ~DynamicTerminationFilterCallData() { grpc_slice_unref_internal(path_); }

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  RetryingCall* retrying_call_ = nullptr;
  RefCountedPtr<LoadBalancedCall> lb_call_;
};

const grpc_channel_filter kDynamicTerminationFilterVtable = {
    DynamicTerminationFilterCallData::StartTransportStreamOpBatch,
    DynamicTerminationFilterChannelData::StartTransportOp,
    sizeof(DynamicTerminationFilterCallData),
    DynamicTerminationFilterCallData::Init,
    DynamicTerminationFilterCallData::SetPollent,
    DynamicTerminationFilterCallData::Destroy,
    sizeof(DynamicTerminationFilterChannelData),
    DynamicTerminationFilterChannelData::Init,
    DynamicTerminationFilterChannelData::Destroy,
    DynamicTerminationFilterChannelData::GetChannelInfo,
    "dynamic_filter_termination",
};
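
// The field order above follows grpc_channel_filter (a brief map, for
// orientation): start_transport_stream_op_batch, start_transport_op,
// sizeof_call_data, init_call_elem, set_pollset_or_pollset_set,
// destroy_call_elem, sizeof_channel_data, init_channel_elem,
// destroy_channel_elem, get_channel_info, name.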
1237
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1238 grpc_error* DynamicTerminationFilterChannelData::Init(
1239 grpc_channel_element* elem, grpc_channel_element_args* args) {
1240 GPR_ASSERT(args->is_last);
1241 GPR_ASSERT(elem->filter == &kDynamicTerminationFilterVtable);
1242 new (elem->channel_data)
1243 DynamicTerminationFilterChannelData(args->channel_args);
1244 return GRPC_ERROR_NONE;
1245 }
1246
1247 //
1248 // ChannelData::SubchannelWrapper
1249 //
1250
1251 // This class is a wrapper for Subchannel that hides details of the
1252 // channel's implementation (such as the health check service name and
1253 // connected subchannel) from the LB policy API.
1254 //
1255 // Note that no synchronization is needed here, because even if the
1256 // underlying subchannel is shared between channels, this wrapper will only
1257 // be used within one channel, so it will always be synchronized by the
1258 // control plane work_serializer.
class ChannelData::SubchannelWrapper : public SubchannelInterface {
 public:
  SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
                    absl::optional<std::string> health_check_service_name)
      : SubchannelInterface(
            GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
                ? "SubchannelWrapper"
                : nullptr),
        chand_(chand),
        subchannel_(subchannel),
        health_check_service_name_(std::move(health_check_service_name)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: creating subchannel wrapper %p for subchannel %p",
              chand, this, subchannel_);
    }
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      if (it == chand_->subchannel_refcount_map_.end()) {
        chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
        it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
      }
      ++it->second;
    }
    chand_->subchannel_wrappers_.insert(this);
  }

  ~SubchannelWrapper() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: destroying subchannel wrapper %p for subchannel %p",
              chand_, this, subchannel_);
    }
    chand_->subchannel_wrappers_.erase(this);
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
      --it->second;
      if (it->second == 0) {
        chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid());
        chand_->subchannel_refcount_map_.erase(it);
      }
    }
    GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
  }

  grpc_connectivity_state CheckConnectivityState() override {
    RefCountedPtr<ConnectedSubchannel> connected_subchannel;
    grpc_connectivity_state connectivity_state =
        subchannel_->CheckConnectivityState(health_check_service_name_,
                                            &connected_subchannel);
    MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
    return connectivity_state;
  }

  void WatchConnectivityState(
      grpc_connectivity_state initial_state,
      std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override {
    auto& watcher_wrapper = watcher_map_[watcher.get()];
    GPR_ASSERT(watcher_wrapper == nullptr);
    watcher_wrapper = new WatcherWrapper(std::move(watcher),
                                         Ref(DEBUG_LOCATION, "WatcherWrapper"),
                                         initial_state);
    subchannel_->WatchConnectivityState(
        initial_state, health_check_service_name_,
        RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
            watcher_wrapper));
  }

  void CancelConnectivityStateWatch(
      ConnectivityStateWatcherInterface* watcher) override {
    auto it = watcher_map_.find(watcher);
    GPR_ASSERT(it != watcher_map_.end());
    subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
                                              it->second);
    watcher_map_.erase(it);
  }

  void AttemptToConnect() override { subchannel_->AttemptToConnect(); }

  void ResetBackoff() override { subchannel_->ResetBackoff(); }

  const grpc_channel_args* channel_args() override {
    return subchannel_->channel_args();
  }

  void ThrottleKeepaliveTime(int new_keepalive_time) {
    subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
  }

  void UpdateHealthCheckServiceName(
      absl::optional<std::string> health_check_service_name) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: subchannel wrapper %p: updating health check service "
              "name from \"%s\" to \"%s\"",
              chand_, this, health_check_service_name_->c_str(),
              health_check_service_name->c_str());
    }
    for (auto& p : watcher_map_) {
      WatcherWrapper*& watcher_wrapper = p.second;
      // Cancel the current watcher and create a new one using the new
      // health check service name.
      // TODO(roth): If there is not already an existing health watch
      // call for the new name, then the watcher will initially report
      // state CONNECTING. If the LB policy is currently reporting
      // state READY, this may cause it to switch to CONNECTING before
      // switching back to READY. This could cause a small delay for
      // RPCs being started on the channel. If/when this becomes a
      // problem, we may be able to handle it by waiting for the new
      // watcher to report READY before we use it to replace the old one.
      WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
      subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
                                                watcher_wrapper);
      watcher_wrapper = replacement;
      subchannel_->WatchConnectivityState(
          replacement->last_seen_state(), health_check_service_name,
          RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
              replacement));
    }
    // Save the new health check service name.
    health_check_service_name_ = std::move(health_check_service_name);
  }

  // Caller must be holding the control-plane work_serializer.
  ConnectedSubchannel* connected_subchannel() const {
    return connected_subchannel_.get();
  }

  // Caller must be holding the data-plane mutex.
  ConnectedSubchannel* connected_subchannel_in_data_plane() const {
    return connected_subchannel_in_data_plane_.get();
  }
  void set_connected_subchannel_in_data_plane(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
    connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
  }

 private:
  // Subchannel and SubchannelInterface have different interfaces for
  // their respective ConnectivityStateWatcherInterface classes.
  // The one in Subchannel updates the ConnectedSubchannel along with
  // the state, whereas the one in SubchannelInterface does not expose
  // the ConnectedSubchannel.
  //
  // This wrapper provides a bridge between the two. It implements
  // Subchannel::ConnectivityStateWatcherInterface and wraps
  // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
  // that was passed in by the LB policy. We pass an instance of this
  // class to the underlying Subchannel, and when we get updates from
  // the subchannel, we pass those on to the wrapped watcher to return
  // the update to the LB policy. This allows us to set the connected
  // subchannel before passing the result back to the LB policy.
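  //
  // Sketch of the update path (descriptive of the code below): the
  // Subchannel invokes OnConnectivityStateChange() on this wrapper, which
  // hops into the work_serializer, records the new ConnectedSubchannel on
  // the parent SubchannelWrapper, and only then forwards the new state to
  // the LB policy's watcher via watcher_->OnConnectivityStateChange().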
  class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
   public:
    WatcherWrapper(
        std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
            watcher,
        RefCountedPtr<SubchannelWrapper> parent,
        grpc_connectivity_state initial_state)
        : watcher_(std::move(watcher)),
          parent_(std::move(parent)),
          last_seen_state_(initial_state) {}

    ~WatcherWrapper() override {
      auto* parent = parent_.release();  // ref owned by lambda
      parent->chand_->work_serializer_->Run(
          [parent]() { parent->Unref(DEBUG_LOCATION, "WatcherWrapper"); },
          DEBUG_LOCATION);
    }

    void OnConnectivityStateChange() override {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: connectivity change for subchannel wrapper %p "
                "subchannel %p; hopping into work_serializer",
                parent_->chand_, parent_.get(), parent_->subchannel_);
      }
      Ref().release();  // ref owned by lambda
      parent_->chand_->work_serializer_->Run(
          [this]() {
            ApplyUpdateInControlPlaneWorkSerializer();
            Unref();
          },
          DEBUG_LOCATION);
    }

    grpc_pollset_set* interested_parties() override {
      SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
          watcher_.get();
      if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
      return watcher->interested_parties();
    }

    WatcherWrapper* MakeReplacement() {
      auto* replacement =
          new WatcherWrapper(std::move(watcher_), parent_, last_seen_state_);
      replacement_ = replacement;
      return replacement;
    }

    grpc_connectivity_state last_seen_state() const { return last_seen_state_; }

   private:
    void ApplyUpdateInControlPlaneWorkSerializer() {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: processing connectivity change in work serializer "
                "for subchannel wrapper %p subchannel %p "
                "watcher=%p",
                parent_->chand_, parent_.get(), parent_->subchannel_,
                watcher_.get());
      }
      ConnectivityStateChange state_change = PopConnectivityStateChange();
      absl::optional<absl::Cord> keepalive_throttling =
          state_change.status.GetPayload(kKeepaliveThrottlingKey);
      if (keepalive_throttling.has_value()) {
        int new_keepalive_time = -1;
        if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
                             &new_keepalive_time)) {
          if (new_keepalive_time > parent_->chand_->keepalive_time_) {
            parent_->chand_->keepalive_time_ = new_keepalive_time;
            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
              gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
                      parent_->chand_, parent_->chand_->keepalive_time_);
            }
            // Propagate the new keepalive time to all subchannels, so that
            // new transports created by any subchannel (and not just the
            // subchannel that received the GOAWAY) use the new keepalive time.
            for (auto* subchannel_wrapper :
                 parent_->chand_->subchannel_wrappers_) {
              subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
            }
          }
        } else {
          gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
                  parent_->chand_,
                  std::string(keepalive_throttling.value()).c_str());
        }
      }
      // Ignore update if the parent WatcherWrapper has been replaced
      // since this callback was scheduled.
      if (watcher_ != nullptr) {
        last_seen_state_ = state_change.state;
        parent_->MaybeUpdateConnectedSubchannel(
            std::move(state_change.connected_subchannel));
        watcher_->OnConnectivityStateChange(state_change.state);
      }
    }

    std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
        watcher_;
    RefCountedPtr<SubchannelWrapper> parent_;
    grpc_connectivity_state last_seen_state_;
    WatcherWrapper* replacement_ = nullptr;
  };

  void MaybeUpdateConnectedSubchannel(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
    // Update the connected subchannel only if the channel is not shutting
    // down. This is because once the channel is shutting down, we
    // ignore picker updates from the LB policy, which means that
    // UpdateStateAndPickerLocked() will never process the entries
    // in chand_->pending_subchannel_updates_. So we don't want to add
    // entries there that will never be processed, since that would
    // leave dangling refs to the channel and prevent its destruction.
    grpc_error* disconnect_error = chand_->disconnect_error();
    if (disconnect_error != GRPC_ERROR_NONE) return;
    // Not shutting down, so do the update.
    if (connected_subchannel_ != connected_subchannel) {
      connected_subchannel_ = std::move(connected_subchannel);
      // Record the new connected subchannel so that it can be updated
      // in the data plane mutex the next time the picker is updated.
      chand_->pending_subchannel_updates_[Ref(
          DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
    }
  }

  ChannelData* chand_;
  Subchannel* subchannel_;
  absl::optional<std::string> health_check_service_name_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WatcherWrapper that we passed to the underlying
  // subchannel. This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WatcherWrapper to cancel on the underlying subchannel.
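  // For example (hypothetical names): after
  //   wrapper->WatchConnectivityState(state, std::move(watcher));
  // watcher_map_[raw_watcher] holds the WatcherWrapper handed to
  // subchannel_, so a later CancelConnectivityStateWatch(raw_watcher)
  // can look up and cancel exactly that wrapper.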
  std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
  // To be accessed only in the control plane work_serializer.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  // To be accessed only in the data plane mutex.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
};

//
// ChannelData::ExternalConnectivityWatcher
//

ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ChannelData* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object into the lambda that runs
  // AddWatcherLocked() in the work_serializer.
  chand_->work_serializer_->Run(
      [this]() {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}

ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}

void ChannelData::ExternalConnectivityWatcher::
    RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
                                         grpc_closure* on_complete,
                                         bool cancel) {
  RefCountedPtr<ExternalConnectivityWatcher> watcher;
  {
    MutexLock lock(&chand->external_watchers_mu_);
    auto it = chand->external_watchers_.find(on_complete);
    if (it != chand->external_watchers_.end()) {
      watcher = std::move(it->second);
      chand->external_watchers_.erase(it);
    }
  }
  // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
  // the mutex before calling it.
  if (watcher != nullptr && cancel) watcher->Cancel();
}

void ChannelData::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  bool done = false;
  if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
                                   MemoryOrder::RELAXED)) {
    return;  // Already done.
  }
  // Remove external watcher.
  chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
                                  DEBUG_LOCATION);
  }
}

void ChannelData::ExternalConnectivityWatcher::Cancel() {
  bool done = false;
  if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
                                   MemoryOrder::RELAXED)) {
    return;  // Already done.
  }
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
  // Hop back into the work_serializer to clean up.
  chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
                                DEBUG_LOCATION);
}

void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() {
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
  // Add new watcher. Pass the ref of the object from creation to
  // OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}

void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}

//
// ChannelData::ConnectivityWatcherAdder
//

class ChannelData::ConnectivityWatcherAdder {
 public:
  ConnectivityWatcherAdder(
      ChannelData* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run([this]() { AddWatcherLocked(); },
                                  DEBUG_LOCATION);
  }

 private:
  void AddWatcherLocked() {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ChannelData* chand_;
  grpc_connectivity_state initial_state_;
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};

//
// ChannelData::ConnectivityWatcherRemover
//

class ChannelData::ConnectivityWatcherRemover {
 public:
  ConnectivityWatcherRemover(ChannelData* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
                                  DEBUG_LOCATION);
  }

 private:
  void RemoveWatcherLocked() {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ChannelData* chand_;
  AsyncConnectivityStateWatcherInterface* watcher_;
};

//
// ChannelData::ClientChannelControlHelper
//

class ChannelData::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      ServerAddress address, const grpc_channel_args& args) override {
    if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
    // Determine health check service name.
    bool inhibit_health_checking = grpc_channel_arg_get_bool(
        grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
    absl::optional<std::string> health_check_service_name;
    if (!inhibit_health_checking) {
      health_check_service_name = chand_->health_check_service_name_;
    }
    // Remove channel args that should not affect subchannel uniqueness.
    static const char* args_to_remove[] = {
        GRPC_ARG_INHIBIT_HEALTH_CHECKING,
        GRPC_ARG_CHANNELZ_CHANNEL_NODE,
    };
    // Add channel args needed for the subchannel.
    absl::InlinedVector<grpc_arg, 3> args_to_add = {
        Subchannel::CreateSubchannelAddressArg(&address.address()),
        SubchannelPoolInterface::CreateChannelArg(
            chand_->subchannel_pool_.get()),
    };
    if (address.args() != nullptr) {
      for (size_t j = 0; j < address.args()->num_args; ++j) {
        args_to_add.emplace_back(address.args()->args[j]);
      }
    }
    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
        &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove),
        args_to_add.data(), args_to_add.size());
    gpr_free(args_to_add[0].value.string);
    // Create subchannel.
    Subchannel* subchannel =
        chand_->client_channel_factory_->CreateSubchannel(new_args);
    grpc_channel_args_destroy(new_args);
    if (subchannel == nullptr) return nullptr;
    // Make sure the subchannel has updated keepalive time.
    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
    // Create and return wrapper for the subchannel.
    return MakeRefCounted<SubchannelWrapper>(
        chand_, subchannel, std::move(health_check_service_name));
  }

  void UpdateState(
      grpc_connectivity_state state, const absl::Status& status,
      std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    grpc_error* disconnect_error = chand_->disconnect_error();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      const char* extra = disconnect_error == GRPC_ERROR_NONE
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
              chand_, ConnectivityStateName(state), status.ToString().c_str(),
              picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (disconnect_error == GRPC_ERROR_NONE) {
      chand_->UpdateStateAndPickerLocked(state, status, "helper",
                                         std::move(picker));
    }
  }

  void RequestReresolution() override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
    }
    chand_->resolver_->RequestReresolutionLocked();
  }

  void AddTraceEvent(TraceSeverity severity,
                     absl::string_view message) override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ChannelData* chand_;
};

//
// ChannelData implementation
//

grpc_error* ChannelData::Init(grpc_channel_element* elem,
                              grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  grpc_error* error = GRPC_ERROR_NONE;
  new (elem->channel_data) ChannelData(args, &error);
  return error;
}

void ChannelData::Destroy(grpc_channel_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->~ChannelData();
}

bool GetEnableRetries(const grpc_channel_args* args) {
  return grpc_channel_arg_get_bool(
      grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
}

size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
  return static_cast<size_t>(grpc_channel_arg_get_integer(
      grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
      {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
}
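
// Illustrative (hypothetical, not part of this file): an application
// controls the two knobs above via channel args at channel-creation
// time, e.g.:
//   grpc_arg retry_args[] = {
//       grpc_channel_arg_integer_create(
//           const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 1),
//       grpc_channel_arg_integer_create(
//           const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE), 1 << 20),
//   };
//   grpc_channel_args retry_channel_args = {GPR_ARRAY_SIZE(retry_args),
//                                           retry_args};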

RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
    const grpc_channel_args* args) {
  const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
      grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
  if (use_local_subchannel_pool) {
    return MakeRefCounted<LocalSubchannelPool>();
  }
  return GlobalSubchannelPool::instance();
}

channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
  const grpc_arg* arg =
      grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
    return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
  }
  return nullptr;
}

ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
    : deadline_checking_enabled_(
          grpc_deadline_checking_enabled(args->channel_args)),
      enable_retries_(GetEnableRetries(args->channel_args)),
      per_rpc_retry_buffer_size_(
          GetMaxPerRpcRetryBufferSize(args->channel_args)),
      owning_stack_(args->channel_stack),
      client_channel_factory_(
          ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
      channelz_node_(GetChannelzNode(args->channel_args)),
      work_serializer_(std::make_shared<WorkSerializer>()),
      interested_parties_(grpc_pollset_set_create()),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(args->channel_args)),
      disconnect_error_(GRPC_ERROR_NONE) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Initialize data members.
  gpr_mu_init(&info_mu_);
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get server name to resolve, using proxy mapper if needed.
  const char* server_uri = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
  if (server_uri == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  // Get default service config. If none is specified via the client API,
  // we use an empty config.
  const char* service_config_json = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
  if (service_config_json == nullptr) service_config_json = "{}";
  *error = GRPC_ERROR_NONE;
  default_service_config_ =
      ServiceConfig::Create(args->channel_args, service_config_json, error);
  if (*error != GRPC_ERROR_NONE) {
    default_service_config_.reset();
    return;
  }
  absl::StatusOr<URI> uri = URI::Parse(server_uri);
  if (uri.ok() && !uri->path().empty()) {
    server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  ProxyMapperRegistry::MapName(server_uri, args->channel_args, &proxy_name,
                               &new_args);
  target_uri_.reset(proxy_name != nullptr ? proxy_name
                                          : gpr_strdup(server_uri));
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  const char* arg_to_remove = GRPC_ARG_SERVICE_CONFIG;
  channel_args_ = grpc_channel_args_copy_and_remove(
      new_args != nullptr ? new_args : args->channel_args, &arg_to_remove, 1);
  grpc_channel_args_destroy(new_args);
  keepalive_time_ = grpc_channel_args_find_integer(
      channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS,
      {-1 /* default value, unset */, 1, INT_MAX});
  if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
    std::string error_message =
        absl::StrCat("the target uri is not valid: ", target_uri_.get());
    *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
    return;
  }
  *error = GRPC_ERROR_NONE;
}

ChannelData::~ChannelData() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  DestroyResolverAndLbPolicyLocked();
  grpc_channel_args_destroy(channel_args_);
  GRPC_ERROR_UNREF(resolver_transient_failure_error_);
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
  GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
  gpr_mu_destroy(&info_mu_);
}

RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
  // Prefer the LB policy config found in the service config.
  if (parsed_service_config->parsed_lb_config() != nullptr) {
    return parsed_service_config->parsed_lb_config();
  }
  // Try the deprecated LB policy name from the service config.
  // If not, try the setting from channel args.
  const char* policy_name = nullptr;
  if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
    policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
  } else {
    const grpc_arg* channel_arg =
        grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
    policy_name = grpc_channel_arg_get_string(channel_arg);
  }
  // Use pick_first if nothing was specified.
  if (policy_name == nullptr) policy_name = "pick_first";
  // Now that we have the policy name, construct an empty config for it.
  Json config_json = Json::Array{Json::Object{
      {policy_name, Json::Object{}},
  }};
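  // For example, with policy_name == "pick_first" this yields the JSON
  // config [{"pick_first":{}}].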
  grpc_error* parse_error = GRPC_ERROR_NONE;
  auto lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
      config_json, &parse_error);
  // The policy name came from one of three places:
  // - The deprecated loadBalancingPolicy field in the service config,
  //   in which case the code in ClientChannelServiceConfigParser
  //   already verified that the policy does not require a config.
  // - One of the hard-coded values here, all of which are known to not
  //   require a config.
  // - A channel arg, in which case the application did something that
  //   is a misuse of our API.
  // In the first two cases, these assertions will always be true. In
  // the last case, this is probably fine for now.
  // TODO(roth): If the last case becomes a problem, add better error
  // handling here.
  GPR_ASSERT(lb_policy_config != nullptr);
  GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
  return lb_policy_config;
}

void ChannelData::OnResolverResultChangedLocked(Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
  }
  // We only want to trace the address resolution in the following cases:
  // (a) Address resolution resulted in a service config change.
  // (b) Address resolution caused the number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution caused the number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution caused a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  absl::InlinedVector<const char*, 3> trace_strings;
  if (result.addresses.empty() && previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (!result.addresses.empty() &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = !result.addresses.empty();
  // The result of grpc_error_string() is owned by the error itself.
  // We're storing that string in trace_strings, so we need to make sure
  // that the error lives until we're done with the string.
  grpc_error* service_config_error =
      GRPC_ERROR_REF(result.service_config_error);
  if (service_config_error != GRPC_ERROR_NONE) {
    trace_strings.push_back(grpc_error_string(service_config_error));
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (service_config_error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
              this, grpc_error_string(service_config_error));
    }
    // If the service config was invalid, then fall back to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                this);
      }
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received an invalid service config and we don't have a
      // previous service config to fall back to. Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(GRPC_ERROR_REF(service_config_error));
      trace_strings.push_back("no valid service config");
    }
  } else if (result.service_config == nullptr) {
    // Resolver did not return any service config.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned no service config. Using default "
              "service config for channel.",
              this);
    }
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = result.service_config;
    config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
  }
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                internal::ClientChannelServiceConfigParser::ParserIndex()));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          parsed_service_config, lb_policy_config->name());
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
    }
    // Create or update LB policy, as needed.
    CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
                                 std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked();
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
  GRPC_ERROR_UNREF(service_config_error);
}

void ChannelData::OnResolverErrorLocked(grpc_error* error) {
  if (resolver_ == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
            grpc_error_string(error));
  }
  // If we already have an LB policy from a previous resolution
  // result, then we continue to let it set the connectivity state.
  // Otherwise, we go into TRANSIENT_FAILURE.
  if (lb_policy_ == nullptr) {
    grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "Resolver transient failure", &error, 1);
    {
      MutexLock lock(&resolution_mu_);
      // Update resolver transient failure.
      GRPC_ERROR_UNREF(resolver_transient_failure_error_);
      resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error);
      // Process calls that were queued waiting for the resolver result.
      for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
           call = call->next) {
        grpc_call_element* elem = call->elem;
        CallData* calld = static_cast<CallData*>(elem->call_data);
        grpc_error* error = GRPC_ERROR_NONE;
        if (calld->CheckResolutionLocked(elem, &error)) {
          calld->AsyncResolutionDone(elem, error);
        }
      }
    }
    // Update connectivity state.
    UpdateStateAndPickerLocked(
        GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
        "resolver failure",
        absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
            state_error));
  }
  GRPC_ERROR_UNREF(error);
}

void ChannelData::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    Resolver::Result result) {
  // Construct update.
  LoadBalancingPolicy::UpdateArgs update_args;
  update_args.addresses = std::move(result.addresses);
  update_args.config = std::move(lb_policy_config);
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in
  // the WorkSerializer.
  const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
  update_args.args =
      grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
  // Create policy if needed.
  if (lb_policy_ == nullptr) {
    lb_policy_ = CreateLbPolicyLocked(*update_args.args);
  }
  // Update the policy.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
            lb_policy_.get());
  }
  lb_policy_->UpdateLocked(std::move(update_args));
}

// Creates a new LB policy.
OrphanablePtr<LoadBalancingPolicy> ChannelData::CreateLbPolicyLocked(
    const grpc_channel_args& args) {
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      absl::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = &args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &grpc_client_channel_routing_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
            lb_policy.get());
  }
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}

void ChannelData::AddResolverQueuedCall(ResolverQueuedCall* call,
                                        grpc_polling_entity* pollent) {
  // Add call to queued calls list.
  call->next = resolver_queued_calls_;
  resolver_queued_calls_ = call;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}

void ChannelData::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
                                           grpc_polling_entity* pollent) {
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
  // Remove from queued calls list.
  for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
       call = &(*call)->next) {
    if (*call == to_remove) {
      *call = to_remove->next;
      return;
    }
  }
}
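
// Note on the two functions above: the queue is an intrusive singly-linked
// list threaded through ResolverQueuedCall::next, giving O(1) insertion at
// the head and O(n) removal via the pointer-to-pointer walk, with no
// per-node allocation.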

void ChannelData::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
    const char* lb_policy_name) {
  UniquePtr<char> service_config_json(
      gpr_strdup(service_config->json_string().c_str()));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p: resolver returned updated service config: \"%s\"", this,
            service_config_json.get());
  }
  // Save service config.
  saved_service_config_ = std::move(service_config);
  // Update health check service name if needed.
  if (health_check_service_name_ !=
      parsed_service_config->health_check_service_name()) {
    health_check_service_name_ =
        parsed_service_config->health_check_service_name();
    // Update health check service name used by existing subchannel wrappers.
    for (auto* subchannel_wrapper : subchannel_wrappers_) {
      subchannel_wrapper->UpdateHealthCheckServiceName(
          health_check_service_name_);
    }
  }
  // Swap out the data used by GetChannelInfo().
  UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
  {
    MutexLock lock(&info_mu_);
    info_lb_policy_name_ = std::move(lb_policy_name_owned);
    info_service_config_json_ = std::move(service_config_json);
  }
  // Save config selector.
  saved_config_selector_ = std::move(config_selector);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
            saved_config_selector_.get());
  }
}

void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
  // Grab ref to service config.
  RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
  // Grab ref to config selector. Use default if resolver didn't supply one.
  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
            saved_config_selector_.get());
  }
  if (config_selector == nullptr) {
    config_selector =
        MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
  }
  // Get retry throttle data from service config.
  const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
      static_cast<const internal::ClientChannelGlobalParsedConfig*>(
          saved_service_config_->GetGlobalParsedConfig(
              internal::ClientChannelServiceConfigParser::ParserIndex()));
  absl::optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
      retry_throttle_config = parsed_service_config->retry_throttling();
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  if (retry_throttle_config.has_value()) {
    retry_throttle_data = internal::ServerRetryThrottleMap::GetDataForServer(
        server_name_, retry_throttle_config.value().max_milli_tokens,
        retry_throttle_config.value().milli_token_ratio);
  }
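  // Per gRPC's retry design (gRFC A6), this throttle is a token bucket:
  // each failed RPC drains a token, each success refunds milli_token_ratio
  // tokens, and retries are disallowed while the bucket is at or below
  // half of max_milli_tokens.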
  // Construct per-LB filter stack.
  std::vector<const grpc_channel_filter*> filters =
      config_selector->GetFilters();
  filters.push_back(&kDynamicTerminationFilterVtable);
  absl::InlinedVector<grpc_arg, 2> args_to_add;
  args_to_add.push_back(grpc_channel_arg_pointer_create(
      const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL_DATA), this,
      &kChannelDataArgPointerVtable));
  if (retry_throttle_data != nullptr) {
    args_to_add.push_back(grpc_channel_arg_pointer_create(
        const_cast<char*>(GRPC_ARG_RETRY_THROTTLE_DATA),
        retry_throttle_data.get(), &kRetryThrottleDataArgPointerVtable));
  }
  grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
      channel_args_, args_to_add.data(), args_to_add.size());
  RefCountedPtr<DynamicFilters> dynamic_filters =
      DynamicFilters::Create(new_args, std::move(filters));
  GPR_ASSERT(dynamic_filters != nullptr);
  grpc_channel_args_destroy(new_args);
  // Grab data plane lock to update service config.
  //
  // We defer unreffing the old values (and deallocating memory) until
  // after releasing the lock to keep the critical section small.
  std::set<grpc_call_element*> calls_pending_resolver_result;
  {
    MutexLock lock(&resolution_mu_);
    GRPC_ERROR_UNREF(resolver_transient_failure_error_);
    resolver_transient_failure_error_ = GRPC_ERROR_NONE;
    // Update service config.
    received_service_config_data_ = true;
    // Old values will be unreffed after lock is released.
    service_config_.swap(service_config);
    config_selector_.swap(config_selector);
    dynamic_filters_.swap(dynamic_filters);
    // Process calls that were queued waiting for the resolver result.
    for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
         call = call->next) {
      grpc_call_element* elem = call->elem;
      CallData* calld = static_cast<CallData*>(elem->call_data);
      grpc_error* error = GRPC_ERROR_NONE;
      if (calld->CheckResolutionLocked(elem, &error)) {
        calld->AsyncResolutionDone(elem, error);
      }
    }
  }
  // Old values will be unreffed after the lock is released, when they go out
  // of scope.
}

void ChannelData::CreateResolverLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
  }
  resolver_ = ResolverRegistry::CreateResolver(
      target_uri_.get(), channel_args_, interested_parties_, work_serializer_,
      absl::make_unique<ResolverResultHandler>(this));
  // Since the validity of the args was checked when the channel was created,
  // CreateResolver() must return a non-null result.
  GPR_ASSERT(resolver_ != nullptr);
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
  resolver_->StartLocked();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
  }
}

void ChannelData::DestroyResolverAndLbPolicyLocked() {
  if (resolver_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
              resolver_.get());
    }
    resolver_.reset();
    if (lb_policy_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
                lb_policy_.get());
      }
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
}

void ChannelData::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
  // Clean the control plane when entering IDLE or shutting down.
  if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
    saved_service_config_.reset();
    saved_config_selector_.reset();
  }
  // Update connectivity state.
  state_tracker_.SetState(state, status, reason);
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
  // Grab data plane lock to do subchannel updates and update the picker.
  //
  // Note that we want to minimize the work done while holding the data
  // plane lock, to keep the critical section small. So, for all of the
  // objects that we might wind up unreffing here, we actually hold onto
  // the refs until after we release the lock, and then unref them at
  // that point. This includes the following:
  // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
  // - ref stored in service_config_
  // - ref stored in config_selector_
  // - ref stored in dynamic_filters_
  // - ownership of the existing picker in picker_
  RefCountedPtr<ServiceConfig> service_config_to_unref;
  RefCountedPtr<ConfigSelector> config_selector_to_unref;
  RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
  {
    MutexLock lock(&data_plane_mu_);
    // Handle subchannel updates.
    for (auto& p : pending_subchannel_updates_) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: updating subchannel wrapper %p data plane "
                "connected_subchannel to %p",
                this, p.first.get(), p.second.get());
      }
      // Note: We do not remove the entry from pending_subchannel_updates_
      // here, since this would unref the subchannel wrapper; instead,
      // we wait until we've released the lock to clear the map.
      p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
    }
    // Swap out the picker.
    // Note: Original value will be destroyed after the lock is released.
    picker_.swap(picker);
    // Clean the data plane if the updated picker is nullptr or the
    // channel is shutting down.
2443 if (picker_ == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
2444 received_service_config_data_ = false;
2445 // Note: We save the objects to unref until after the lock is released.
2446 service_config_to_unref = std::move(service_config_);
2447 config_selector_to_unref = std::move(config_selector_);
2448 dynamic_filters_to_unref = std::move(dynamic_filters_);
2449 }
2450 // Re-process queued picks.
2451 for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
2452 call = call->next) {
2453 grpc_error* error = GRPC_ERROR_NONE;
2454 if (call->lb_call->PickSubchannelLocked(&error)) {
2455 call->lb_call->AsyncPickDone(error);
2456 }
2457 }
2458 }
2459 // Clear the pending update map after releasing the lock, to keep the
2460 // critical section small.
2461 pending_subchannel_updates_.clear();
2462 }
2463
grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
  if (state_tracker_.state() != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
  }
  LoadBalancingPolicy::PickResult result =
      picker_->Pick(LoadBalancingPolicy::PickArgs());
  ConnectedSubchannel* connected_subchannel = nullptr;
  if (result.subchannel != nullptr) {
    SubchannelWrapper* subchannel =
        static_cast<SubchannelWrapper*>(result.subchannel.get());
    connected_subchannel = subchannel->connected_subchannel();
  }
  if (connected_subchannel != nullptr) {
    connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
  } else {
    if (result.error == GRPC_ERROR_NONE) {
      result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "LB policy dropped call on ping");
    }
  }
  return result.error;
}

void ChannelData::StartTransportOpLocked(grpc_transport_op* op) {
  // Connectivity watch.
  if (op->start_connectivity_watch != nullptr) {
    state_tracker_.AddWatcher(op->start_connectivity_watch_state,
                              std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error* error = DoPingLocked(op);
    if (error != GRPC_ERROR_NONE) {
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
                   GRPC_ERROR_REF(error));
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
              grpc_error_string(op->disconnect_with_error));
    }
    DestroyResolverAndLbPolicyLocked();
    intptr_t value;
    if (grpc_error_get_int(op->disconnect_with_error,
                           GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (disconnect_error() == GRPC_ERROR_NONE) {
        // Enter IDLE state.
        UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
                                   "channel entering IDLE", nullptr);
      }
      GRPC_ERROR_UNREF(op->disconnect_with_error);
    } else {
      // Disconnect.
      GPR_ASSERT(disconnect_error_.Load(MemoryOrder::RELAXED) ==
                 GRPC_ERROR_NONE);
      disconnect_error_.Store(op->disconnect_with_error, MemoryOrder::RELEASE);
      UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
          absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
              GRPC_ERROR_REF(op->disconnect_with_error)));
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
  ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}

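// Entry point for transport ops coming from the surface. Only
// bind_pollset is handled here; everything else must run under the
// control-plane work_serializer, so we take a ref on the channel stack
// and hop into it. The matching unref happens at the end of
// StartTransportOpLocked() above.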
void ChannelData::StartTransportOp(grpc_channel_element* elem,
                                   grpc_transport_op* op) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Hop into the control-plane work_serializer for the remaining ops.
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  chand->work_serializer_->Run(
      [chand, op]() { chand->StartTransportOpLocked(op); }, DEBUG_LOCATION);
}

void ChannelData::GetChannelInfo(grpc_channel_element* elem,
                                 const grpc_channel_info* info) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  MutexLock lock(&chand->info_mu_);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json_.get());
  }
}

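// Queued LB picks live in an intrusive singly-linked list headed by
// lb_queued_calls_; each caller supplies its own LbQueuedCall node, so
// no allocation happens here. The call's pollent is also added to the
// channel's interested_parties so that I/O for the pending pick can be
// driven by the call's own completion queue.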
void ChannelData::AddLbQueuedCall(LbQueuedCall* call,
                                  grpc_polling_entity* pollent) {
  // Add call to queued picks list.
  call->next = lb_queued_calls_;
  lb_queued_calls_ = call;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}

void ChannelData::RemoveLbQueuedCall(LbQueuedCall* to_remove,
                                     grpc_polling_entity* pollent) {
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
  // Remove from queued picks list.
  for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
       call = &(*call)->next) {
    if (*call == to_remove) {
      *call = to_remove->next;
      return;
    }
  }
}

RefCountedPtr<ConnectedSubchannel>
ChannelData::GetConnectedSubchannelInDataPlane(
    SubchannelInterface* subchannel) const {
  SubchannelWrapper* subchannel_wrapper =
      static_cast<SubchannelWrapper*>(subchannel);
  ConnectedSubchannel* connected_subchannel =
      subchannel_wrapper->connected_subchannel_in_data_plane();
  if (connected_subchannel == nullptr) return nullptr;
  return connected_subchannel->Ref();
}

void ChannelData::TryToConnectLocked() {
  if (lb_policy_ != nullptr) {
    lb_policy_->ExitIdleLocked();
  } else if (resolver_ == nullptr) {
    CreateResolverLocked();
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}

grpc_connectivity_state ChannelData::CheckConnectivityState(
    bool try_to_connect) {
  grpc_connectivity_state out = state_tracker_.state();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    work_serializer_->Run([this]() { TryToConnectLocked(); }, DEBUG_LOCATION);
  }
  return out;
}

void ChannelData::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}

void ChannelData::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  new ConnectivityWatcherRemover(this, watcher);
}

//
// CallData implementation
//

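// CallData holds the per-call state for the client channel filter. Its
// job is to apply the service config to the call and then hand the call
// off to the dynamic filter stack (which contains the retry and LB
// code), buffering batches in pending_batches_ until that handoff can
// happen.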
CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
                   const grpc_call_element_args& args)
    : deadline_state_(elem, args,
                      GPR_LIKELY(chand.deadline_checking_enabled())
                          ? args.deadline
                          : GRPC_MILLIS_INF_FUTURE),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context) {}

CallData::~CallData() {
  grpc_slice_unref_internal(path_);
  GRPC_ERROR_UNREF(cancel_error_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i] == nullptr);
  }
}

grpc_error* CallData::Init(grpc_call_element* elem,
                           const grpc_call_element_args* args) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  new (elem->call_data) CallData(elem, *chand, *args);
  return GRPC_ERROR_NONE;
}

void CallData::Destroy(grpc_call_element* elem,
                       const grpc_call_final_info* /*final_info*/,
                       grpc_closure* then_schedule_closure) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~CallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // TODO(yashkt): This can potentially be a Closure::Run.
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
  }
}

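// Batch handling proceeds in order:
//   1. Apply deadline checking and intercept recv_initial_metadata for
//      the config selector's on-committed callback.
//   2. If the call was already cancelled, fail the batch immediately.
//   3. If this batch is itself a cancellation, record it and either fail
//      all pending batches or forward it to the dynamic call.
//   4. Otherwise, buffer the batch. Once a dynamic call exists, resume
//      buffered batches on it; until then, only a batch containing
//      send_initial_metadata triggers the resolution/service-config path.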
void CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled())) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // Intercept recv_initial_metadata for config selector on-committed callback.
  if (batch->recv_initial_metadata) {
    calld->InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error_);
    calld->cancel_error_ =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error_));
    }
    // If we do not have a dynamic call (i.e., name resolution has not
    // yet completed), fail all pending batches. Otherwise, send the
    // cancellation down to the dynamic call.
    if (calld->dynamic_call_ == nullptr) {
      calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
                                NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    } else {
      // Note: This will release the call combiner.
      calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(elem, batch);
  // Check if we've already created a dynamic call.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
              chand, calld, calld->dynamic_call_.get());
    }
    calld->PendingBatchesResume(elem);
    return;
  }
  // We do not yet have a dynamic call.
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing resolution mutex to apply service "
              "config",
              chand, calld);
    }
    CheckResolution(elem, GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}

void CallData::SetPollent(grpc_call_element* elem,
                          grpc_polling_entity* pollent) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  calld->pollent_ = pollent;
}

//
// pending_batches management
//

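// Each pending batch occupies the slot of its highest-priority op, so at
// most six batches can be pending at once. For example, a batch
// containing both send_initial_metadata and send_message lands at index
// 0, while a batch containing only recv_trailing_metadata lands at
// index 5.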
size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesAdd(grpc_call_element* elem,
                                 grpc_transport_stream_op_batch* batch) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            this, idx);
  }
  grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
  GPR_ASSERT(pending == nullptr);
  pending = batch;
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesFail(
    grpc_call_element* elem, grpc_error* error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, this, num_batches, grpc_error_string(error));
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::ResumePendingBatchInCallCombiner(void* arg,
                                                grpc_error* /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* elem =
      static_cast<grpc_call_element*>(batch->handler_private.extra_arg);
  auto* calld = static_cast<CallData*>(elem->call_data);
  // Note: This will release the call combiner.
  calld->dynamic_call_->StartTransportStreamOpBatch(batch);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesResume(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on dynamic_call=%p",
            chand, this, num_batches, dynamic_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = elem;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch, nullptr);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "PendingBatchesResume");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}

//
// name resolution
//

// A class to handle the call combiner cancellation callback for a
// queued pick.
class CallData::ResolverQueuedCallCanceller {
 public:
  explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) {
    auto* calld = static_cast<CallData*>(elem->call_data);
    GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
                      grpc_schedule_on_exec_ctx);
    calld->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
    auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
    auto* calld = static_cast<CallData*>(self->elem_->call_data);
    {
      MutexLock lock(chand->resolution_mu());
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: cancelling resolver queued pick: "
                "error=%s self=%p calld->resolver_call_canceller=%p",
                chand, calld, grpc_error_string(error), self,
                calld->resolver_call_canceller_);
      }
      if (calld->resolver_call_canceller_ == self &&
          error != GRPC_ERROR_NONE) {
        // Remove pick from list of queued picks.
        calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
        // Fail pending batches on the call.
        calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
                                  YieldCallCombinerIfPendingBatchesFound);
      }
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolverQueuedCallCanceller");
    delete self;
  }

  grpc_call_element* elem_;
  grpc_closure closure_;
};

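// The queued-call bookkeeping below is idempotent:
// queued_pending_resolver_result_ guards against double-adds and
// double-removes, so MaybeAddCallToResolverQueuedCallsLocked() and
// MaybeRemoveCallFromResolverQueuedCallsLocked() may be invoked
// unconditionally while holding the resolution mutex.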
void CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
    grpc_call_element* elem) {
  if (!queued_pending_resolver_result_) return;
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: removing from resolver queued picks list",
            chand, this);
  }
  chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
  queued_pending_resolver_result_ = false;
  // Lame the call combiner canceller.
  resolver_call_canceller_ = nullptr;
}

void CallData::MaybeAddCallToResolverQueuedCallsLocked(
    grpc_call_element* elem) {
  if (queued_pending_resolver_result_) return;
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
            chand, this);
  }
  queued_pending_resolver_result_ = true;
  resolver_queued_call_.elem = elem;
  chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
  // Register call combiner cancellation callback.
  resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
}

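// Applies the parsed service config to this call. As an illustration of
// the knobs consumed below, a service config like the following (field
// names per the gRPC service config schema; the values shown are
// hypothetical) would cap the call's deadline at 1.5s and turn on
// wait_for_ready unless the application set it explicitly:
//
//   {
//     "methodConfig": [{
//       "name": [{ "service": "my.pkg.MyService" }],
//       "timeout": "1.5s",
//       "waitForReady": true
//     }]
//   }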
grpc_error* CallData::ApplyServiceConfigToCallLocked(
    grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, this);
  }
  ConfigSelector* config_selector = chand->config_selector();
  if (config_selector != nullptr) {
    // Use the ConfigSelector to determine the config for the call.
    ConfigSelector::CallConfig call_config =
        config_selector->GetCallConfig({&path_, initial_metadata, arena_});
    if (call_config.error != GRPC_ERROR_NONE) return call_config.error;
    on_call_committed_ = std::move(call_config.on_call_committed);
    // Create a ServiceConfigCallData for the call. This stores a ref to the
    // ServiceConfig and caches the right set of parsed configs to use for
    // the call. The MethodConfig will store itself in the call context,
    // so that it can be accessed by filters in the subchannel, and it
    // will be cleaned up when the call ends.
    auto* service_config_call_data = arena_->New<ServiceConfigCallData>(
        std::move(call_config.service_config), call_config.method_configs,
        std::move(call_config.call_attributes), call_context_);
    // Apply our own method params to the call.
    auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
        service_config_call_data->GetMethodParsedConfig(
            internal::ClientChannelServiceConfigParser::ParserIndex()));
    if (method_params != nullptr) {
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
      if (chand->deadline_checking_enabled() && method_params->timeout() != 0) {
        const grpc_millis per_method_deadline =
            grpc_cycle_counter_to_millis_round_up(call_start_time_) +
            method_params->timeout();
        if (per_method_deadline < deadline_) {
          deadline_ = per_method_deadline;
          grpc_deadline_state_reset(elem, deadline_);
        }
      }
      // If the service config set wait_for_ready and the application
      // did not explicitly set it, use the value from the service config.
      uint32_t* send_initial_metadata_flags =
          &pending_batches_[0]
               ->payload->send_initial_metadata.send_initial_metadata_flags;
      if (method_params->wait_for_ready().has_value() &&
          !(*send_initial_metadata_flags &
            GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
        if (method_params->wait_for_ready().value()) {
          *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        } else {
          *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        }
      }
    }
    // Set the dynamic filter stack.
    dynamic_filters_ = chand->dynamic_filters();
  }
  return GRPC_ERROR_NONE;
}

void CallData::RecvInitialMetadataReadyForConfigSelectorCommitCallback(
    void* arg, grpc_error* error) {
  auto* self = static_cast<CallData*>(arg);
  if (self->on_call_committed_ != nullptr) {
    self->on_call_committed_();
    self->on_call_committed_ = nullptr;
  }
  // Chain to original callback.
  Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
               GRPC_ERROR_REF(error));
}

// TODO(roth): Consider not intercepting this callback unless we
// actually need to, if this causes a performance problem.
void CallData::InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
    grpc_transport_stream_op_batch* batch) {
  original_recv_initial_metadata_ready_ =
      batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_,
                    RecvInitialMetadataReadyForConfigSelectorCommitCallback,
                    this, nullptr);
  batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      &recv_initial_metadata_ready_;
}

void CallData::AsyncResolutionDone(grpc_call_element* elem, grpc_error* error) {
  GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr);
  ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
}

void CallData::ResolutionDone(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: error applying config to call: error=%s",
              chand, calld, grpc_error_string(error));
    }
    calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
    return;
  }
  calld->CreateDynamicCall(elem);
}

void CallData::CheckResolution(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  bool resolution_complete;
  {
    MutexLock lock(chand->resolution_mu());
    resolution_complete = calld->CheckResolutionLocked(elem, &error);
  }
  if (resolution_complete) {
    ResolutionDone(elem, error);
    GRPC_ERROR_UNREF(error);
  }
}

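// Returns true if resolution is complete, in which case *error holds the
// result (GRPC_ERROR_NONE on success, or a resolver/config error);
// returns false if the call was queued to wait for a resolver result.
// Must be invoked while holding the channel's resolution mutex.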
bool CallData::CheckResolutionLocked(grpc_call_element* elem,
                                     grpc_error** error) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // If we're still in IDLE, we need to start resolving.
  if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
    // Bounce into the control plane work serializer to start resolving,
    // in case we are still in IDLE state. Since we are holding the
    // resolution mutex here, we offload this onto the ExecCtx so that we
    // don't deadlock with ourselves.
    GRPC_CHANNEL_STACK_REF(chand->owning_stack(), "CheckResolutionLocked");
    ExecCtx::Run(
        DEBUG_LOCATION,
        GRPC_CLOSURE_CREATE(
            [](void* arg, grpc_error* /*error*/) {
              auto* chand = static_cast<ChannelData*>(arg);
              chand->work_serializer()->Run(
                  [chand]() {
                    chand->CheckConnectivityState(/*try_to_connect=*/true);
                    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack(),
                                             "CheckResolutionLocked");
                  },
                  DEBUG_LOCATION);
            },
            chand, nullptr),
        GRPC_ERROR_NONE);
  }
  // Get send_initial_metadata batch and flags.
  auto& send_initial_metadata =
      pending_batches_[0]->payload->send_initial_metadata;
  grpc_metadata_batch* initial_metadata_batch =
      send_initial_metadata.send_initial_metadata;
  const uint32_t send_initial_metadata_flags =
      send_initial_metadata.send_initial_metadata_flags;
  // If we don't yet have a resolver result, we need to queue the call
  // until we get one.
  if (GPR_UNLIKELY(!chand->received_service_config_data())) {
    // If the resolver returned transient failure before returning the
    // first service config, fail any non-wait_for_ready calls.
    grpc_error* resolver_error = chand->resolver_transient_failure_error();
    if (resolver_error != GRPC_ERROR_NONE &&
        (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
            0) {
      MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
      *error = GRPC_ERROR_REF(resolver_error);
      return true;
    }
    // Either the resolver has not yet returned a result, or it has
    // returned transient failure but the call is wait_for_ready. In
    // either case, queue the call.
    MaybeAddCallToResolverQueuedCallsLocked(elem);
    return false;
  }
  // Apply service config to call if not yet applied.
  if (GPR_LIKELY(!service_config_applied_)) {
    service_config_applied_ = true;
    *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
  }
  MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
  return true;
}

void CallData::CreateDynamicCall(grpc_call_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
                                     pollent_,
                                     path_,
                                     call_start_time_,
                                     deadline_,
                                     arena_,
                                     call_context_,
                                     call_combiner_};
  grpc_error* error = GRPC_ERROR_NONE;
  DynamicFilters* channel_stack = args.channel_stack.get();
  dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: failed to create dynamic call: error=%s",
              chand, this, grpc_error_string(error));
    }
    PendingBatchesFail(elem, error, YieldCallCombiner);
    return;
  }
  PendingBatchesResume(elem);
}

//
// RetryingCall implementation
//

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call. When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly. If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses). However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

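// The retry policy consumed here comes from the method config in the
// service config. For reference, a retryPolicy entry looks roughly like
// this (shape per the gRPC retry design, gRFC A6; the values shown are
// hypothetical):
//
//   "retryPolicy": {
//     "maxAttempts": 4,
//     "initialBackoff": "0.1s",
//     "maxBackoff": "1s",
//     "backoffMultiplier": 2,
//     "retryableStatusCodes": ["UNAVAILABLE"]
//   }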
RetryingCall::RetryingCall(
    ChannelData* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent,
    RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
    const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy)
    : chand_(chand),
      pollent_(pollent),
      retry_throttle_data_(std::move(retry_throttle_data)),
      retry_policy_(retry_policy),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(
                  retry_policy_ == nullptr ? 0 : retry_policy_->initial_backoff)
              .set_multiplier(retry_policy_ == nullptr
                                  ? 0
                                  : retry_policy_->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(
                  retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff)),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      enable_retries_(true),
      retry_committed_(false),
      last_attempt_got_server_pushback_(false) {}

RetryingCall::~RetryingCall() {
  grpc_slice_unref_internal(path_);
  GRPC_ERROR_UNREF(cancel_error_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i].batch == nullptr);
  }
}

void RetryingCall::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p retrying_call=%p: failing batch with error: %s", chand_,
              this, grpc_error_string(cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(cancel_error_);
    cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p retrying_call=%p: recording cancel_error=%s",
              chand_, this, grpc_error_string(cancel_error_));
    }
    // If we do not have an LB call (i.e., a pick has not yet been started),
    // fail all pending batches. Otherwise, send the cancellation down to the
    // LB call.
    if (lb_call_ == nullptr) {
      // TODO(roth): If there is a pending retry callback, do we need to
      // cancel it here?
      PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
    } else {
      // Note: This will release the call combiner.
      lb_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  PendingBatchesAdd(batch);
  // Create LB call if needed.
  // TODO(roth): If we get a new batch from the surface after the
  // initial retry attempt has failed, while the retry timer is pending,
  // we should queue the batch and not try to send it immediately.
  if (lb_call_ == nullptr) {
    // We do not yet have an LB call, so create one.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p retrying_call=%p: creating LB call", chand_,
              this);
    }
    CreateLbCall(this, GRPC_ERROR_NONE);
    return;
  }
  // Send batches to LB call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p retrying_call=%p: starting batch on lb_call=%p",
            chand_, this, lb_call_.get());
  }
  PendingBatchesResume();
}

RefCountedPtr<SubchannelCall> RetryingCall::subchannel_call() const {
  if (lb_call_ == nullptr) return nullptr;
  return lb_call_->subchannel_call();
}

//
// send op data caching
//

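// Caches a copy of each send op the first time its batch is seen, so
// that the ops can be replayed on a new LB call if we retry: initial and
// trailing metadata are copied into arena-allocated storage, and each
// send_message payload is wrapped in a ByteStreamCache.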
void RetryingCall::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    send_initial_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
                             send_initial_metadata_storage_);
    send_initial_metadata_flags_ =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    peer_string_ = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
        std::move(batch->payload->send_message.send_message));
    send_messages_.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
                             send_trailing_metadata_storage_);
  }
}

void RetryingCall::FreeCachedSendInitialMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: destroying send_initial_metadata",
            chand_, this);
  }
  grpc_metadata_batch_destroy(&send_initial_metadata_);
}

void RetryingCall::FreeCachedSendMessage(size_t idx) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: destroying send_messages[%" PRIuPTR "]",
            chand_, this, idx);
  }
  send_messages_[idx]->Destroy();
}

void RetryingCall::FreeCachedSendTrailingMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: destroying send_trailing_metadata",
            chand_, this);
  }
  grpc_metadata_batch_destroy(&send_trailing_metadata_);
}

void RetryingCall::FreeCachedSendOpDataAfterCommit(
    SubchannelCallRetryState* retry_state) {
  if (retry_state->completed_send_initial_metadata) {
    FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    FreeCachedSendMessage(i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    FreeCachedSendTrailingMetadata();
  }
}

void RetryingCall::FreeCachedSendOpDataForCompletedBatch(
    SubchannelCallBatchData* batch_data,
    SubchannelCallRetryState* retry_state) {
  if (batch_data->batch.send_initial_metadata) {
    FreeCachedSendInitialMetadata();
  }
  if (batch_data->batch.send_message) {
    FreeCachedSendMessage(retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    FreeCachedSendTrailingMetadata();
  }
}

//
// pending_batches management
//

size_t RetryingCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryingCall::PendingBatchesAdd(grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p retrying_call=%p: adding pending batch at index %" PRIuPTR,
        chand_, this, idx);
  }
  PendingBatch* pending = &pending_batches_[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  if (enable_retries_) {
    // Update state in calld about pending batches.
    // Also check if the batch takes us over the retry buffer limit.
    // Note: We don't check the size of trailing metadata here, because
    // gRPC clients do not send trailing metadata.
    if (batch->send_initial_metadata) {
      pending_send_initial_metadata_ = true;
      bytes_buffered_for_retry_ += grpc_metadata_batch_size(
          batch->payload->send_initial_metadata.send_initial_metadata);
    }
    if (batch->send_message) {
      pending_send_message_ = true;
      bytes_buffered_for_retry_ +=
          batch->payload->send_message.send_message->length();
    }
    if (batch->send_trailing_metadata) {
      pending_send_trailing_metadata_ = true;
    }
    if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                     chand_->per_rpc_retry_buffer_size())) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p retrying_call=%p: exceeded retry buffer size, "
                "committing",
                chand_, this);
      }
      SubchannelCallRetryState* retry_state =
          lb_call_ == nullptr ? nullptr
                              : static_cast<SubchannelCallRetryState*>(
                                    lb_call_->GetParentData());
      RetryCommit(retry_state);
      // If we are not going to retry and have not yet started, pretend
      // retries are disabled so that we don't bother with retry overhead.
      if (num_attempts_completed_ == 0) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
          gpr_log(GPR_INFO,
                  "chand=%p retrying_call=%p: disabling retries before first "
                  "attempt",
                  chand_, this);
        }
        // TODO(roth): Treat this as a commit?
        enable_retries_ = false;
      }
    }
  }
}

void RetryingCall::PendingBatchClear(PendingBatch* pending) {
  if (enable_retries_) {
    if (pending->batch->send_initial_metadata) {
      pending_send_initial_metadata_ = false;
    }
    if (pending->batch->send_message) {
      pending_send_message_ = false;
    }
    if (pending->batch->send_trailing_metadata) {
      pending_send_trailing_metadata_ = false;
    }
  }
  pending->batch = nullptr;
}

void RetryingCall::MaybeClearPendingBatch(PendingBatch* pending) {
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p retrying_call=%p: clearing pending batch",
              chand_, this);
    }
    PendingBatchClear(pending);
  }
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryingCall::FailPendingBatchInCallCombiner(void* arg,
                                                  grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  RetryingCall* call =
      static_cast<RetryingCall*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), call->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryingCall::PendingBatchesFail(
    grpc_error* error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: failing %" PRIuPTR
            " pending batches: %s",
            chand_, this, num_batches, grpc_error_string(error));
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      PendingBatchClear(pending);
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryingCall::ResumePendingBatchInCallCombiner(void* arg,
                                                    grpc_error* /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* lb_call =
      static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  lb_call->StartTransportStreamOpBatch(batch);
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryingCall::PendingBatchesResume() {
  if (enable_retries_) {
    StartRetriableSubchannelBatches(this, GRPC_ERROR_NONE);
    return;
  }
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: starting %" PRIuPTR
            " pending batches on lb_call=%p",
            chand_, this, num_batches, lb_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = lb_call_.get();
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch, nullptr);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "PendingBatchesResume");
      PendingBatchClear(pending);
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}

template <typename Predicate>
RetryingCall::PendingBatch* RetryingCall::PendingBatchFind(
    const char* log_message, Predicate predicate) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
        gpr_log(
            GPR_INFO,
            "chand=%p retrying_call=%p: %s pending batch at index %" PRIuPTR,
            chand_, this, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

//
// retry code
//

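// Committing means this call will never be retried again. Once
// committed, the cached send-op data for ops that have already completed
// can be freed, since it will never need to be replayed.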
void RetryingCall::RetryCommit(SubchannelCallRetryState* retry_state) {
  if (retry_committed_) return;
  retry_committed_ = true;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p retrying_call=%p: committing retries", chand_,
            this);
  }
  if (retry_state != nullptr) {
    FreeCachedSendOpDataAfterCommit(retry_state);
  }
}

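// Schedules the next retry attempt. Absent server push-back, the delay
// comes from the exponential backoff configured in the constructor,
// which (ignoring jitter) is roughly:
//
//   delay(n) = min(initial_backoff * backoff_multiplier^(n-1), max_backoff)
//
// for the n-th retry attempt. When the server sent a push-back value,
// that value is used verbatim as the delay instead.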
void RetryingCall::DoRetry(SubchannelCallRetryState* retry_state,
                           grpc_millis server_pushback_ms) {
  GPR_ASSERT(retry_policy_ != nullptr);
  // Reset LB call.
  lb_call_.reset();
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
    last_attempt_got_server_pushback_ = true;
  } else {
    if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
      last_attempt_got_server_pushback_ = false;
    }
    next_attempt_time = retry_backoff_.NextAttemptTime();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: retrying failed call in %" PRId64 " ms",
            chand_, this, next_attempt_time - ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&retry_closure_, CreateLbCall, this, nullptr);
  grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
  // Update bookkeeping.
  if (retry_state != nullptr) retry_state->retry_dispatched = true;
}

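// Decides whether to retry after an attempt ended with the given status.
// The checks run in a deliberate order: an OK status or an
// already-dispatched retry short-circuits; then the status must be
// configured as retryable, throttling must permit a retry, the call must
// not be committed or cancelled, attempts must remain, and any server
// push-back value must parse as a valid delay. Returns true if a retry
// was (or had already been) dispatched.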
MaybeRetry(SubchannelCallBatchData * batch_data,grpc_status_code status,grpc_mdelem * server_pushback_md)3700 bool RetryingCall::MaybeRetry(SubchannelCallBatchData* batch_data,
3701 grpc_status_code status,
3702 grpc_mdelem* server_pushback_md) {
3703 // Get retry policy.
3704 if (retry_policy_ == nullptr) return false;
3705 // If we've already dispatched a retry from this call, return true.
3706 // This catches the case where the batch has multiple callbacks
3707 // (i.e., it includes either recv_message or recv_initial_metadata).
3708 SubchannelCallRetryState* retry_state = nullptr;
3709 if (batch_data != nullptr) {
3710 retry_state = static_cast<SubchannelCallRetryState*>(
3711 batch_data->lb_call->GetParentData());
3712 if (retry_state->retry_dispatched) {
3713 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3714 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retry already dispatched",
3715 chand_, this);
3716 }
3717 return true;
3718 }
3719 }
3720 // Check status.
3721 if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
3722 if (retry_throttle_data_ != nullptr) {
3723 retry_throttle_data_->RecordSuccess();
3724 }
3725 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3726 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: call succeeded", chand_,
3727 this);
3728 }
3729 return false;
3730 }
3731 // Status is not OK. Check whether the status is retryable.
3732 if (!retry_policy_->retryable_status_codes.Contains(status)) {
3733 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3734 gpr_log(
3735 GPR_INFO,
3736 "chand=%p retrying_call=%p: status %s not configured as retryable",
3737 chand_, this, grpc_status_code_to_string(status));
3738 }
3739 return false;
3740 }
3741 // Record the failure and check whether retries are throttled.
3742 // Note that it's important for this check to come after the status
3743 // code check above, since we should only record failures whose statuses
3744 // match the configured retryable status codes, so that we don't count
3745 // things like failures due to malformed requests (INVALID_ARGUMENT).
3746 // Conversely, it's important for this to come before the remaining
3747 // checks, so that we don't fail to record failures due to other factors.
3748 if (retry_throttle_data_ != nullptr &&
3749 !retry_throttle_data_->RecordFailure()) {
3750 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3751 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retries throttled", chand_,
3752 this);
3753 }
3754 return false;
3755 }
3756 // Check whether the call is committed.
3757 if (retry_committed_) {
3758 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3759 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retries already committed",
3760 chand_, this);
3761 }
3762 return false;
3763 }
3764 // Check whether we have retries remaining.
3765 ++num_attempts_completed_;
3766 if (num_attempts_completed_ >= retry_policy_->max_attempts) {
3767 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3768 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: exceeded %d retry attempts",
3769 chand_, this, retry_policy_->max_attempts);
3770 }
3771 return false;
3772 }
3773 // If the call was cancelled from the surface, don't retry.
3774 if (cancel_error_ != GRPC_ERROR_NONE) {
3775 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3776 gpr_log(GPR_INFO,
3777 "chand=%p retrying_call=%p: call cancelled from surface, not "
3778 "retrying",
3779 chand_, this);
3780 }
3781 return false;
3782 }
3783 // Check server push-back.
3784 grpc_millis server_pushback_ms = -1;
3785 if (server_pushback_md != nullptr) {
3786 // If the value is "-1" or any other unparseable string, we do not retry.
3787 uint32_t ms;
3788 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
3789 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3790 gpr_log(
3791 GPR_INFO,
3792 "chand=%p retrying_call=%p: not retrying due to server push-back",
3793 chand_, this);
3794 }
3795 return false;
3796 } else {
3797 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3798 gpr_log(GPR_INFO,
3799 "chand=%p retrying_call=%p: server push-back: retry in %u ms",
3800 chand_, this, ms);
3801 }
3802 server_pushback_ms = static_cast<grpc_millis>(ms);
3803 }
3804 }
3805 DoRetry(retry_state, server_pushback_ms);
3806 return true;
3807 }
3808
3809 //
3810 // RetryingCall::SubchannelCallBatchData
3811 //
3812
3813 RetryingCall::SubchannelCallBatchData*
Create(RetryingCall * call,int refcount,bool set_on_complete)3814 RetryingCall::SubchannelCallBatchData::Create(RetryingCall* call, int refcount,
3815 bool set_on_complete) {
3816 return call->arena_->New<SubchannelCallBatchData>(call, refcount,
3817 set_on_complete);
3818 }
3819
SubchannelCallBatchData(RetryingCall * call,int refcount,bool set_on_complete)3820 RetryingCall::SubchannelCallBatchData::SubchannelCallBatchData(
3821 RetryingCall* call, int refcount, bool set_on_complete)
3822 : call(call), lb_call(call->lb_call_) {
3823 SubchannelCallRetryState* retry_state =
3824 static_cast<SubchannelCallRetryState*>(lb_call->GetParentData());
3825 batch.payload = &retry_state->batch_payload;
3826 gpr_ref_init(&refs, refcount);
3827 if (set_on_complete) {
3828 GRPC_CLOSURE_INIT(&on_complete, RetryingCall::OnComplete, this,
3829 grpc_schedule_on_exec_ctx);
3830 batch.on_complete = &on_complete;
3831 }
3832 GRPC_CALL_STACK_REF(call->owning_call_, "batch_data");
3833 }
3834
3835 void RetryingCall::SubchannelCallBatchData::Destroy() {
3836 SubchannelCallRetryState* retry_state =
3837 static_cast<SubchannelCallRetryState*>(lb_call->GetParentData());
3838 if (batch.send_initial_metadata) {
3839 grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
3840 }
3841 if (batch.send_trailing_metadata) {
3842 grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
3843 }
3844 if (batch.recv_initial_metadata) {
3845 grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
3846 }
3847 if (batch.recv_trailing_metadata) {
3848 grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
3849 }
3850 lb_call.reset();
3851 GRPC_CALL_STACK_UNREF(call->owning_call_, "batch_data");
3852 }
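// A note on the ref-counting scheme: the creator passes the number of
// callbacks that will eventually Unref() this batch (on_complete, if set,
// plus one per recv_* ready callback). For example, a hypothetical batch
// carrying send ops, recv_initial_metadata, and recv_message would be
// created with three refs:
//
//   // 3 refs: on_complete + recv_initial_metadata_ready +
//   // recv_message_ready.
//   SubchannelCallBatchData* batch_data =
//       SubchannelCallBatchData::Create(this, 3, /*set_on_complete=*/true);
//
// This mirrors the num_callbacks computation in
// AddSubchannelBatchesForPendingBatches() below.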
3853
3854 //
3855 // recv_initial_metadata callback handling
3856 //
3857
3858 void RetryingCall::InvokeRecvInitialMetadataCallback(void* arg,
3859 grpc_error* error) {
3860 SubchannelCallBatchData* batch_data =
3861 static_cast<SubchannelCallBatchData*>(arg);
3862 // Find pending batch.
3863 PendingBatch* pending = batch_data->call->PendingBatchFind(
3864 "invoking recv_initial_metadata_ready for",
3865 [](grpc_transport_stream_op_batch* batch) {
3866 return batch->recv_initial_metadata &&
3867 batch->payload->recv_initial_metadata
3868 .recv_initial_metadata_ready != nullptr;
3869 });
3870 GPR_ASSERT(pending != nullptr);
3871 // Return metadata.
3872 SubchannelCallRetryState* retry_state =
3873 static_cast<SubchannelCallRetryState*>(
3874 batch_data->lb_call->GetParentData());
3875 grpc_metadata_batch_move(
3876 &retry_state->recv_initial_metadata,
3877 pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
3878 // Update bookkeeping.
3879 // Note: Need to do this before invoking the callback, since invoking
3880 // the callback will result in yielding the call combiner.
3881 grpc_closure* recv_initial_metadata_ready =
3882 pending->batch->payload->recv_initial_metadata
3883 .recv_initial_metadata_ready;
3884 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
3885 nullptr;
3886 batch_data->call->MaybeClearPendingBatch(pending);
3887 batch_data->Unref();
3888 // Invoke callback.
3889 Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready,
3890 GRPC_ERROR_REF(error));
3891 }
3892
3893 void RetryingCall::RecvInitialMetadataReady(void* arg, grpc_error* error) {
3894 SubchannelCallBatchData* batch_data =
3895 static_cast<SubchannelCallBatchData*>(arg);
3896 RetryingCall* call = batch_data->call;
3897 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3898 gpr_log(
3899 GPR_INFO,
3900 "chand=%p retrying_call=%p: got recv_initial_metadata_ready, error=%s",
3901 call->chand_, call, grpc_error_string(error));
3902 }
3903 SubchannelCallRetryState* retry_state =
3904 static_cast<SubchannelCallRetryState*>(
3905 batch_data->lb_call->GetParentData());
3906 retry_state->completed_recv_initial_metadata = true;
3907 // If a retry was already dispatched, then we're not going to use the
3908 // result of this recv_initial_metadata op, so do nothing.
3909 if (retry_state->retry_dispatched) {
3910 GRPC_CALL_COMBINER_STOP(
3911 call->call_combiner_,
3912 "recv_initial_metadata_ready after retry dispatched");
3913 return;
3914 }
3915 // If we got an error or a Trailers-Only response and have not yet gotten
3916 // the recv_trailing_metadata_ready callback, then defer propagating this
3917 // callback back to the surface. We can evaluate whether to retry when
3918 // recv_trailing_metadata comes back.
3919 if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
3920 error != GRPC_ERROR_NONE) &&
3921 !retry_state->completed_recv_trailing_metadata)) {
3922 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3923 gpr_log(
3924 GPR_INFO,
3925 "chand=%p retrying_call=%p: deferring recv_initial_metadata_ready "
3926 "(Trailers-Only)",
3927 call->chand_, call);
3928 }
3929 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
3930 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
3931 if (!retry_state->started_recv_trailing_metadata) {
3932 // recv_trailing_metadata not yet started by application; start it
3933 // ourselves to get status.
3934 call->StartInternalRecvTrailingMetadata();
3935 } else {
3936 GRPC_CALL_COMBINER_STOP(
3937 call->call_combiner_,
3938 "recv_initial_metadata_ready trailers-only or error");
3939 }
3940 return;
3941 }
3942 // Received valid initial metadata, so commit the call.
3943 call->RetryCommit(retry_state);
3944 // Invoke the callback to return the result to the surface.
3945 // Manually invoking a callback function; it does not take ownership of error.
3946 call->InvokeRecvInitialMetadataCallback(batch_data, error);
3947 }
3948
3949 //
3950 // recv_message callback handling
3951 //
3952
3953 void RetryingCall::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
3954 SubchannelCallBatchData* batch_data =
3955 static_cast<SubchannelCallBatchData*>(arg);
3956 RetryingCall* call = batch_data->call;
3957 // Find pending op.
3958 PendingBatch* pending = call->PendingBatchFind(
3959 "invoking recv_message_ready for",
3960 [](grpc_transport_stream_op_batch* batch) {
3961 return batch->recv_message &&
3962 batch->payload->recv_message.recv_message_ready != nullptr;
3963 });
3964 GPR_ASSERT(pending != nullptr);
3965 // Return payload.
3966 SubchannelCallRetryState* retry_state =
3967 static_cast<SubchannelCallRetryState*>(
3968 batch_data->lb_call->GetParentData());
3969 *pending->batch->payload->recv_message.recv_message =
3970 std::move(retry_state->recv_message);
3971 // Update bookkeeping.
3972 // Note: Need to do this before invoking the callback, since invoking
3973 // the callback will result in yielding the call combiner.
3974 grpc_closure* recv_message_ready =
3975 pending->batch->payload->recv_message.recv_message_ready;
3976 pending->batch->payload->recv_message.recv_message_ready = nullptr;
3977 call->MaybeClearPendingBatch(pending);
3978 batch_data->Unref();
3979 // Invoke callback.
3980 Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error));
3981 }
3982
3983 void RetryingCall::RecvMessageReady(void* arg, grpc_error* error) {
3984 SubchannelCallBatchData* batch_data =
3985 static_cast<SubchannelCallBatchData*>(arg);
3986 RetryingCall* call = batch_data->call;
3987 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3988 gpr_log(GPR_INFO,
3989 "chand=%p retrying_call=%p: got recv_message_ready, error=%s",
3990 call->chand_, call, grpc_error_string(error));
3991 }
3992 SubchannelCallRetryState* retry_state =
3993 static_cast<SubchannelCallRetryState*>(
3994 batch_data->lb_call->GetParentData());
3995 ++retry_state->completed_recv_message_count;
3996 // If a retry was already dispatched, then we're not going to use the
3997 // result of this recv_message op, so do nothing.
3998 if (retry_state->retry_dispatched) {
3999 GRPC_CALL_COMBINER_STOP(call->call_combiner_,
4000 "recv_message_ready after retry dispatched");
4001 return;
4002 }
4003 // If we got an error or the payload was nullptr and we have not yet gotten
4004 // the recv_trailing_metadata_ready callback, then defer propagating this
4005 // callback back to the surface. We can evaluate whether to retry when
4006 // recv_trailing_metadata comes back.
4007 if (GPR_UNLIKELY(
4008 (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
4009 !retry_state->completed_recv_trailing_metadata)) {
4010 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4011 gpr_log(
4012 GPR_INFO,
4013 "chand=%p retrying_call=%p: deferring recv_message_ready (nullptr "
4014 "message and recv_trailing_metadata pending)",
4015 call->chand_, call);
4016 }
4017 retry_state->recv_message_ready_deferred_batch = batch_data;
4018 retry_state->recv_message_error = GRPC_ERROR_REF(error);
4019 if (!retry_state->started_recv_trailing_metadata) {
4020 // recv_trailing_metadata not yet started by application; start it
4021 // ourselves to get status.
4022 call->StartInternalRecvTrailingMetadata();
4023 } else {
4024 GRPC_CALL_COMBINER_STOP(call->call_combiner_, "recv_message_ready null");
4025 }
4026 return;
4027 }
4028 // Received a valid message, so commit the call.
4029 call->RetryCommit(retry_state);
4030 // Invoke the callback to return the result to the surface.
4031 // Manually invoking a callback function; it does not take ownership of error.
4032 call->InvokeRecvMessageCallback(batch_data, error);
4033 }
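// Both recv_initial_metadata_ready and recv_message_ready above follow
// the same deferral pattern: if the op failed (or signaled Trailers-Only
// or a null message) before recv_trailing_metadata has completed, the
// callback is parked on retry_state and replayed later by
// AddClosuresForDeferredRecvCallbacks() once the retry decision is made.
// An illustrative (assumed) timeline for a failed attempt:
//
//   1. recv_message_ready fires with an error; trailing metadata not yet
//      complete.
//   2. The batch is stashed in recv_message_ready_deferred_batch, and
//      StartInternalRecvTrailingMetadata() is invoked if needed.
//   3. recv_trailing_metadata completes; MaybeRetry() decides.
//   4. If not retrying, the deferred callback is re-run to surface the
//      original result.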
4034
4035 //
4036 // recv_trailing_metadata handling
4037 //
4038
4039 void RetryingCall::GetCallStatus(grpc_metadata_batch* md_batch,
4040 grpc_error* error, grpc_status_code* status,
4041 grpc_mdelem** server_pushback_md) {
4042 if (error != GRPC_ERROR_NONE) {
4043 grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
4044 } else {
4045 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
4046 *status =
4047 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
4048 if (server_pushback_md != nullptr &&
4049 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
4050 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
4051 }
4052 }
4053 GRPC_ERROR_UNREF(error);
4054 }
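// As an illustration, a failed attempt might carry trailing metadata like
// the following (values hypothetical), from which GetCallStatus() would
// extract status UNAVAILABLE (code 14) and a 1000 ms push-back:
//
//   grpc-status: 14
//   grpc-retry-pushback-ms: 1000
//
// The push-back element is surfaced only when the caller asked for it by
// passing a non-null server_pushback_md.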
4055
4056 void RetryingCall::AddClosureForRecvTrailingMetadataReady(
4057 SubchannelCallBatchData* batch_data, grpc_error* error,
4058 CallCombinerClosureList* closures) {
4059 // Find pending batch.
4060 PendingBatch* pending = PendingBatchFind(
4061 "invoking recv_trailing_metadata for",
4062 [](grpc_transport_stream_op_batch* batch) {
4063 return batch->recv_trailing_metadata &&
4064 batch->payload->recv_trailing_metadata
4065 .recv_trailing_metadata_ready != nullptr;
4066 });
4067 // If we generated the recv_trailing_metadata op internally via
4068 // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
4069 if (pending == nullptr) {
4070 GRPC_ERROR_UNREF(error);
4071 return;
4072 }
4073 // Return metadata.
4074 SubchannelCallRetryState* retry_state =
4075 static_cast<SubchannelCallRetryState*>(
4076 batch_data->lb_call->GetParentData());
4077 grpc_metadata_batch_move(
4078 &retry_state->recv_trailing_metadata,
4079 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
4080 // Add closure.
4081 closures->Add(pending->batch->payload->recv_trailing_metadata
4082 .recv_trailing_metadata_ready,
4083 error, "recv_trailing_metadata_ready for pending batch");
4084 // Update bookkeeping.
4085 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
4086 nullptr;
4087 MaybeClearPendingBatch(pending);
4088 }
4089
4090 void RetryingCall::AddClosuresForDeferredRecvCallbacks(
4091 SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
4092 CallCombinerClosureList* closures) {
4093 if (batch_data->batch.recv_trailing_metadata) {
4094 // Add closure for deferred recv_initial_metadata_ready.
4095 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
4096 nullptr)) {
4097 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
4098 InvokeRecvInitialMetadataCallback,
4099 retry_state->recv_initial_metadata_ready_deferred_batch,
4100 grpc_schedule_on_exec_ctx);
4101 closures->Add(&retry_state->recv_initial_metadata_ready,
4102 retry_state->recv_initial_metadata_error,
4103 "resuming recv_initial_metadata_ready");
4104 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
4105 }
4106 // Add closure for deferred recv_message_ready.
4107 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
4108 nullptr)) {
4109 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
4110 InvokeRecvMessageCallback,
4111 retry_state->recv_message_ready_deferred_batch,
4112 grpc_schedule_on_exec_ctx);
4113 closures->Add(&retry_state->recv_message_ready,
4114 retry_state->recv_message_error,
4115 "resuming recv_message_ready");
4116 retry_state->recv_message_ready_deferred_batch = nullptr;
4117 }
4118 }
4119 }
4120
4121 bool RetryingCall::PendingBatchIsUnstarted(
4122 PendingBatch* pending, SubchannelCallRetryState* retry_state) {
4123 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
4124 return false;
4125 }
4126 if (pending->batch->send_initial_metadata &&
4127 !retry_state->started_send_initial_metadata) {
4128 return true;
4129 }
4130 if (pending->batch->send_message &&
4131 retry_state->started_send_message_count < send_messages_.size()) {
4132 return true;
4133 }
4134 if (pending->batch->send_trailing_metadata &&
4135 !retry_state->started_send_trailing_metadata) {
4136 return true;
4137 }
4138 return false;
4139 }
4140
4141 void RetryingCall::AddClosuresToFailUnstartedPendingBatches(
4142 SubchannelCallRetryState* retry_state, grpc_error* error,
4143 CallCombinerClosureList* closures) {
4144 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4145 PendingBatch* pending = &pending_batches_[i];
4146 if (PendingBatchIsUnstarted(pending, retry_state)) {
4147 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4148 gpr_log(GPR_INFO,
4149 "chand=%p retrying_call=%p: failing unstarted pending batch at "
4150 "index "
4151 "%" PRIuPTR,
4152 chand_, this, i);
4153 }
4154 closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
4155 "failing on_complete for pending batch");
4156 pending->batch->on_complete = nullptr;
4157 MaybeClearPendingBatch(pending);
4158 }
4159 }
4160 GRPC_ERROR_UNREF(error);
4161 }
4162
4163 void RetryingCall::RunClosuresForCompletedCall(
4164 SubchannelCallBatchData* batch_data, grpc_error* error) {
4165 SubchannelCallRetryState* retry_state =
4166 static_cast<SubchannelCallRetryState*>(
4167 batch_data->lb_call->GetParentData());
4168 // Construct list of closures to execute.
4169 CallCombinerClosureList closures;
4170 // First, add closure for recv_trailing_metadata_ready.
4171 AddClosureForRecvTrailingMetadataReady(batch_data, GRPC_ERROR_REF(error),
4172 &closures);
4173 // If there are deferred recv_initial_metadata_ready or recv_message_ready
4174 // callbacks, add them to closures.
4175 AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
4176 // Add closures to fail any pending batches that have not yet been started.
4177 AddClosuresToFailUnstartedPendingBatches(retry_state, GRPC_ERROR_REF(error),
4178 &closures);
4179 // Don't need batch_data anymore.
4180 batch_data->Unref();
4181 // Schedule all of the closures identified above.
4182 // Note: This will release the call combiner.
4183 closures.RunClosures(call_combiner_);
4184 GRPC_ERROR_UNREF(error);
4185 }
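// The CallCombinerClosureList pattern used here recurs throughout this
// file: closures are accumulated along with an error and a reason string,
// then run as a unit under the call combiner. A minimal usage sketch,
// where cb is a hypothetical grpc_closure*:
//
//   CallCombinerClosureList closures;
//   closures.Add(cb, GRPC_ERROR_NONE, "reason for scheduling cb");
//   closures.RunClosures(call_combiner_);  // Yields the call combiner.
//
// RunClosuresWithoutYielding() is the variant for paths that must retain
// the combiner (see PendingBatchesFail() below).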
4186
4187 void RetryingCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
4188 SubchannelCallBatchData* batch_data =
4189 static_cast<SubchannelCallBatchData*>(arg);
4190 RetryingCall* call = batch_data->call;
4191 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4192 gpr_log(
4193 GPR_INFO,
4194 "chand=%p retrying_call=%p: got recv_trailing_metadata_ready, error=%s",
4195 call->chand_, call, grpc_error_string(error));
4196 }
4197 SubchannelCallRetryState* retry_state =
4198 static_cast<SubchannelCallRetryState*>(
4199 batch_data->lb_call->GetParentData());
4200 retry_state->completed_recv_trailing_metadata = true;
4201 // Get the call's status and check for server pushback metadata.
4202 grpc_status_code status = GRPC_STATUS_OK;
4203 grpc_mdelem* server_pushback_md = nullptr;
4204 grpc_metadata_batch* md_batch =
4205 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
4206 call->GetCallStatus(md_batch, GRPC_ERROR_REF(error), &status,
4207 &server_pushback_md);
4208 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4209 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: call finished, status=%s",
4210 call->chand_, call, grpc_status_code_to_string(status));
4211 }
4212 // Check if we should retry.
4213 if (call->MaybeRetry(batch_data, status, server_pushback_md)) {
4214 // Unref batch_data for deferred recv_initial_metadata_ready or
4215 // recv_message_ready callbacks, if any.
4216 if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
4217 batch_data->Unref();
4218 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
4219 }
4220 if (retry_state->recv_message_ready_deferred_batch != nullptr) {
4221 batch_data->Unref();
4222 GRPC_ERROR_UNREF(retry_state->recv_message_error);
4223 }
4224 batch_data->Unref();
4225 return;
4226 }
4227 // Not retrying, so commit the call.
4228 call->RetryCommit(retry_state);
4229 // Run any necessary closures.
4230 call->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
4231 }
4232
4233 //
4234 // on_complete callback handling
4235 //
4236
4237 void RetryingCall::AddClosuresForCompletedPendingBatch(
4238 SubchannelCallBatchData* batch_data, grpc_error* error,
4239 CallCombinerClosureList* closures) {
4240 PendingBatch* pending = PendingBatchFind(
4241 "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
4242 // Match the pending batch with the same set of send ops as the
4243 // subchannel batch we've just completed.
4244 return batch->on_complete != nullptr &&
4245 batch_data->batch.send_initial_metadata ==
4246 batch->send_initial_metadata &&
4247 batch_data->batch.send_message == batch->send_message &&
4248 batch_data->batch.send_trailing_metadata ==
4249 batch->send_trailing_metadata;
4250 });
4251 // If batch_data is a replay batch, then there will be no pending
4252 // batch to complete.
4253 if (pending == nullptr) {
4254 GRPC_ERROR_UNREF(error);
4255 return;
4256 }
4257 // Add closure.
4258 closures->Add(pending->batch->on_complete, error,
4259 "on_complete for pending batch");
4260 pending->batch->on_complete = nullptr;
4261 MaybeClearPendingBatch(pending);
4262 }
4263
4264 void RetryingCall::AddClosuresForReplayOrPendingSendOps(
4265 SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
4266 CallCombinerClosureList* closures) {
4267 bool have_pending_send_message_ops =
4268 retry_state->started_send_message_count < send_messages_.size();
4269 bool have_pending_send_trailing_metadata_op =
4270 seen_send_trailing_metadata_ &&
4271 !retry_state->started_send_trailing_metadata;
4272 if (!have_pending_send_message_ops &&
4273 !have_pending_send_trailing_metadata_op) {
4274 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4275 PendingBatch* pending = &pending_batches_[i];
4276 grpc_transport_stream_op_batch* batch = pending->batch;
4277 if (batch == nullptr || pending->send_ops_cached) continue;
4278 if (batch->send_message) have_pending_send_message_ops = true;
4279 if (batch->send_trailing_metadata) {
4280 have_pending_send_trailing_metadata_op = true;
4281 }
4282 }
4283 }
4284 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
4285 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4286 gpr_log(GPR_INFO,
4287 "chand=%p retrying_call=%p: starting next batch for pending send "
4288 "op(s)",
4289 chand_, this);
4290 }
4291 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
4292 StartRetriableSubchannelBatches, this,
4293 grpc_schedule_on_exec_ctx);
4294 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
4295 "starting next batch for send_* op(s)");
4296 }
4297 }
4298
4299 void RetryingCall::OnComplete(void* arg, grpc_error* error) {
4300 SubchannelCallBatchData* batch_data =
4301 static_cast<SubchannelCallBatchData*>(arg);
4302 RetryingCall* call = batch_data->call;
4303 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4304 gpr_log(GPR_INFO,
4305 "chand=%p retrying_call=%p: got on_complete, error=%s, batch=%s",
4306 call->chand_, call, grpc_error_string(error),
4307 grpc_transport_stream_op_batch_string(&batch_data->batch).c_str());
4308 }
4309 SubchannelCallRetryState* retry_state =
4310 static_cast<SubchannelCallRetryState*>(
4311 batch_data->lb_call->GetParentData());
4312 // Update bookkeeping in retry_state.
4313 if (batch_data->batch.send_initial_metadata) {
4314 retry_state->completed_send_initial_metadata = true;
4315 }
4316 if (batch_data->batch.send_message) {
4317 ++retry_state->completed_send_message_count;
4318 }
4319 if (batch_data->batch.send_trailing_metadata) {
4320 retry_state->completed_send_trailing_metadata = true;
4321 }
4322 // If the call is committed, free cached data for send ops that we've just
4323 // completed.
4324 if (call->retry_committed_) {
4325 call->FreeCachedSendOpDataForCompletedBatch(batch_data, retry_state);
4326 }
4327 // Construct list of closures to execute.
4328 CallCombinerClosureList closures;
4329 // If a retry was already dispatched, that means we saw
4330 // recv_trailing_metadata before this, so we do nothing here.
4331 // Otherwise, invoke the callback to return the result to the surface.
4332 if (!retry_state->retry_dispatched) {
4333 // Add closure for the completed pending batch, if any.
4334 call->AddClosuresForCompletedPendingBatch(batch_data, GRPC_ERROR_REF(error),
4335 &closures);
4336 // If needed, add a callback to start any replay or pending send ops on
4337 // the subchannel call.
4338 if (!retry_state->completed_recv_trailing_metadata) {
4339 call->AddClosuresForReplayOrPendingSendOps(batch_data, retry_state,
4340 &closures);
4341 }
4342 }
4343 // Track number of pending subchannel send batches and determine if this
4344 // was the last one.
4345 --call->num_pending_retriable_subchannel_send_batches_;
4346 const bool last_send_batch_complete =
4347 call->num_pending_retriable_subchannel_send_batches_ == 0;
4348 // Don't need batch_data anymore.
4349 batch_data->Unref();
4350 // Schedule all of the closures identified above.
4351 // Note: This yields the call combiner.
4352 closures.RunClosures(call->call_combiner_);
4353 // If this was the last subchannel send batch, unref the call stack.
4354 if (last_send_batch_complete) {
4355 GRPC_CALL_STACK_UNREF(call->owning_call_, "subchannel_send_batches");
4356 }
4357 }
4358
4359 //
4360 // subchannel batch construction
4361 //
4362
4363 void RetryingCall::StartBatchInCallCombiner(void* arg,
4364 grpc_error* /*ignored*/) {
4365 grpc_transport_stream_op_batch* batch =
4366 static_cast<grpc_transport_stream_op_batch*>(arg);
4367 auto* lb_call =
4368 static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
4369 // Note: This will release the call combiner.
4370 lb_call->StartTransportStreamOpBatch(batch);
4371 }
4372
4373 void RetryingCall::AddClosureForSubchannelBatch(
4374 grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) {
4375 batch->handler_private.extra_arg = lb_call_.get();
4376 GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
4377 batch, grpc_schedule_on_exec_ctx);
4378 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4379 gpr_log(GPR_INFO,
4380 "chand=%p retrying_call=%p: starting subchannel batch: %s", chand_,
4381 this, grpc_transport_stream_op_batch_string(batch).c_str());
4382 }
4383 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
4384 "start_subchannel_batch");
4385 }
4386
4387 void RetryingCall::AddRetriableSendInitialMetadataOp(
4388 SubchannelCallRetryState* retry_state,
4389 SubchannelCallBatchData* batch_data) {
4390 // Maps the number of retries to the corresponding metadata value slice.
4391 const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
4392 &GRPC_MDSTR_3, &GRPC_MDSTR_4};
4393 // We need to make a copy of the metadata batch for each attempt, since
4394 // the filters in the subchannel stack may modify this batch, and we don't
4395 // want those modifications to be passed forward to subsequent attempts.
4396 //
4397 // If we've already completed one or more attempts, add the
4398 // grpc-previous-rpc-attempts header.
4399 retry_state->send_initial_metadata_storage =
4400 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
4401 sizeof(grpc_linked_mdelem) *
4402 (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
4403 grpc_metadata_batch_copy(&send_initial_metadata_,
4404 &retry_state->send_initial_metadata,
4405 retry_state->send_initial_metadata_storage);
4406 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
4407 .grpc_previous_rpc_attempts != nullptr)) {
4408 grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
4409 GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
4410 }
4411 if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
4412 grpc_mdelem retry_md = grpc_mdelem_create(
4413 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
4414 *retry_count_strings[num_attempts_completed_ - 1], nullptr);
4415 grpc_error* error = grpc_metadata_batch_add_tail(
4416 &retry_state->send_initial_metadata,
4417 &retry_state
4418 ->send_initial_metadata_storage[send_initial_metadata_.list.count],
4419 retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
4420 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
4421 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
4422 grpc_error_string(error));
4423 GPR_ASSERT(false);
4424 }
4425 }
4426 retry_state->started_send_initial_metadata = true;
4427 batch_data->batch.send_initial_metadata = true;
4428 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
4429 &retry_state->send_initial_metadata;
4430 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
4431 send_initial_metadata_flags_;
4432 batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
4433 }
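// For example (attempt numbers hypothetical): on the third attempt,
// num_attempts_completed_ is 2, so the outgoing initial metadata would
// include:
//
//   grpc-previous-rpc-attempts: 2
//
// The retry_count_strings table above only covers the values 1 through 4,
// presumably because the retry policy caps max_attempts such that at most
// four prior attempts can ever be reported.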
4434
4435 void RetryingCall::AddRetriableSendMessageOp(
4436 SubchannelCallRetryState* retry_state,
4437 SubchannelCallBatchData* batch_data) {
4438 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4439 gpr_log(GPR_INFO,
4440 "chand=%p retrying_call=%p: starting calld->send_messages[%" PRIuPTR
4441 "]",
4442 chand_, this, retry_state->started_send_message_count);
4443 }
4444 ByteStreamCache* cache =
4445 send_messages_[retry_state->started_send_message_count];
4446 ++retry_state->started_send_message_count;
4447 retry_state->send_message.Init(cache);
4448 batch_data->batch.send_message = true;
4449 batch_data->batch.payload->send_message.send_message.reset(
4450 retry_state->send_message.get());
4451 }
4452
4453 void RetryingCall::AddRetriableSendTrailingMetadataOp(
4454 SubchannelCallRetryState* retry_state,
4455 SubchannelCallBatchData* batch_data) {
4456 // We need to make a copy of the metadata batch for each attempt, since
4457 // the filters in the subchannel stack may modify this batch, and we don't
4458 // want those modifications to be passed forward to subsequent attempts.
4459 retry_state->send_trailing_metadata_storage =
4460 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
4461 sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
4462 grpc_metadata_batch_copy(&send_trailing_metadata_,
4463 &retry_state->send_trailing_metadata,
4464 retry_state->send_trailing_metadata_storage);
4465 retry_state->started_send_trailing_metadata = true;
4466 batch_data->batch.send_trailing_metadata = true;
4467 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
4468 &retry_state->send_trailing_metadata;
4469 }
4470
4471 void RetryingCall::AddRetriableRecvInitialMetadataOp(
4472 SubchannelCallRetryState* retry_state,
4473 SubchannelCallBatchData* batch_data) {
4474 retry_state->started_recv_initial_metadata = true;
4475 batch_data->batch.recv_initial_metadata = true;
4476 grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
4477 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
4478 &retry_state->recv_initial_metadata;
4479 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
4480 &retry_state->trailing_metadata_available;
4481 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
4482 RecvInitialMetadataReady, batch_data,
4483 grpc_schedule_on_exec_ctx);
4484 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
4485 &retry_state->recv_initial_metadata_ready;
4486 }
4487
4488 void RetryingCall::AddRetriableRecvMessageOp(
4489 SubchannelCallRetryState* retry_state,
4490 SubchannelCallBatchData* batch_data) {
4491 ++retry_state->started_recv_message_count;
4492 batch_data->batch.recv_message = true;
4493 batch_data->batch.payload->recv_message.recv_message =
4494 &retry_state->recv_message;
4495 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
4496 batch_data, grpc_schedule_on_exec_ctx);
4497 batch_data->batch.payload->recv_message.recv_message_ready =
4498 &retry_state->recv_message_ready;
4499 }
4500
4501 void RetryingCall::AddRetriableRecvTrailingMetadataOp(
4502 SubchannelCallRetryState* retry_state,
4503 SubchannelCallBatchData* batch_data) {
4504 retry_state->started_recv_trailing_metadata = true;
4505 batch_data->batch.recv_trailing_metadata = true;
4506 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
4507 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
4508 &retry_state->recv_trailing_metadata;
4509 batch_data->batch.payload->recv_trailing_metadata.collect_stats =
4510 &retry_state->collect_stats;
4511 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
4512 RecvTrailingMetadataReady, batch_data,
4513 grpc_schedule_on_exec_ctx);
4514 batch_data->batch.payload->recv_trailing_metadata
4515 .recv_trailing_metadata_ready =
4516 &retry_state->recv_trailing_metadata_ready;
4517 }
4518
4519 void RetryingCall::StartInternalRecvTrailingMetadata() {
4520 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4521 gpr_log(
4522 GPR_INFO,
4523 "chand=%p retrying_call=%p: call failed but recv_trailing_metadata not "
4524 "started; starting it internally",
4525 chand_, this);
4526 }
4527 SubchannelCallRetryState* retry_state =
4528 static_cast<SubchannelCallRetryState*>(lb_call_->GetParentData());
4529 // Create batch_data with 2 refs, since this batch will be unreffed twice:
4530 // once for the recv_trailing_metadata_ready callback when the subchannel
4531 // batch returns, and again when we actually get a recv_trailing_metadata
4532 // op from the surface.
4533 SubchannelCallBatchData* batch_data =
4534 SubchannelCallBatchData::Create(this, 2, false /* set_on_complete */);
4535 AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
4536 retry_state->recv_trailing_metadata_internal_batch = batch_data;
4537 // Note: This will release the call combiner.
4538 lb_call_->StartTransportStreamOpBatch(&batch_data->batch);
4539 }
4540
4541 // If there are any cached send ops that need to be replayed on the
4542 // current subchannel call, creates and returns a new subchannel batch
4543 // to replay those ops. Otherwise, returns nullptr.
4544 RetryingCall::SubchannelCallBatchData*
4545 RetryingCall::MaybeCreateSubchannelBatchForReplay(
4546 SubchannelCallRetryState* retry_state) {
4547 SubchannelCallBatchData* replay_batch_data = nullptr;
4548 // send_initial_metadata.
4549 if (seen_send_initial_metadata_ &&
4550 !retry_state->started_send_initial_metadata &&
4551 !pending_send_initial_metadata_) {
4552 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4553 gpr_log(GPR_INFO,
4554 "chand=%p retrying_call=%p: replaying previously completed "
4555 "send_initial_metadata op",
4556 chand_, this);
4557 }
4558 replay_batch_data =
4559 SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4560 AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
4561 }
4562 // send_message.
4563 // Note that we can only have one send_message op in flight at a time.
4564 if (retry_state->started_send_message_count < send_messages_.size() &&
4565 retry_state->started_send_message_count ==
4566 retry_state->completed_send_message_count &&
4567 !pending_send_message_) {
4568 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4569 gpr_log(GPR_INFO,
4570 "chand=%p retrying_call=%p: replaying previously completed "
4571 "send_message op",
4572 chand_, this);
4573 }
4574 if (replay_batch_data == nullptr) {
4575 replay_batch_data =
4576 SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4577 }
4578 AddRetriableSendMessageOp(retry_state, replay_batch_data);
4579 }
4580 // send_trailing_metadata.
4581 // Note that we only add this op if we have no more send_message ops
4582 // to start, since we can't send down any more send_message ops after
4583 // send_trailing_metadata.
4584 if (seen_send_trailing_metadata_ &&
4585 retry_state->started_send_message_count == send_messages_.size() &&
4586 !retry_state->started_send_trailing_metadata &&
4587 !pending_send_trailing_metadata_) {
4588 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4589 gpr_log(GPR_INFO,
4590 "chand=%p retrying_call=%p: replaying previously completed "
4591 "send_trailing_metadata op",
4592 chand_, this);
4593 }
4594 if (replay_batch_data == nullptr) {
4595 replay_batch_data =
4596 SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4597 }
4598 AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
4599 }
4600 return replay_batch_data;
4601 }
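// A worked example of the replay logic (scenario assumed for
// illustration): suppose attempt 1 completed send_initial_metadata and
// one cached send_message, then failed, and a retry was dispatched. On
// the new LB call, retry_state starts out zeroed, so this method builds a
// single replay batch that re-sends the cached initial metadata and the
// first cached message; if the surface has already sent its trailing
// metadata, that op is added only once no further cached messages remain
// to be started, per the checks above.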
4602
4603 void RetryingCall::AddSubchannelBatchesForPendingBatches(
4604 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
4605 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4606 PendingBatch* pending = &pending_batches_[i];
4607 grpc_transport_stream_op_batch* batch = pending->batch;
4608 if (batch == nullptr) continue;
4609 // Skip any batch that either (a) has already been started on this
4610 // subchannel call or (b) we can't start yet because we're still
4611 // replaying send ops that need to be completed first.
4612 // TODO(roth): Note that if any one op in the batch can't be sent
4613 // yet due to ops that we're replaying, we don't start any of the ops
4614 // in the batch. This is probably okay, but it could conceivably
4615 // lead to increased latency in some cases -- e.g., we could delay
4616 // starting a recv op due to it being in the same batch with a send
4617 // op. If/when we revamp the callback protocol in
4618 // transport_stream_op_batch, we may be able to fix this.
4619 if (batch->send_initial_metadata &&
4620 retry_state->started_send_initial_metadata) {
4621 continue;
4622 }
4623 if (batch->send_message && retry_state->completed_send_message_count <
4624 retry_state->started_send_message_count) {
4625 continue;
4626 }
4627 // Note that we only start send_trailing_metadata if we have no more
4628 // send_message ops to start, since we can't send down any more
4629 // send_message ops after send_trailing_metadata.
4630 if (batch->send_trailing_metadata &&
4631 (retry_state->started_send_message_count + batch->send_message <
4632 send_messages_.size() ||
4633 retry_state->started_send_trailing_metadata)) {
4634 continue;
4635 }
4636 if (batch->recv_initial_metadata &&
4637 retry_state->started_recv_initial_metadata) {
4638 continue;
4639 }
4640 if (batch->recv_message && retry_state->completed_recv_message_count <
4641 retry_state->started_recv_message_count) {
4642 continue;
4643 }
4644 if (batch->recv_trailing_metadata &&
4645 retry_state->started_recv_trailing_metadata) {
4646 // If we previously completed a recv_trailing_metadata op
4647 // initiated by StartInternalRecvTrailingMetadata(), use the
4648 // result of that instead of trying to re-start this op.
4649 if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
4650 nullptr))) {
4651 // If the batch completed, then trigger the completion callback
4652 // directly, so that we return the previously returned results to
4653 // the application. Otherwise, just unref the internally
4654 // started subchannel batch, since we'll propagate the
4655 // completion when it completes.
4656 if (retry_state->completed_recv_trailing_metadata) {
4657 // Batches containing recv_trailing_metadata always succeed.
4658 closures->Add(
4659 &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
4660 "re-executing recv_trailing_metadata_ready to propagate "
4661 "internally triggered result");
4662 } else {
4663 retry_state->recv_trailing_metadata_internal_batch->Unref();
4664 }
4665 retry_state->recv_trailing_metadata_internal_batch = nullptr;
4666 }
4667 continue;
4668 }
4669 // If we're not retrying, just send the batch as-is.
4670 // TODO(roth): This condition doesn't seem exactly right -- maybe need a
4671 // notion of "draining" once we've committed and are done replaying?
4672 if (retry_policy_ == nullptr || retry_committed_) {
4673 AddClosureForSubchannelBatch(batch, closures);
4674 PendingBatchClear(pending);
4675 continue;
4676 }
4677 // Create batch with the right number of callbacks.
4678 const bool has_send_ops = batch->send_initial_metadata ||
4679 batch->send_message ||
4680 batch->send_trailing_metadata;
4681 const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
4682 batch->recv_message +
4683 batch->recv_trailing_metadata;
4684 SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
4685 this, num_callbacks, has_send_ops /* set_on_complete */);
4686 // Cache send ops if needed.
4687 MaybeCacheSendOpsForBatch(pending);
4688 // send_initial_metadata.
4689 if (batch->send_initial_metadata) {
4690 AddRetriableSendInitialMetadataOp(retry_state, batch_data);
4691 }
4692 // send_message.
4693 if (batch->send_message) {
4694 AddRetriableSendMessageOp(retry_state, batch_data);
4695 }
4696 // send_trailing_metadata.
4697 if (batch->send_trailing_metadata) {
4698 AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
4699 }
4700 // recv_initial_metadata.
4701 if (batch->recv_initial_metadata) {
4702 // recv_flags is only used on the server side.
4703 GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
4704 AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
4705 }
4706 // recv_message.
4707 if (batch->recv_message) {
4708 AddRetriableRecvMessageOp(retry_state, batch_data);
4709 }
4710 // recv_trailing_metadata.
4711 if (batch->recv_trailing_metadata) {
4712 AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
4713 }
4714 AddClosureForSubchannelBatch(&batch_data->batch, closures);
4715 // Track number of pending subchannel send batches.
4716 // If this is the first one, take a ref to the call stack.
4717 if (batch->send_initial_metadata || batch->send_message ||
4718 batch->send_trailing_metadata) {
4719 if (num_pending_retriable_subchannel_send_batches_ == 0) {
4720 GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
4721 }
4722 ++num_pending_retriable_subchannel_send_batches_;
4723 }
4724 }
4725 }
4726
4727 void RetryingCall::StartRetriableSubchannelBatches(void* arg,
4728 grpc_error* /*ignored*/) {
4729 RetryingCall* call = static_cast<RetryingCall*>(arg);
4730 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4731 gpr_log(GPR_INFO,
4732 "chand=%p retrying_call=%p: constructing retriable batches",
4733 call->chand_, call);
4734 }
4735 SubchannelCallRetryState* retry_state =
4736 static_cast<SubchannelCallRetryState*>(call->lb_call_->GetParentData());
4737 // Construct list of closures to execute, one for each pending batch.
4738 CallCombinerClosureList closures;
4739 // Replay previously-returned send_* ops if needed.
4740 SubchannelCallBatchData* replay_batch_data =
4741 call->MaybeCreateSubchannelBatchForReplay(retry_state);
4742 if (replay_batch_data != nullptr) {
4743 call->AddClosureForSubchannelBatch(&replay_batch_data->batch, &closures);
4744 // Track number of pending subchannel send batches.
4745 // If this is the first one, take a ref to the call stack.
4746 if (call->num_pending_retriable_subchannel_send_batches_ == 0) {
4747 GRPC_CALL_STACK_REF(call->owning_call_, "subchannel_send_batches");
4748 }
4749 ++call->num_pending_retriable_subchannel_send_batches_;
4750 }
4751 // Now add pending batches.
4752 call->AddSubchannelBatchesForPendingBatches(retry_state, &closures);
4753 // Start batches on subchannel call.
4754 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4755 gpr_log(GPR_INFO,
4756 "chand=%p retrying_call=%p: starting %" PRIuPTR
4757 " retriable batches on lb_call=%p",
4758 call->chand_, call, closures.size(), call->lb_call_.get());
4759 }
4760 // Note: This will yield the call combiner.
4761 closures.RunClosures(call->call_combiner_);
4762 }
4763
4764 void RetryingCall::CreateLbCall(void* arg, grpc_error* /*error*/) {
4765 auto* call = static_cast<RetryingCall*>(arg);
4766 const size_t parent_data_size =
4767 call->enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
4768 grpc_call_element_args args = {call->owning_call_, nullptr,
4769 call->call_context_, call->path_,
4770 call->call_start_time_, call->deadline_,
4771 call->arena_, call->call_combiner_};
4772 call->lb_call_ = LoadBalancedCall::Create(call->chand_, args, call->pollent_,
4773 parent_data_size);
4774 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
4775 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: create lb_call=%p",
4776 call->chand_, call, call->lb_call_.get());
4777 }
4778 if (parent_data_size > 0) {
4779 new (call->lb_call_->GetParentData())
4780 SubchannelCallRetryState(call->call_context_);
4781 }
4782 call->PendingBatchesResume();
4783 }
4784
4785 //
4786 // LoadBalancedCall::Metadata
4787 //
4788
4789 class LoadBalancedCall::Metadata
4790 : public LoadBalancingPolicy::MetadataInterface {
4791 public:
4792 Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch)
4793 : lb_call_(lb_call), batch_(batch) {}
4794
4795 void Add(absl::string_view key, absl::string_view value) override {
4796 grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
4797 lb_call_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
4798 linked_mdelem->md = grpc_mdelem_from_slices(
4799 ExternallyManagedSlice(key.data(), key.size()),
4800 ExternallyManagedSlice(value.data(), value.size()));
4801 GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
4802 GRPC_ERROR_NONE);
4803 }
4804
4805 iterator begin() const override {
4806 static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
4807 "iterator size too large");
4808 return iterator(
4809 this, reinterpret_cast<intptr_t>(MaybeSkipEntry(batch_->list.head)));
4810 }
4811 iterator end() const override {
4812 static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
4813 "iterator size too large");
4814 return iterator(this, 0);
4815 }
4816
4817 iterator erase(iterator it) override {
4818 grpc_linked_mdelem* linked_mdelem =
4819 reinterpret_cast<grpc_linked_mdelem*>(GetIteratorHandle(it));
4820 intptr_t handle = reinterpret_cast<intptr_t>(linked_mdelem->next);
4821 grpc_metadata_batch_remove(batch_, linked_mdelem);
4822 return iterator(this, handle);
4823 }
4824
4825 private:
4826 grpc_linked_mdelem* MaybeSkipEntry(grpc_linked_mdelem* entry) const {
4827 if (entry != nullptr && batch_->idx.named.path == entry) {
4828 return entry->next;
4829 }
4830 return entry;
4831 }
4832
4833 intptr_t IteratorHandleNext(intptr_t handle) const override {
4834 grpc_linked_mdelem* linked_mdelem =
4835 reinterpret_cast<grpc_linked_mdelem*>(handle);
4836 return reinterpret_cast<intptr_t>(MaybeSkipEntry(linked_mdelem->next));
4837 }
4838
4839 std::pair<absl::string_view, absl::string_view> IteratorHandleGet(
4840 intptr_t handle) const override {
4841 grpc_linked_mdelem* linked_mdelem =
4842 reinterpret_cast<grpc_linked_mdelem*>(handle);
4843 return std::make_pair(StringViewFromSlice(GRPC_MDKEY(linked_mdelem->md)),
4844 StringViewFromSlice(GRPC_MDVALUE(linked_mdelem->md)));
4845 }
4846
4847 LoadBalancedCall* lb_call_;
4848 grpc_metadata_batch* batch_;
4849 };
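// The iterator above packs a grpc_linked_mdelem* into the intptr_t handle
// defined by LoadBalancingPolicy::MetadataInterface, with MaybeSkipEntry()
// hiding the ":path" element from the policy. A sketch of how an LB
// policy might walk it (interface usage only; policy code lives
// elsewhere):
//
//   for (const auto& p : *metadata) {
//     // p.first is the key, p.second the value; both absl::string_view.
//   }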
4850
4851 //
4852 // LoadBalancedCall::LbCallState
4853 //
4854
4855 class LoadBalancedCall::LbCallState : public LoadBalancingPolicy::CallState {
4856 public:
4857 explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}
4858
4859 void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }
4860
4861 const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
4862 override {
4863 if (lb_call_->backend_metric_data_ == nullptr) {
4864 grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->idx.named
4865 .x_endpoint_load_metrics_bin;
4866 if (md != nullptr) {
4867 lb_call_->backend_metric_data_ =
4868 ParseBackendMetricData(GRPC_MDVALUE(md->md), lb_call_->arena_);
4869 }
4870 }
4871 return lb_call_->backend_metric_data_;
4872 }
4873
4874 absl::string_view ExperimentalGetCallAttribute(const char* key) override {
4875 auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
4876 lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
4877 auto& call_attributes = service_config_call_data->call_attributes();
4878 auto it = call_attributes.find(key);
4879 if (it == call_attributes.end()) return absl::string_view();
4880 return it->second;
4881 }
4882
4883 private:
4884 LoadBalancedCall* lb_call_;
4885 };
4886
4887 //
4888 // LoadBalancedCall
4889 //
4890
4891 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Create(
4892 ChannelData* chand, const grpc_call_element_args& args,
4893 grpc_polling_entity* pollent, size_t parent_data_size) {
4894 const size_t alloc_size =
4895 parent_data_size > 0
4896 ? (GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall)) +
4897 parent_data_size)
4898 : sizeof(LoadBalancedCall);
4899 auto* lb_call = static_cast<LoadBalancedCall*>(args.arena->Alloc(alloc_size));
4900 new (lb_call) LoadBalancedCall(chand, args, pollent);
4901 return lb_call;
4902 }
4903
4904 LoadBalancedCall::LoadBalancedCall(ChannelData* chand,
4905 const grpc_call_element_args& args,
4906 grpc_polling_entity* pollent)
4907 : refs_(1, GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
4908 ? "LoadBalancedCall"
4909 : nullptr),
4910 chand_(chand),
4911 path_(grpc_slice_ref_internal(args.path)),
4912 call_start_time_(args.start_time),
4913 deadline_(args.deadline),
4914 arena_(args.arena),
4915 owning_call_(args.call_stack),
4916 call_combiner_(args.call_combiner),
4917 call_context_(args.context),
4918 pollent_(pollent) {}
4919
4920 LoadBalancedCall::~LoadBalancedCall() {
4921 grpc_slice_unref_internal(path_);
4922 GRPC_ERROR_UNREF(cancel_error_);
4923 if (backend_metric_data_ != nullptr) {
4924 backend_metric_data_
4925 ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
4926 }
4927 // Make sure there are no remaining pending batches.
4928 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4929 GPR_ASSERT(pending_batches_[i] == nullptr);
4930 }
4931 }
4932
4933 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref() {
4934 IncrementRefCount();
4935 return RefCountedPtr<LoadBalancedCall>(this);
4936 }
4937
4938 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref(
4939 const DebugLocation& location, const char* reason) {
4940 IncrementRefCount(location, reason);
4941 return RefCountedPtr<LoadBalancedCall>(this);
4942 }
4943
4944 void LoadBalancedCall::Unref() {
4945 if (GPR_UNLIKELY(refs_.Unref())) {
4946 this->~LoadBalancedCall();
4947 }
4948 }
4949
4950 void LoadBalancedCall::Unref(const DebugLocation& location,
4951 const char* reason) {
4952 if (GPR_UNLIKELY(refs_.Unref(location, reason))) {
4953 this->~LoadBalancedCall();
4954 }
4955 }
4956
4957 void LoadBalancedCall::IncrementRefCount() { refs_.Ref(); }
4958
4959 void LoadBalancedCall::IncrementRefCount(const DebugLocation& location,
4960 const char* reason) {
4961 refs_.Ref(location, reason);
4962 }
4963
4964 void* LoadBalancedCall::GetParentData() {
4965 return reinterpret_cast<char*>(this) +
4966 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall));
4967 }
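// Memory layout assumed by Create() above and GetParentData() (allocated
// from the call arena as a single block):
//
//   +--------------------------------------------+------------------+
//   | LoadBalancedCall (rounded up to alignment) | parent data      |
//   +--------------------------------------------+------------------+
//
// The retry code uses the parent-data region to hold its
// SubchannelCallRetryState, sized via the parent_data_size argument.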
4968
4969 size_t LoadBalancedCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
4970 // Note: It is important that send_initial_metadata be the first entry
4971 // here, since the code in pick_subchannel_locked() assumes it will be.
4972 if (batch->send_initial_metadata) return 0;
4973 if (batch->send_message) return 1;
4974 if (batch->send_trailing_metadata) return 2;
4975 if (batch->recv_initial_metadata) return 3;
4976 if (batch->recv_message) return 4;
4977 if (batch->recv_trailing_metadata) return 5;
4978 GPR_UNREACHABLE_CODE(return (size_t)-1);
4979 }
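// For example, a batch carrying both send_initial_metadata and
// recv_initial_metadata maps to index 0, since the checks run in order
// and send ops take precedence. At most one pending batch may occupy a
// given slot at a time (see the assertion in PendingBatchesAdd() below).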
4980
4981 // This is called via the call combiner, so access to calld is synchronized.
4982 void LoadBalancedCall::PendingBatchesAdd(
4983 grpc_transport_stream_op_batch* batch) {
4984 const size_t idx = GetBatchIndex(batch);
4985 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4986 gpr_log(GPR_INFO,
4987 "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
4988 chand_, this, idx);
4989 }
4990 GPR_ASSERT(pending_batches_[idx] == nullptr);
4991 pending_batches_[idx] = batch;
4992 }
4993
4994 // This is called via the call combiner, so access to calld is synchronized.
4995 void LoadBalancedCall::FailPendingBatchInCallCombiner(void* arg,
4996 grpc_error* error) {
4997 grpc_transport_stream_op_batch* batch =
4998 static_cast<grpc_transport_stream_op_batch*>(arg);
4999 auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
5000 // Note: This will release the call combiner.
5001 grpc_transport_stream_op_batch_finish_with_failure(
5002 batch, GRPC_ERROR_REF(error), self->call_combiner_);
5003 }
5004
5005 // This is called via the call combiner, so access to calld is synchronized.
5006 void LoadBalancedCall::PendingBatchesFail(
5007 grpc_error* error,
5008 YieldCallCombinerPredicate yield_call_combiner_predicate) {
5009 GPR_ASSERT(error != GRPC_ERROR_NONE);
5010 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5011 size_t num_batches = 0;
5012 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5013 if (pending_batches_[i] != nullptr) ++num_batches;
5014 }
5015 gpr_log(GPR_INFO,
5016 "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
5017 chand_, this, num_batches, grpc_error_string(error));
5018 }
5019 CallCombinerClosureList closures;
5020 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5021 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
5022 if (batch != nullptr) {
5023 batch->handler_private.extra_arg = this;
5024 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
5025 FailPendingBatchInCallCombiner, batch,
5026 grpc_schedule_on_exec_ctx);
5027 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
5028 "PendingBatchesFail");
5029 batch = nullptr;
5030 }
5031 }
5032 if (yield_call_combiner_predicate(closures)) {
5033 closures.RunClosures(call_combiner_);
5034 } else {
5035 closures.RunClosuresWithoutYielding(call_combiner_);
5036 }
5037 GRPC_ERROR_UNREF(error);
5038 }
5039
5040 // This is called via the call combiner, so access to calld is synchronized.
5041 void LoadBalancedCall::ResumePendingBatchInCallCombiner(
5042 void* arg, grpc_error* /*ignored*/) {
5043 grpc_transport_stream_op_batch* batch =
5044 static_cast<grpc_transport_stream_op_batch*>(arg);
5045 SubchannelCall* subchannel_call =
5046 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
5047 // Note: This will release the call combiner.
5048 subchannel_call->StartTransportStreamOpBatch(batch);
5049 }
5050
5051 // This is called via the call combiner, so access to calld is synchronized.
5052 void LoadBalancedCall::PendingBatchesResume() {
5053 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5054 size_t num_batches = 0;
5055 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5056 if (pending_batches_[i] != nullptr) ++num_batches;
5057 }
5058 gpr_log(GPR_INFO,
5059 "chand=%p lb_call=%p: starting %" PRIuPTR
5060 " pending batches on subchannel_call=%p",
5061 chand_, this, num_batches, subchannel_call_.get());
5062 }
5063 CallCombinerClosureList closures;
5064 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5065 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
5066 if (batch != nullptr) {
5067 batch->handler_private.extra_arg = subchannel_call_.get();
5068 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
5069 ResumePendingBatchInCallCombiner, batch,
5070 grpc_schedule_on_exec_ctx);
5071 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
5072 "PendingBatchesResume");
5073 batch = nullptr;
5074 }
5075 }
5076 // Note: This will release the call combiner.
5077 closures.RunClosures(call_combiner_);
5078 }
5079
5080 void LoadBalancedCall::StartTransportStreamOpBatch(
5081 grpc_transport_stream_op_batch* batch) {
5082 // Intercept recv_trailing_metadata_ready for LB callback.
5083 if (batch->recv_trailing_metadata) {
5084 InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
5085 }
5086 // If we've previously been cancelled, immediately fail any new batches.
5087 if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
5088 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5089 gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
5090 chand_, this, grpc_error_string(cancel_error_));
5091 }
5092 // Note: This will release the call combiner.
5093 grpc_transport_stream_op_batch_finish_with_failure(
5094 batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
5095 return;
5096 }
5097 // Handle cancellation.
5098 if (GPR_UNLIKELY(batch->cancel_stream)) {
5099 // Stash a copy of cancel_error in our call data, so that we can use
5100 // it for subsequent operations. This ensures that if the call is
5101 // cancelled before any batches are passed down (e.g., if the deadline
5102 // is in the past when the call starts), we can return the right
5103 // error to the caller when the first batch does get passed down.
5104 GRPC_ERROR_UNREF(cancel_error_);
5105 cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
5106 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5107 gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
5108 chand_, this, grpc_error_string(cancel_error_));
5109 }
5110 // If we do not have a subchannel call (i.e., a pick has not yet
5111 // been started), fail all pending batches. Otherwise, send the
5112 // cancellation down to the subchannel call.
5113 if (subchannel_call_ == nullptr) {
5114 PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
5115 // Note: This will release the call combiner.
5116 grpc_transport_stream_op_batch_finish_with_failure(
5117 batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
5118 } else {
5119 // Note: This will release the call combiner.
5120 subchannel_call_->StartTransportStreamOpBatch(batch);
5121 }
5122 return;
5123 }
5124 // Add the batch to the pending list.
5125 PendingBatchesAdd(batch);
5126 // Check if we've already gotten a subchannel call.
5127 // Note that once we have picked a subchannel, we do not need to acquire
5128 // the channel's data plane mutex, which is more efficient (especially for
5129 // streaming calls).
5130 if (subchannel_call_ != nullptr) {
5131 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5132 gpr_log(GPR_INFO,
5133 "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
5134 chand_, this, subchannel_call_.get());
5135 }
5136 PendingBatchesResume();
5137 return;
5138 }
5139 // We do not yet have a subchannel call.
5140 // For batches containing a send_initial_metadata op, acquire the
5141 // channel's data plane mutex to pick a subchannel.
5142 if (GPR_LIKELY(batch->send_initial_metadata)) {
5143 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5144 gpr_log(GPR_INFO,
5145 "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
5146 chand_, this);
5147 }
5148 PickSubchannel(this, GRPC_ERROR_NONE);
5149 } else {
5150 // For all other batches, release the call combiner.
5151 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5152 gpr_log(GPR_INFO,
5153 "chand=%p lb_call=%p: saved batch, yielding call combiner",
5154 chand_, this);
5155 }
5156 GRPC_CALL_COMBINER_STOP(call_combiner_,
5157 "batch does not include send_initial_metadata");
5158 }
5159 }
5160
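// Intercepted recv_trailing_metadata_ready callback.  Computes the
// status to report to the LB policy (the transport error if there is
// one, otherwise a status derived from the grpc-status and grpc-message
// trailing metadata), invokes the LB policy's callback, and then chains
// to the original recv_trailing_metadata_ready callback.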
void LoadBalancedCall::RecvTrailingMetadataReadyForLoadBalancingPolicy(
    void* arg, grpc_error* error) {
  auto* self = static_cast<LoadBalancedCall*>(arg);
  if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
    // Set error if call did not succeed.
    grpc_error* error_for_lb = GRPC_ERROR_NONE;
    if (error != GRPC_ERROR_NONE) {
      error_for_lb = error;
    } else {
      const auto& fields = self->recv_trailing_metadata_->idx.named;
      GPR_ASSERT(fields.grpc_status != nullptr);
      grpc_status_code status =
          grpc_get_status_code_from_metadata(fields.grpc_status->md);
      if (status != GRPC_STATUS_OK) {
        error_for_lb = grpc_error_set_int(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
            GRPC_ERROR_INT_GRPC_STATUS, status);
        if (fields.grpc_message != nullptr) {
          error_for_lb = grpc_error_set_str(
              error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
              grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
        }
      }
    }
    // Invoke callback to LB policy.
    Metadata trailing_metadata(self, self->recv_trailing_metadata_);
    LbCallState lb_call_state(self);
    self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
                                           &lb_call_state);
    if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
  }
  // Chain to original callback.
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               GRPC_ERROR_REF(error));
}

// TODO(roth): Consider not intercepting this callback unless we
// actually need to, if this causes a performance problem.
void LoadBalancedCall::InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
    grpc_transport_stream_op_batch* batch) {
  recv_trailing_metadata_ =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata;
  original_recv_trailing_metadata_ready_ =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                    RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
                    grpc_schedule_on_exec_ctx);
  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &recv_trailing_metadata_ready_;
}

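// Creates the subchannel call on the connected subchannel chosen by the
// LB pick.  On success, resumes the pending batches on the new call; on
// failure, fails them with the creation error.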
void LoadBalancedCall::CreateSubchannelCall() {
  SubchannelCall::Args call_args = {
      std::move(connected_subchannel_), pollent_, path_, call_start_time_,
      deadline_, arena_,
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      call_context_, call_combiner_};
  grpc_error* error = GRPC_ERROR_NONE;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
            this, subchannel_call_.get(), grpc_error_string(error));
  }
  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
    PendingBatchesFail(error, YieldCallCombiner);
  } else {
    PendingBatchesResume();
  }
}

// A class to handle the call combiner cancellation callback for a
// queued pick.
// TODO(roth): When we implement hedging support, we won't be able to
// register a call combiner cancellation closure for each LB pick,
// because there may be multiple LB picks happening in parallel.
// Instead, we will probably need to maintain a list in the CallData
// object of pending LB picks to be cancelled when the closure runs.
class LoadBalancedCall::LbQueuedCallCanceller {
 public:
  explicit LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand_;
    {
      MutexLock lock(chand->data_plane_mu());
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: cancelling queued pick: "
                "error=%s self=%p lb_call->lb_call_canceller_=%p",
                chand, lb_call, grpc_error_string(error), self,
                lb_call->lb_call_canceller_);
      }
      if (lb_call->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
        // Remove pick from list of queued picks.
        lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(GRPC_ERROR_REF(error),
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    GRPC_CALL_STACK_UNREF(lb_call->owning_call_, "LbQueuedCallCanceller");
    delete self;
  }

  RefCountedPtr<LoadBalancedCall> lb_call_;
  grpc_closure closure_;
};

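// The two helpers below keep the call's membership in the channel's
// queued-picks list in sync with queued_pending_lb_pick_.  Adding the
// call also registers an LbQueuedCallCanceller, so that a call combiner
// cancellation while queued fails the pending batches; removing the call
// "lames" the canceller by clearing lb_call_canceller_, which turns the
// pointer check in CancelLocked() above into a no-op.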
void LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
  if (!queued_pending_lb_pick_) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
            chand_, this);
  }
  chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
  queued_pending_lb_pick_ = false;
  // Lame the call combiner canceller.
  lb_call_canceller_ = nullptr;
}

void LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
  if (queued_pending_lb_pick_) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
            chand_, this);
  }
  queued_pending_lb_pick_ = true;
  queued_call_.lb_call = this;
  chand_->AddLbQueuedCall(&queued_call_, pollent_);
  // Register call combiner cancellation callback.
  lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
}

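// AsyncPickDone() schedules PickDone() on the ExecCtx with the given
// error; PickDone() then either fails the pending batches (if the pick
// failed) or creates the subchannel call (if it succeeded).  Running via
// the ExecCtx lets a pick that completes asynchronously (e.g., a queued
// pick picked up by a new picker) report its result outside of whatever
// locks the caller holds.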
void LoadBalancedCall::AsyncPickDone(grpc_error* error) {
  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
  ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
}

void LoadBalancedCall::PickDone(void* arg, grpc_error* error) {
  auto* self = static_cast<LoadBalancedCall*>(arg);
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
              self->chand_, self, grpc_error_string(error));
    }
    self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
    return;
  }
  self->CreateSubchannelCall();
}

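// Returns a human-readable name for an LB pick result type, for trace
// logging.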
const char* PickResultTypeName(
    LoadBalancingPolicy::PickResult::ResultType type) {
  switch (type) {
    case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
      return "COMPLETE";
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      return "QUEUE";
    case LoadBalancingPolicy::PickResult::PICK_FAILED:
      return "FAILED";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

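// Attempts an LB pick while holding the channel's data plane mutex.  If
// the pick completes synchronously, processes the result via PickDone()
// after releasing the mutex; otherwise the call stays queued and the
// result is delivered later (see AsyncPickDone() above).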
void LoadBalancedCall::PickSubchannel(void* arg, grpc_error* error) {
  auto* self = static_cast<LoadBalancedCall*>(arg);
  bool pick_complete;
  {
    MutexLock lock(self->chand_->data_plane_mu());
    pick_complete = self->PickSubchannelLocked(&error);
  }
  if (pick_complete) {
    PickDone(self, error);
    GRPC_ERROR_UNREF(error);
  }
}

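// Performs one LB pick attempt.  Returns true if the pick is complete,
// in which case *error holds the outcome (GRPC_ERROR_NONE on success, or
// the error to fail the call with); returns false if the call was queued
// to be retried when a new picker becomes available.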
bool LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
  GPR_ASSERT(connected_subchannel_ == nullptr);
  GPR_ASSERT(subchannel_call_ == nullptr);
  // Grab initial metadata.
  auto& send_initial_metadata =
      pending_batches_[0]->payload->send_initial_metadata;
  grpc_metadata_batch* initial_metadata_batch =
      send_initial_metadata.send_initial_metadata;
  const uint32_t send_initial_metadata_flags =
      send_initial_metadata.send_initial_metadata_flags;
  // Perform LB pick.
  LoadBalancingPolicy::PickArgs pick_args;
  pick_args.path = StringViewFromSlice(path_);
  LbCallState lb_call_state(this);
  pick_args.call_state = &lb_call_state;
  Metadata initial_metadata(this, initial_metadata_batch);
  pick_args.initial_metadata = &initial_metadata;
  auto result = chand_->picker()->Pick(pick_args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p lb_call=%p: LB pick returned %s (subchannel=%p, error=%s)",
        chand_, this, PickResultTypeName(result.type), result.subchannel.get(),
        grpc_error_string(result.error));
  }
  switch (result.type) {
    case LoadBalancingPolicy::PickResult::PICK_FAILED: {
      // If we're shutting down, fail all RPCs.
      grpc_error* disconnect_error = chand_->disconnect_error();
      if (disconnect_error != GRPC_ERROR_NONE) {
        GRPC_ERROR_UNREF(result.error);
        MaybeRemoveCallFromLbQueuedCallsLocked();
        *error = GRPC_ERROR_REF(disconnect_error);
        return true;
      }
      // If wait_for_ready is false, then the error indicates the RPC
      // attempt's final status.
      if ((send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
        grpc_error* new_error =
            GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                "Failed to pick subchannel", &result.error, 1);
        GRPC_ERROR_UNREF(result.error);
        *error = new_error;
        MaybeRemoveCallFromLbQueuedCallsLocked();
        return true;
      }
      // If wait_for_ready is true, then queue to retry when we get a new
      // picker.
      GRPC_ERROR_UNREF(result.error);
    }
    // Fallthrough
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      MaybeAddCallToLbQueuedCallsLocked();
      return false;
    default:  // PICK_COMPLETE
      MaybeRemoveCallFromLbQueuedCallsLocked();
      // Handle drops.
      if (GPR_UNLIKELY(result.subchannel == nullptr)) {
        result.error = grpc_error_set_int(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                "Call dropped by load balancing policy"),
            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
      } else {
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        connected_subchannel_ =
            chand_->GetConnectedSubchannelInDataPlane(result.subchannel.get());
        GPR_ASSERT(connected_subchannel_ != nullptr);
      }
      lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
      *error = result.error;
      return true;
  }
}

}  // namespace
}  // namespace grpc_core

/*************************************************************************
 * EXPORTED SYMBOLS
 */

using grpc_core::CallData;
using grpc_core::ChannelData;

const grpc_channel_filter grpc_client_channel_filter = {
    CallData::StartTransportStreamOpBatch,
    ChannelData::StartTransportOp,
    sizeof(CallData),
    CallData::Init,
    CallData::SetPollent,
    CallData::Destroy,
    sizeof(ChannelData),
    ChannelData::Init,
    ChannelData::Destroy,
    ChannelData::GetChannelInfo,
    "client-channel",
};
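// The initializers above follow the grpc_channel_filter layout: the
// per-call batch handler, the transport-op handler, the call data size
// and its lifecycle hooks (init / set_pollent / destroy), the channel
// data size and its lifecycle hooks, the channel-info getter, and the
// filter name.
//
// Illustrative sketch only (the actual registration lives elsewhere, in
// the client channel plugin; the helper name here is hypothetical): a
// filter like this is typically installed by appending it to the channel
// stack at init time, roughly:
//
//   static bool AppendClientChannelFilter(
//       grpc_channel_stack_builder* builder, void* arg) {
//     return grpc_channel_stack_builder_append_filter(
//         builder, static_cast<const grpc_channel_filter*>(arg),
//         /*post_init_func=*/nullptr, /*user_data=*/nullptr);
//   }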

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->CheckConnectivityState(try_to_connect);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->NumExternalConnectivityWatchers();
}

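// Usage note: per the branches below, passing state == nullptr (with
// watcher_timer_init == nullptr) cancels a watch previously registered
// with the same on_complete closure; passing a non-null state registers
// a new watch.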
void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (state == nullptr) {
    // Handle cancellation.
    GPR_ASSERT(watcher_timer_init == nullptr);
    chand->RemoveExternalConnectivityWatcher(on_complete, /*cancel=*/true);
    return;
  }
  // Handle addition.
  return chand->AddExternalConnectivityWatcher(pollent, state, on_complete,
                                               watcher_timer_init);
}

void grpc_client_channel_start_connectivity_watch(
    grpc_channel_element* elem, grpc_connectivity_state initial_state,
    grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
        watcher) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->AddConnectivityWatcher(initial_state, std::move(watcher));
}

void grpc_client_channel_stop_connectivity_watch(
    grpc_channel_element* elem,
    grpc_core::AsyncConnectivityStateWatcherInterface* watcher) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->RemoveConnectivityWatcher(watcher);
}