• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/filters/client_channel/client_channel.h"
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <string.h>
26 
27 #include <set>
28 
29 #include "absl/strings/numbers.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/string_view.h"
33 
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/string_util.h>
37 #include <grpc/support/sync.h>
38 
39 #include "absl/container/inlined_vector.h"
40 #include "absl/types/optional.h"
41 
42 #include "src/core/ext/filters/client_channel/backend_metric.h"
43 #include "src/core/ext/filters/client_channel/backup_poller.h"
44 #include "src/core/ext/filters/client_channel/config_selector.h"
45 #include "src/core/ext/filters/client_channel/dynamic_filters.h"
46 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
47 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
48 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
49 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
50 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
51 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
52 #include "src/core/ext/filters/client_channel/resolver_registry.h"
53 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
54 #include "src/core/ext/filters/client_channel/retry_throttle.h"
55 #include "src/core/ext/filters/client_channel/service_config.h"
56 #include "src/core/ext/filters/client_channel/service_config_call_data.h"
57 #include "src/core/ext/filters/client_channel/subchannel.h"
58 #include "src/core/ext/filters/deadline/deadline_filter.h"
59 #include "src/core/lib/backoff/backoff.h"
60 #include "src/core/lib/channel/channel_args.h"
61 #include "src/core/lib/channel/connected_channel.h"
62 #include "src/core/lib/channel/status_util.h"
63 #include "src/core/lib/gpr/string.h"
64 #include "src/core/lib/gprpp/manual_constructor.h"
65 #include "src/core/lib/gprpp/sync.h"
66 #include "src/core/lib/iomgr/iomgr.h"
67 #include "src/core/lib/iomgr/polling_entity.h"
68 #include "src/core/lib/iomgr/work_serializer.h"
69 #include "src/core/lib/profiling/timers.h"
70 #include "src/core/lib/slice/slice_internal.h"
71 #include "src/core/lib/slice/slice_string_helpers.h"
72 #include "src/core/lib/surface/channel.h"
73 #include "src/core/lib/transport/connectivity_state.h"
74 #include "src/core/lib/transport/error_utils.h"
75 #include "src/core/lib/transport/metadata.h"
76 #include "src/core/lib/transport/metadata_batch.h"
77 #include "src/core/lib/transport/static_metadata.h"
78 #include "src/core/lib/transport/status_metadata.h"
79 
//
// Client channel filter
//

// By default, we buffer 256 KiB per RPC for retries (256 << 10 == 262144
// bytes).
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// Jitter value used for retry backoff.
// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

// Channel arg containing a pointer to the ChannelData object.
#define GRPC_ARG_CLIENT_CHANNEL_DATA "grpc.internal.client_channel_data"

// Channel arg containing a pointer to the RetryThrottleData object.
#define GRPC_ARG_RETRY_THROTTLE_DATA "grpc.internal.retry_throttle_data"
107 
108 namespace grpc_core {
109 
110 using internal::ClientChannelGlobalParsedConfig;
111 using internal::ClientChannelMethodParsedConfig;
112 using internal::ClientChannelServiceConfigParser;
113 using internal::ServerRetryThrottleData;
114 
115 TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
116 TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
117 
118 namespace {
119 
120 //
121 // ChannelData definition
122 //
123 
124 class LoadBalancedCall;
125 
126 class ChannelData {
127  public:
128   struct ResolverQueuedCall {
129     grpc_call_element* elem;
130     ResolverQueuedCall* next = nullptr;
131   };
132   struct LbQueuedCall {
133     LoadBalancedCall* lb_call;
134     LbQueuedCall* next = nullptr;
135   };
136 
137   static grpc_error* Init(grpc_channel_element* elem,
138                           grpc_channel_element_args* args);
139   static void Destroy(grpc_channel_element* elem);
140   static void StartTransportOp(grpc_channel_element* elem,
141                                grpc_transport_op* op);
142   static void GetChannelInfo(grpc_channel_element* elem,
143                              const grpc_channel_info* info);
144 
deadline_checking_enabled() const145   bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
enable_retries() const146   bool enable_retries() const { return enable_retries_; }
per_rpc_retry_buffer_size() const147   size_t per_rpc_retry_buffer_size() const {
148     return per_rpc_retry_buffer_size_;
149   }
owning_stack() const150   grpc_channel_stack* owning_stack() const { return owning_stack_; }
151 
152   // Note: Does NOT return a new ref.
disconnect_error() const153   grpc_error* disconnect_error() const {
154     return disconnect_error_.Load(MemoryOrder::ACQUIRE);
155   }
156 
resolution_mu() const157   Mutex* resolution_mu() const { return &resolution_mu_; }
158   // These methods all require holding resolution_mu_.
159   void AddResolverQueuedCall(ResolverQueuedCall* call,
160                              grpc_polling_entity* pollent);
161   void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
162                                 grpc_polling_entity* pollent);
received_service_config_data() const163   bool received_service_config_data() const {
164     return received_service_config_data_;
165   }
resolver_transient_failure_error() const166   grpc_error* resolver_transient_failure_error() const {
167     return resolver_transient_failure_error_;
168   }
service_config() const169   RefCountedPtr<ServiceConfig> service_config() const {
170     return service_config_;
171   }
config_selector() const172   ConfigSelector* config_selector() const { return config_selector_.get(); }
dynamic_filters() const173   RefCountedPtr<DynamicFilters> dynamic_filters() const {
174     return dynamic_filters_;
175   }
176 
data_plane_mu() const177   Mutex* data_plane_mu() const { return &data_plane_mu_; }
178   // These methods all require holding data_plane_mu_.
picker() const179   LoadBalancingPolicy::SubchannelPicker* picker() const {
180     return picker_.get();
181   }
182   void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent);
183   void RemoveLbQueuedCall(LbQueuedCall* to_remove,
184                           grpc_polling_entity* pollent);
185   RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
186       SubchannelInterface* subchannel) const;
187 
work_serializer() const188   WorkSerializer* work_serializer() const { return work_serializer_.get(); }
189 
190   grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
191 
AddExternalConnectivityWatcher(grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)192   void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
193                                       grpc_connectivity_state* state,
194                                       grpc_closure* on_complete,
195                                       grpc_closure* watcher_timer_init) {
196     new ExternalConnectivityWatcher(this, pollent, state, on_complete,
197                                     watcher_timer_init);
198   }
199 
RemoveExternalConnectivityWatcher(grpc_closure * on_complete,bool cancel)200   void RemoveExternalConnectivityWatcher(grpc_closure* on_complete,
201                                          bool cancel) {
202     ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
203         this, on_complete, cancel);
204   }
205 
NumExternalConnectivityWatchers() const206   int NumExternalConnectivityWatchers() const {
207     MutexLock lock(&external_watchers_mu_);
208     return static_cast<int>(external_watchers_.size());
209   }
210 
211   void AddConnectivityWatcher(
212       grpc_connectivity_state initial_state,
213       OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
214   void RemoveConnectivityWatcher(
215       AsyncConnectivityStateWatcherInterface* watcher);
216 
217  private:
218   class SubchannelWrapper;
219   class ClientChannelControlHelper;
220   class ConnectivityWatcherAdder;
221   class ConnectivityWatcherRemover;
222 
223   // Represents a pending connectivity callback from an external caller
224   // via grpc_client_channel_watch_connectivity_state().
225   class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface {
226    public:
227     ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
228                                 grpc_connectivity_state* state,
229                                 grpc_closure* on_complete,
230                                 grpc_closure* watcher_timer_init);
231 
232     ~ExternalConnectivityWatcher() override;
233 
234     // Removes the watcher from the external_watchers_ map.
235     static void RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
236                                                      grpc_closure* on_complete,
237                                                      bool cancel);
238 
239     void Notify(grpc_connectivity_state state,
240                 const absl::Status& /* status */) override;
241 
242     void Cancel();
243 
244    private:
245     // Adds the watcher to state_tracker_. Consumes the ref that is passed to it
246     // from Start().
247     void AddWatcherLocked();
248     void RemoveWatcherLocked();
249 
250     ChannelData* chand_;
251     grpc_polling_entity pollent_;
252     grpc_connectivity_state initial_state_;
253     grpc_connectivity_state* state_;
254     grpc_closure* on_complete_;
255     grpc_closure* watcher_timer_init_;
256     Atomic<bool> done_{false};
257   };
258 
259   class ResolverResultHandler : public Resolver::ResultHandler {
260    public:
ResolverResultHandler(ChannelData * chand)261     explicit ResolverResultHandler(ChannelData* chand) : chand_(chand) {
262       GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
263     }
264 
~ResolverResultHandler()265     ~ResolverResultHandler() override {
266       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
267         gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
268       }
269       GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
270     }
271 
ReturnResult(Resolver::Result result)272     void ReturnResult(Resolver::Result result) override {
273       chand_->OnResolverResultChangedLocked(std::move(result));
274     }
275 
ReturnError(grpc_error * error)276     void ReturnError(grpc_error* error) override {
277       chand_->OnResolverErrorLocked(error);
278     }
279 
280    private:
281     ChannelData* chand_;
282   };
283 
284   ChannelData(grpc_channel_element_args* args, grpc_error** error);
285   ~ChannelData();
286 
287   // Note: All methods with "Locked" suffix must be invoked from within
288   // work_serializer_.
289 
290   void OnResolverResultChangedLocked(Resolver::Result result);
291   void OnResolverErrorLocked(grpc_error* error);
292 
293   void CreateOrUpdateLbPolicyLocked(
294       RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
295       Resolver::Result result);
296   OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
297       const grpc_channel_args& args);
298 
299   void UpdateStateAndPickerLocked(
300       grpc_connectivity_state state, const absl::Status& status,
301       const char* reason,
302       std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker);
303 
304   void UpdateServiceConfigInControlPlaneLocked(
305       RefCountedPtr<ServiceConfig> service_config,
306       RefCountedPtr<ConfigSelector> config_selector,
307       const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
308       const char* lb_policy_name);
309 
310   void UpdateServiceConfigInDataPlaneLocked();
311 
312   void CreateResolverLocked();
313   void DestroyResolverAndLbPolicyLocked();
314 
315   grpc_error* DoPingLocked(grpc_transport_op* op);
316 
317   void StartTransportOpLocked(grpc_transport_op* op);
318 
319   void TryToConnectLocked();
320 
321   //
322   // Fields set at construction and never modified.
323   //
324   const bool deadline_checking_enabled_;
325   const bool enable_retries_;
326   const size_t per_rpc_retry_buffer_size_;
327   grpc_channel_stack* owning_stack_;
328   ClientChannelFactory* client_channel_factory_;
329   const grpc_channel_args* channel_args_;
330   RefCountedPtr<ServiceConfig> default_service_config_;
331   std::string server_name_;
332   UniquePtr<char> target_uri_;
333   channelz::ChannelNode* channelz_node_;
334 
335   //
336   // Fields related to name resolution.  Guarded by resolution_mu_.
337   //
338   mutable Mutex resolution_mu_;
339   // Linked list of calls queued waiting for resolver result.
340   ResolverQueuedCall* resolver_queued_calls_ = nullptr;
341   // Data from service config.
342   grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
343   bool received_service_config_data_ = false;
344   RefCountedPtr<ServiceConfig> service_config_;
345   RefCountedPtr<ConfigSelector> config_selector_;
346   RefCountedPtr<DynamicFilters> dynamic_filters_;
347 
348   //
349   // Fields used in the data plane.  Guarded by data_plane_mu_.
350   //
351   mutable Mutex data_plane_mu_;
352   std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_;
353   // Linked list of calls queued waiting for LB pick.
354   LbQueuedCall* lb_queued_calls_ = nullptr;
355 
356   //
357   // Fields used in the control plane.  Guarded by work_serializer.
358   //
359   std::shared_ptr<WorkSerializer> work_serializer_;
360   grpc_pollset_set* interested_parties_;
361   ConnectivityStateTracker state_tracker_;
362   OrphanablePtr<Resolver> resolver_;
363   bool previous_resolution_contained_addresses_ = false;
364   RefCountedPtr<ServiceConfig> saved_service_config_;
365   RefCountedPtr<ConfigSelector> saved_config_selector_;
366   absl::optional<std::string> health_check_service_name_;
367   OrphanablePtr<LoadBalancingPolicy> lb_policy_;
368   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
369   // The number of SubchannelWrapper instances referencing a given Subchannel.
370   std::map<Subchannel*, int> subchannel_refcount_map_;
371   // The set of SubchannelWrappers that currently exist.
372   // No need to hold a ref, since the map is updated in the control-plane
373   // work_serializer when the SubchannelWrappers are created and destroyed.
374   std::set<SubchannelWrapper*> subchannel_wrappers_;
375   // Pending ConnectedSubchannel updates for each SubchannelWrapper.
376   // Updates are queued here in the control plane work_serializer and then
377   // applied in the data plane mutex when the picker is updated.
378   std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
379       pending_subchannel_updates_;
380   int keepalive_time_ = -1;
381 
382   //
383   // Fields accessed from both data plane mutex and control plane
384   // work_serializer.
385   //
386   Atomic<grpc_error*> disconnect_error_;
387 
388   //
389   // Fields guarded by a mutex, since they need to be accessed
390   // synchronously via get_channel_info().
391   //
392   Mutex info_mu_;
393   UniquePtr<char> info_lb_policy_name_;
394   UniquePtr<char> info_service_config_json_;
395 
396   //
397   // Fields guarded by a mutex, since they need to be accessed
398   // synchronously via grpc_channel_num_external_connectivity_watchers().
399   //
400   mutable Mutex external_watchers_mu_;
401   std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
402       external_watchers_;
403 };
404 
405 //
406 // CallData definition
407 //
408 
409 class CallData {
410  public:
411   static grpc_error* Init(grpc_call_element* elem,
412                           const grpc_call_element_args* args);
413   static void Destroy(grpc_call_element* elem,
414                       const grpc_call_final_info* final_info,
415                       grpc_closure* then_schedule_closure);
416   static void StartTransportStreamOpBatch(
417       grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
418   static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
419 
420   // Invoked by channel for queued calls when name resolution is completed.
421   static void CheckResolution(void* arg, grpc_error* error);
422   // Helper function for applying the service config to a call while
423   // holding ChannelData::resolution_mu_.
424   // Returns true if the service config has been applied to the call, in which
425   // case the caller must invoke ResolutionDone() or AsyncResolutionDone()
426   // with the returned error.
427   bool CheckResolutionLocked(grpc_call_element* elem, grpc_error** error);
428   // Schedules a callback to continue processing the call once
429   // resolution is complete.  The callback will not run until after this
430   // method returns.
431   void AsyncResolutionDone(grpc_call_element* elem, grpc_error* error);
432 
433  private:
434   class ResolverQueuedCallCanceller;
435 
436   CallData(grpc_call_element* elem, const ChannelData& chand,
437            const grpc_call_element_args& args);
438   ~CallData();
439 
440   // Returns the index into pending_batches_ to be used for batch.
441   static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
442   void PendingBatchesAdd(grpc_call_element* elem,
443                          grpc_transport_stream_op_batch* batch);
444   static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
445   // A predicate type and some useful implementations for PendingBatchesFail().
446   typedef bool (*YieldCallCombinerPredicate)(
447       const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList &)448   static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
449     return true;
450   }
NoYieldCallCombiner(const CallCombinerClosureList &)451   static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
452     return false;
453   }
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)454   static bool YieldCallCombinerIfPendingBatchesFound(
455       const CallCombinerClosureList& closures) {
456     return closures.size() > 0;
457   }
458   // Fails all pending batches.
459   // If yield_call_combiner_predicate returns true, assumes responsibility for
460   // yielding the call combiner.
461   void PendingBatchesFail(
462       grpc_call_element* elem, grpc_error* error,
463       YieldCallCombinerPredicate yield_call_combiner_predicate);
464   static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
465   // Resumes all pending batches on lb_call_.
466   void PendingBatchesResume(grpc_call_element* elem);
467 
468   // Applies service config to the call.  Must be invoked once we know
469   // that the resolver has returned results to the channel.
470   // If an error is returned, the error indicates the status with which
471   // the call should be failed.
472   grpc_error* ApplyServiceConfigToCallLocked(
473       grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
474   // Invoked when the resolver result is applied to the caller, on both
475   // success or failure.
476   static void ResolutionDone(void* arg, grpc_error* error);
477   // Removes the call (if present) from the channel's list of calls queued
478   // for name resolution.
479   void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem);
480   // Adds the call (if not already present) to the channel's list of
481   // calls queued for name resolution.
482   void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem);
483 
484   static void RecvInitialMetadataReadyForConfigSelectorCommitCallback(
485       void* arg, grpc_error* error);
486   void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
487       grpc_transport_stream_op_batch* batch);
488 
489   void CreateDynamicCall(grpc_call_element* elem);
490 
491   // State for handling deadlines.
492   // The code in deadline_filter.c requires this to be the first field.
493   // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
494   // and this struct both independently store pointers to the call stack
495   // and call combiner.  If/when we have time, find a way to avoid this
496   // without breaking the grpc_deadline_state abstraction.
497   grpc_deadline_state deadline_state_;
498 
499   grpc_slice path_;  // Request path.
500   gpr_cycle_counter call_start_time_;
501   grpc_millis deadline_;
502   Arena* arena_;
503   grpc_call_stack* owning_call_;
504   CallCombiner* call_combiner_;
505   grpc_call_context_element* call_context_;
506 
507   grpc_polling_entity* pollent_ = nullptr;
508 
509   grpc_closure pick_closure_;
510 
511   // Accessed while holding ChannelData::resolution_mu_.
512   bool service_config_applied_ = false;
513   bool queued_pending_resolver_result_ = false;
514   ChannelData::ResolverQueuedCall resolver_queued_call_;
515   ResolverQueuedCallCanceller* resolver_call_canceller_ = nullptr;
516 
517   std::function<void()> on_call_committed_;
518 
519   grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
520   grpc_closure recv_initial_metadata_ready_;
521 
522   RefCountedPtr<DynamicFilters> dynamic_filters_;
523   RefCountedPtr<DynamicFilters::Call> dynamic_call_;
524 
525   // Batches are added to this list when received from above.
526   // They are removed when we are done handling the batch (i.e., when
527   // either we have invoked all of the batch's callbacks or we have
528   // passed the batch down to the LB call and are not intercepting any of
529   // its callbacks).
530   grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
531 
532   // Set when we get a cancel_stream op.
533   grpc_error* cancel_error_ = GRPC_ERROR_NONE;
534 };
535 
536 //
537 // RetryingCall definition
538 //
539 
540 class RetryingCall {
541  public:
542   RetryingCall(
543       ChannelData* chand, const grpc_call_element_args& args,
544       grpc_polling_entity* pollent,
545       RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
546       const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy);
547   ~RetryingCall();
548 
549   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
550 
551   RefCountedPtr<SubchannelCall> subchannel_call() const;
552 
553  private:
554   // State used for starting a retryable batch on a subchannel call.
555   // This provides its own grpc_transport_stream_op_batch and other data
556   // structures needed to populate the ops in the batch.
557   // We allocate one struct on the arena for each attempt at starting a
558   // batch on a given subchannel call.
559   struct SubchannelCallBatchData {
560     // Creates a SubchannelCallBatchData object on the call's arena with the
561     // specified refcount.  If set_on_complete is true, the batch's
562     // on_complete callback will be set to point to on_complete();
563     // otherwise, the batch's on_complete callback will be null.
564     static SubchannelCallBatchData* Create(RetryingCall* call, int refcount,
565                                            bool set_on_complete);
566 
Unrefgrpc_core::__anonee6b702a0111::RetryingCall::SubchannelCallBatchData567     void Unref() {
568       if (gpr_unref(&refs)) Destroy();
569     }
570 
571     SubchannelCallBatchData(RetryingCall* call, int refcount,
572                             bool set_on_complete);
573     // All dtor code must be added in `Destroy()`. This is because we may
574     // call closures in `SubchannelCallBatchData` after they are unrefed by
575     // `Unref()`, and msan would complain about accessing this class
576     // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
577     // TODO(soheil): We should try to call the dtor in `Unref()`.
~SubchannelCallBatchDatagrpc_core::__anonee6b702a0111::RetryingCall::SubchannelCallBatchData578     ~SubchannelCallBatchData() { Destroy(); }
579     void Destroy();
580 
581     gpr_refcount refs;
582     grpc_call_element* elem;
583     RetryingCall* call;
584     RefCountedPtr<LoadBalancedCall> lb_call;
585     // The batch to use in the subchannel call.
586     // Its payload field points to SubchannelCallRetryState::batch_payload.
587     grpc_transport_stream_op_batch batch;
588     // For intercepting on_complete.
589     grpc_closure on_complete;
590   };
591 
592   // Retry state associated with a subchannel call.
593   // Stored in the parent_data of the subchannel call object.
594   struct SubchannelCallRetryState {
SubchannelCallRetryStategrpc_core::__anonee6b702a0111::RetryingCall::SubchannelCallRetryState595     explicit SubchannelCallRetryState(grpc_call_context_element* context)
596         : batch_payload(context),
597           started_send_initial_metadata(false),
598           completed_send_initial_metadata(false),
599           started_send_trailing_metadata(false),
600           completed_send_trailing_metadata(false),
601           started_recv_initial_metadata(false),
602           completed_recv_initial_metadata(false),
603           started_recv_trailing_metadata(false),
604           completed_recv_trailing_metadata(false),
605           retry_dispatched(false) {}
606 
607     // SubchannelCallBatchData.batch.payload points to this.
608     grpc_transport_stream_op_batch_payload batch_payload;
609     // For send_initial_metadata.
610     // Note that we need to make a copy of the initial metadata for each
611     // subchannel call instead of just referring to the copy in call_data,
612     // because filters in the subchannel stack will probably add entries,
613     // so we need to start in a pristine state for each attempt of the call.
614     grpc_linked_mdelem* send_initial_metadata_storage;
615     grpc_metadata_batch send_initial_metadata;
616     // For send_message.
617     // TODO(roth): Restructure this to eliminate use of ManualConstructor.
618     ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
619     // For send_trailing_metadata.
620     grpc_linked_mdelem* send_trailing_metadata_storage;
621     grpc_metadata_batch send_trailing_metadata;
622     // For intercepting recv_initial_metadata.
623     grpc_metadata_batch recv_initial_metadata;
624     grpc_closure recv_initial_metadata_ready;
625     bool trailing_metadata_available = false;
626     // For intercepting recv_message.
627     grpc_closure recv_message_ready;
628     OrphanablePtr<ByteStream> recv_message;
629     // For intercepting recv_trailing_metadata.
630     grpc_metadata_batch recv_trailing_metadata;
631     grpc_transport_stream_stats collect_stats;
632     grpc_closure recv_trailing_metadata_ready;
633     // These fields indicate which ops have been started and completed on
634     // this subchannel call.
635     size_t started_send_message_count = 0;
636     size_t completed_send_message_count = 0;
637     size_t started_recv_message_count = 0;
638     size_t completed_recv_message_count = 0;
639     bool started_send_initial_metadata : 1;
640     bool completed_send_initial_metadata : 1;
641     bool started_send_trailing_metadata : 1;
642     bool completed_send_trailing_metadata : 1;
643     bool started_recv_initial_metadata : 1;
644     bool completed_recv_initial_metadata : 1;
645     bool started_recv_trailing_metadata : 1;
646     bool completed_recv_trailing_metadata : 1;
647     // State for callback processing.
648     SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
649         nullptr;
650     grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
651     SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
652     grpc_error* recv_message_error = GRPC_ERROR_NONE;
653     SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
654     // NOTE: Do not move this next to the metadata bitfields above. That would
655     //       save space but will also result in a data race because compiler
656     //       will generate a 2 byte store which overwrites the meta-data
657     //       fields upon setting this field.
658     bool retry_dispatched : 1;
659   };
660 
661   // Pending batches stored in call data.
662   struct PendingBatch {
663     // The pending batch.  If nullptr, this slot is empty.
664     grpc_transport_stream_op_batch* batch = nullptr;
665     // Indicates whether payload for send ops has been cached in CallData.
666     bool send_ops_cached = false;
667   };
668 
669   // Caches data for send ops so that it can be retried later, if not
670   // already cached.
671   void MaybeCacheSendOpsForBatch(PendingBatch* pending);
672   void FreeCachedSendInitialMetadata();
673   // Frees cached send_message at index idx.
674   void FreeCachedSendMessage(size_t idx);
675   void FreeCachedSendTrailingMetadata();
676   // Frees cached send ops that have already been completed after
677   // committing the call.
678   void FreeCachedSendOpDataAfterCommit(SubchannelCallRetryState* retry_state);
679   // Frees cached send ops that were completed by the completed batch in
680   // batch_data.  Used when batches are completed after the call is committed.
681   void FreeCachedSendOpDataForCompletedBatch(
682       SubchannelCallBatchData* batch_data,
683       SubchannelCallRetryState* retry_state);
684 
685   // Returns the index into pending_batches_ to be used for batch.
686   static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
687   void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
688   void PendingBatchClear(PendingBatch* pending);
689   void MaybeClearPendingBatch(PendingBatch* pending);
690   static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
691   // A predicate type and some useful implementations for PendingBatchesFail().
692   typedef bool (*YieldCallCombinerPredicate)(
693       const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList &)694   static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
695     return true;
696   }
// Predicate for PendingBatchesFail(): always answers "no", i.e. the call
// combiner is never to be yielded, whatever closures were collected.
static bool NoYieldCallCombiner(const CallCombinerClosureList& /*unused*/) {
  return false;
}
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)700   static bool YieldCallCombinerIfPendingBatchesFound(
701       const CallCombinerClosureList& closures) {
702     return closures.size() > 0;
703   }
704   // Fails all pending batches.
705   // If yield_call_combiner_predicate returns true, assumes responsibility for
706   // yielding the call combiner.
707   void PendingBatchesFail(
708       grpc_error* error,
709       YieldCallCombinerPredicate yield_call_combiner_predicate);
710   static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
711   // Resumes all pending batches on lb_call_.
712   void PendingBatchesResume();
713   // Returns a pointer to the first pending batch for which predicate(batch)
714   // returns true, or null if not found.
715   template <typename Predicate>
716   PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);
717 
718   // Commits the call so that no further retry attempts will be performed.
719   void RetryCommit(SubchannelCallRetryState* retry_state);
720   // Starts a retry after appropriate back-off.
721   void DoRetry(SubchannelCallRetryState* retry_state,
722                grpc_millis server_pushback_ms);
723   // Returns true if the call is being retried.
724   bool MaybeRetry(SubchannelCallBatchData* batch_data, grpc_status_code status,
725                   grpc_mdelem* server_pushback_md);
726 
727   // Invokes recv_initial_metadata_ready for a subchannel batch.
728   static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
729   // Intercepts recv_initial_metadata_ready callback for retries.
730   // Commits the call and returns the initial metadata up the stack.
731   static void RecvInitialMetadataReady(void* arg, grpc_error* error);
732 
733   // Invokes recv_message_ready for a subchannel batch.
734   static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
735   // Intercepts recv_message_ready callback for retries.
736   // Commits the call and returns the message up the stack.
737   static void RecvMessageReady(void* arg, grpc_error* error);
738 
739   // Sets *status and *server_pushback_md based on md_batch and error.
740   // Only sets *server_pushback_md if server_pushback_md != nullptr.
741   void GetCallStatus(grpc_metadata_batch* md_batch, grpc_error* error,
742                      grpc_status_code* status,
743                      grpc_mdelem** server_pushback_md);
744   // Adds recv_trailing_metadata_ready closure to closures.
745   void AddClosureForRecvTrailingMetadataReady(
746       SubchannelCallBatchData* batch_data, grpc_error* error,
747       CallCombinerClosureList* closures);
748   // Adds any necessary closures for deferred recv_initial_metadata and
749   // recv_message callbacks to closures.
750   static void AddClosuresForDeferredRecvCallbacks(
751       SubchannelCallBatchData* batch_data,
752       SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
753   // Returns true if any op in the batch was not yet started.
754   // Only looks at send ops, since recv ops are always started immediately.
755   bool PendingBatchIsUnstarted(PendingBatch* pending,
756                                SubchannelCallRetryState* retry_state);
757   // For any pending batch containing an op that has not yet been started,
758   // adds the pending batch's completion closures to closures.
759   void AddClosuresToFailUnstartedPendingBatches(
760       SubchannelCallRetryState* retry_state, grpc_error* error,
761       CallCombinerClosureList* closures);
762   // Runs necessary closures upon completion of a call attempt.
763   void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
764                                    grpc_error* error);
765   // Intercepts recv_trailing_metadata_ready callback for retries.
766   // Commits the call and returns the trailing metadata up the stack.
767   static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
768 
769   // Adds the on_complete closure for the pending batch completed in
770   // batch_data to closures.
771   void AddClosuresForCompletedPendingBatch(SubchannelCallBatchData* batch_data,
772                                            grpc_error* error,
773                                            CallCombinerClosureList* closures);
774 
775   // If there are any cached ops to replay or pending ops to start on the
776   // subchannel call, adds a closure to closures to invoke
777   // StartRetriableSubchannelBatches().
778   void AddClosuresForReplayOrPendingSendOps(
779       SubchannelCallBatchData* batch_data,
780       SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
781 
782   // Callback used to intercept on_complete from subchannel calls.
783   // Called only when retries are enabled.
784   static void OnComplete(void* arg, grpc_error* error);
785 
786   static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
787   // Adds a closure to closures that will execute batch in the call combiner.
788   void AddClosureForSubchannelBatch(grpc_transport_stream_op_batch* batch,
789                                     CallCombinerClosureList* closures);
790   // Adds retriable send_initial_metadata op to batch_data.
791   void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
792                                          SubchannelCallBatchData* batch_data);
793   // Adds retriable send_message op to batch_data.
794   void AddRetriableSendMessageOp(SubchannelCallRetryState* retry_state,
795                                  SubchannelCallBatchData* batch_data);
796   // Adds retriable send_trailing_metadata op to batch_data.
797   void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
798                                           SubchannelCallBatchData* batch_data);
799   // Adds retriable recv_initial_metadata op to batch_data.
800   void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
801                                          SubchannelCallBatchData* batch_data);
802   // Adds retriable recv_message op to batch_data.
803   void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
804                                  SubchannelCallBatchData* batch_data);
805   // Adds retriable recv_trailing_metadata op to batch_data.
806   void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
807                                           SubchannelCallBatchData* batch_data);
808   // Helper function used to start a recv_trailing_metadata batch.  This
809   // is used in the case where a recv_initial_metadata or recv_message
810   // op fails in a way that we know the call is over but when the application
811   // has not yet started its own recv_trailing_metadata op.
812   void StartInternalRecvTrailingMetadata();
813   // If there are any cached send ops that need to be replayed on the
814   // current subchannel call, creates and returns a new subchannel batch
815   // to replay those ops.  Otherwise, returns nullptr.
816   SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
817       SubchannelCallRetryState* retry_state);
818   // Adds subchannel batches for pending batches to closures.
819   void AddSubchannelBatchesForPendingBatches(
820       SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
821   // Constructs and starts whatever subchannel batches are needed on the
822   // subchannel call.
823   static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
824 
825   static void CreateLbCall(void* arg, grpc_error* error);
826 
827   ChannelData* chand_;
828   grpc_polling_entity* pollent_;
829   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
830   const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy_ = nullptr;
831   BackOff retry_backoff_;
832 
833   grpc_slice path_;  // Request path.
834   gpr_cycle_counter call_start_time_;
835   grpc_millis deadline_;
836   Arena* arena_;
837   grpc_call_stack* owning_call_;
838   CallCombiner* call_combiner_;
839   grpc_call_context_element* call_context_;
840 
841   grpc_closure retry_closure_;
842 
843   RefCountedPtr<LoadBalancedCall> lb_call_;
844 
845   // Batches are added to this list when received from above.
846   // They are removed when we are done handling the batch (i.e., when
847   // either we have invoked all of the batch's callbacks or we have
848   // passed the batch down to the LB call and are not intercepting any of
849   // its callbacks).
850   // TODO(roth): Now that the retry code is split out into its own call
851   // object, revamp this to work in a cleaner way, since we no longer need
852   // for batches to ever wait for name resolution or LB picks.
853   PendingBatch pending_batches_[MAX_PENDING_BATCHES];
854   bool pending_send_initial_metadata_ : 1;
855   bool pending_send_message_ : 1;
856   bool pending_send_trailing_metadata_ : 1;
857 
858   // Set when we get a cancel_stream op.
859   grpc_error* cancel_error_ = GRPC_ERROR_NONE;
860 
861   // Retry state.
862   bool enable_retries_ : 1;
863   bool retry_committed_ : 1;
864   bool last_attempt_got_server_pushback_ : 1;
865   int num_attempts_completed_ = 0;
866   size_t bytes_buffered_for_retry_ = 0;
867   grpc_timer retry_timer_;
868 
869   // The number of pending retriable subchannel batches containing send ops.
870   // We hold a ref to the call stack while this is non-zero, since replay
871   // batches may not complete until after all callbacks have been returned
872   // to the surface, and we need to make sure that the call is not destroyed
873   // until all of these batches have completed.
874   // Note that we actually only need to track replay batches, but it's
875   // easier to track all batches with send ops.
876   int num_pending_retriable_subchannel_send_batches_ = 0;
877 
878   // Cached data for retrying send ops.
879   // send_initial_metadata
880   bool seen_send_initial_metadata_ = false;
881   grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
882   grpc_metadata_batch send_initial_metadata_;
883   uint32_t send_initial_metadata_flags_;
884   gpr_atm* peer_string_;
885   // send_message
886   // When we get a send_message op, we replace the original byte stream
887   // with a CachingByteStream that caches the slices to a local buffer for
888   // use in retries.
889   // Note: We inline the cache for the first 3 send_message ops and use
890   // dynamic allocation after that.  This number was essentially picked
891   // at random; it could be changed in the future to tune performance.
892   absl::InlinedVector<ByteStreamCache*, 3> send_messages_;
893   // send_trailing_metadata
894   bool seen_send_trailing_metadata_ = false;
895   grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
896   grpc_metadata_batch send_trailing_metadata_;
897 };
898 
899 //
900 // LoadBalancedCall definition
901 //
902 
903 // This object is ref-counted, but it cannot inherit from RefCounted<>,
904 // because it is allocated on the arena and can't free its memory when
905 // its refcount goes to zero.  So instead, it manually implements the
906 // same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
907 class LoadBalancedCall {
908  public:
909   static RefCountedPtr<LoadBalancedCall> Create(
910       ChannelData* chand, const grpc_call_element_args& args,
911       grpc_polling_entity* pollent, size_t parent_data_size);
912 
913   LoadBalancedCall(ChannelData* chand, const grpc_call_element_args& args,
914                    grpc_polling_entity* pollent);
915   ~LoadBalancedCall();
916 
917   // Interface of RefCounted<>.
918   RefCountedPtr<LoadBalancedCall> Ref() GRPC_MUST_USE_RESULT;
919   RefCountedPtr<LoadBalancedCall> Ref(const DebugLocation& location,
920                                       const char* reason) GRPC_MUST_USE_RESULT;
921   // When refcount drops to 0, destroys itself and the associated call stack,
922   // but does NOT free the memory because it's in the call arena.
923   void Unref();
924   void Unref(const DebugLocation& location, const char* reason);
925 
926   void* GetParentData();
927 
928   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
929 
930   // Invoked by channel for queued LB picks when the picker is updated.
931   static void PickSubchannel(void* arg, grpc_error* error);
932   // Helper function for performing an LB pick while holding the data plane
933   // mutex.  Returns true if the pick is complete, in which case the caller
934   // must invoke PickDone() or AsyncPickDone() with the returned error.
935   bool PickSubchannelLocked(grpc_error** error);
936   // Schedules a callback to process the completed pick.  The callback
937   // will not run until after this method returns.
938   void AsyncPickDone(grpc_error* error);
939 
subchannel_call() const940   RefCountedPtr<SubchannelCall> subchannel_call() const {
941     return subchannel_call_;
942   }
943 
944  private:
945   // Allow RefCountedPtr<> to access IncrementRefCount().
946   template <typename T>
947   friend class ::grpc_core::RefCountedPtr;
948 
949   class LbQueuedCallCanceller;
950   class Metadata;
951   class LbCallState;
952 
953   // Interface of RefCounted<>.
954   void IncrementRefCount();
955   void IncrementRefCount(const DebugLocation& location, const char* reason);
956 
957   // Returns the index into pending_batches_ to be used for batch.
958   static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
959   void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
960   static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
961   // A predicate type and some useful implementations for PendingBatchesFail().
962   typedef bool (*YieldCallCombinerPredicate)(
963       const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList &)964   static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
965     return true;
966   }
NoYieldCallCombiner(const CallCombinerClosureList &)967   static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
968     return false;
969   }
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)970   static bool YieldCallCombinerIfPendingBatchesFound(
971       const CallCombinerClosureList& closures) {
972     return closures.size() > 0;
973   }
974   // Fails all pending batches.
975   // If yield_call_combiner_predicate returns true, assumes responsibility for
976   // yielding the call combiner.
977   void PendingBatchesFail(
978       grpc_error* error,
979       YieldCallCombinerPredicate yield_call_combiner_predicate);
980   static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
981   // Resumes all pending batches on subchannel_call_.
982   void PendingBatchesResume();
983 
984   static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
985       void* arg, grpc_error* error);
986   void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
987       grpc_transport_stream_op_batch* batch);
988 
989   void CreateSubchannelCall();
990   // Invoked when a pick is completed, on both success or failure.
991   static void PickDone(void* arg, grpc_error* error);
992   // Removes the call from the channel's list of queued picks if present.
993   void MaybeRemoveCallFromLbQueuedCallsLocked();
994   // Adds the call to the channel's list of queued picks if not already present.
995   void MaybeAddCallToLbQueuedCallsLocked();
996 
997   RefCount refs_;
998 
999   ChannelData* chand_;
1000 
1001   // TODO(roth): Instead of duplicating these fields in every filter
1002   // that uses any one of them, we should store them in the call
1003   // context.  This will save per-call memory overhead.
1004   grpc_slice path_;  // Request path.
1005   gpr_cycle_counter call_start_time_;
1006   grpc_millis deadline_;
1007   Arena* arena_;
1008   grpc_call_stack* owning_call_;
1009   CallCombiner* call_combiner_;
1010   grpc_call_context_element* call_context_;
1011 
1012   // Set when we get a cancel_stream op.
1013   grpc_error* cancel_error_ = GRPC_ERROR_NONE;
1014 
1015   grpc_polling_entity* pollent_ = nullptr;
1016 
1017   grpc_closure pick_closure_;
1018 
1019   // Accessed while holding ChannelData::data_plane_mu_.
1020   ChannelData::LbQueuedCall queued_call_;
1021   bool queued_pending_lb_pick_ = false;
1022   const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
1023   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
1024   std::function<void(grpc_error*, LoadBalancingPolicy::MetadataInterface*,
1025                      LoadBalancingPolicy::CallState*)>
1026       lb_recv_trailing_metadata_ready_;
1027   LbQueuedCallCanceller* lb_call_canceller_ = nullptr;
1028 
1029   RefCountedPtr<SubchannelCall> subchannel_call_;
1030 
1031   // For intercepting recv_trailing_metadata_ready for the LB policy.
1032   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
1033   grpc_closure recv_trailing_metadata_ready_;
1034   grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
1035 
1036   // Batches are added to this list when received from above.
1037   // They are removed when we are done handling the batch (i.e., when
1038   // either we have invoked all of the batch's callbacks or we have
1039   // passed the batch down to the subchannel call and are not
1040   // intercepting any of its callbacks).
1041   grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
1042 };
1043 
1044 //
1045 // dynamic termination filter
1046 //
1047 
1048 // Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL_DATA.
// Copy hook used by the GRPC_ARG_CLIENT_CHANNEL_DATA pointer vtable: hands
// back the very same pointer, without taking a ref or duplicating anything.
void* ChannelDataArgCopy(void* p) {
  return p;
}
// Destroy hook used by the GRPC_ARG_CLIENT_CHANNEL_DATA pointer vtable:
// deliberately a no-op, the pointee is left untouched.
void ChannelDataArgDestroy(void* /*unused*/) {
}
ChannelDataArgCmp(void * p,void * q)1051 int ChannelDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
1052 const grpc_arg_pointer_vtable kChannelDataArgPointerVtable = {
1053     ChannelDataArgCopy, ChannelDataArgDestroy, ChannelDataArgCmp};
1054 
1055 // Channel arg pointer vtable for GRPC_ARG_RETRY_THROTTLE_DATA.
RetryThrottleDataArgCopy(void * p)1056 void* RetryThrottleDataArgCopy(void* p) {
1057   auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
1058   retry_throttle_data->Ref().release();
1059   return p;
1060 }
RetryThrottleDataArgDestroy(void * p)1061 void RetryThrottleDataArgDestroy(void* p) {
1062   auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
1063   retry_throttle_data->Unref();
1064 }
RetryThrottleDataArgCmp(void * p,void * q)1065 int RetryThrottleDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
1066 const grpc_arg_pointer_vtable kRetryThrottleDataArgPointerVtable = {
1067     RetryThrottleDataArgCopy, RetryThrottleDataArgDestroy,
1068     RetryThrottleDataArgCmp};
1069 
1070 class DynamicTerminationFilterChannelData {
1071  public:
1072   static grpc_error* Init(grpc_channel_element* elem,
1073                           grpc_channel_element_args* args);
1074 
Destroy(grpc_channel_element * elem)1075   static void Destroy(grpc_channel_element* elem) {
1076     auto* chand =
1077         static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
1078     chand->~DynamicTerminationFilterChannelData();
1079   }
1080 
1081   // Will never be called.
StartTransportOp(grpc_channel_element *,grpc_transport_op *)1082   static void StartTransportOp(grpc_channel_element* /*elem*/,
1083                                grpc_transport_op* /*op*/) {}
GetChannelInfo(grpc_channel_element *,const grpc_channel_info *)1084   static void GetChannelInfo(grpc_channel_element* /*elem*/,
1085                              const grpc_channel_info* /*info*/) {}
1086 
chand() const1087   ChannelData* chand() const { return chand_; }
retry_throttle_data() const1088   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
1089     return retry_throttle_data_;
1090   }
1091 
1092  private:
GetRetryThrottleDataFromArgs(const grpc_channel_args * args)1093   static RefCountedPtr<ServerRetryThrottleData> GetRetryThrottleDataFromArgs(
1094       const grpc_channel_args* args) {
1095     auto* retry_throttle_data =
1096         grpc_channel_args_find_pointer<ServerRetryThrottleData>(
1097             args, GRPC_ARG_RETRY_THROTTLE_DATA);
1098     if (retry_throttle_data == nullptr) return nullptr;
1099     return retry_throttle_data->Ref();
1100   }
1101 
DynamicTerminationFilterChannelData(const grpc_channel_args * args)1102   explicit DynamicTerminationFilterChannelData(const grpc_channel_args* args)
1103       : chand_(grpc_channel_args_find_pointer<ChannelData>(
1104             args, GRPC_ARG_CLIENT_CHANNEL_DATA)),
1105         retry_throttle_data_(GetRetryThrottleDataFromArgs(args)) {}
1106 
1107   ChannelData* chand_;
1108   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
1109 };
1110 
1111 class DynamicTerminationFilterCallData {
1112  public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)1113   static grpc_error* Init(grpc_call_element* elem,
1114                           const grpc_call_element_args* args) {
1115     new (elem->call_data) DynamicTerminationFilterCallData(*args);
1116     return GRPC_ERROR_NONE;
1117   }
1118 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)1119   static void Destroy(grpc_call_element* elem,
1120                       const grpc_call_final_info* /*final_info*/,
1121                       grpc_closure* then_schedule_closure) {
1122     auto* calld =
1123         static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
1124     auto* chand =
1125         static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
1126     RefCountedPtr<SubchannelCall> subchannel_call;
1127     if (chand->chand()->enable_retries()) {
1128       if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
1129         subchannel_call = calld->retrying_call_->subchannel_call();
1130         calld->retrying_call_->~RetryingCall();
1131       }
1132     } else {
1133       if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
1134         subchannel_call = calld->lb_call_->subchannel_call();
1135       }
1136     }
1137     calld->~DynamicTerminationFilterCallData();
1138     if (GPR_LIKELY(subchannel_call != nullptr)) {
1139       subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
1140     } else {
1141       // TODO(yashkt) : This can potentially be a Closure::Run
1142       ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
1143     }
1144   }
1145 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1146   static void StartTransportStreamOpBatch(
1147       grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1148     auto* calld =
1149         static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
1150     auto* chand =
1151         static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
1152     if (chand->chand()->enable_retries()) {
1153       calld->retrying_call_->StartTransportStreamOpBatch(batch);
1154     } else {
1155       calld->lb_call_->StartTransportStreamOpBatch(batch);
1156     }
1157   }
1158 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)1159   static void SetPollent(grpc_call_element* elem,
1160                          grpc_polling_entity* pollent) {
1161     auto* calld =
1162         static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
1163     auto* chand =
1164         static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
1165     ChannelData* client_channel = chand->chand();
1166     grpc_call_element_args args = {
1167         calld->owning_call_,     nullptr,
1168         calld->call_context_,    calld->path_,
1169         calld->call_start_time_, calld->deadline_,
1170         calld->arena_,           calld->call_combiner_};
1171     if (client_channel->enable_retries()) {
1172       // Get retry settings from service config.
1173       auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
1174           calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
1175       GPR_ASSERT(svc_cfg_call_data != nullptr);
1176       auto* method_config = static_cast<const ClientChannelMethodParsedConfig*>(
1177           svc_cfg_call_data->GetMethodParsedConfig(
1178               ClientChannelServiceConfigParser::ParserIndex()));
1179       // Create retrying call.
1180       calld->retrying_call_ = calld->arena_->New<RetryingCall>(
1181           client_channel, args, pollent, chand->retry_throttle_data(),
1182           method_config == nullptr ? nullptr : method_config->retry_policy());
1183       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1184         gpr_log(
1185             GPR_INFO,
1186             "chand=%p dymamic_termination_calld=%p: create retrying_call=%p",
1187             client_channel, calld, calld->retrying_call_);
1188       }
1189     } else {
1190       calld->lb_call_ =
1191           LoadBalancedCall::Create(client_channel, args, pollent, 0);
1192       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1193         gpr_log(GPR_INFO,
1194                 "chand=%p dynamic_termination_calld=%p: create lb_call=%p",
1195                 chand, client_channel, calld->lb_call_.get());
1196       }
1197     }
1198   }
1199 
1200  private:
DynamicTerminationFilterCallData(const grpc_call_element_args & args)1201   explicit DynamicTerminationFilterCallData(const grpc_call_element_args& args)
1202       : path_(grpc_slice_ref_internal(args.path)),
1203         call_start_time_(args.start_time),
1204         deadline_(args.deadline),
1205         arena_(args.arena),
1206         owning_call_(args.call_stack),
1207         call_combiner_(args.call_combiner),
1208         call_context_(args.context) {}
1209 
~DynamicTerminationFilterCallData()1210   ~DynamicTerminationFilterCallData() { grpc_slice_unref_internal(path_); }
1211 
1212   grpc_slice path_;  // Request path.
1213   gpr_cycle_counter call_start_time_;
1214   grpc_millis deadline_;
1215   Arena* arena_;
1216   grpc_call_stack* owning_call_;
1217   CallCombiner* call_combiner_;
1218   grpc_call_context_element* call_context_;
1219 
1220   RetryingCall* retrying_call_ = nullptr;
1221   RefCountedPtr<LoadBalancedCall> lb_call_;
1222 };
1223 
1224 const grpc_channel_filter kDynamicTerminationFilterVtable = {
1225     DynamicTerminationFilterCallData::StartTransportStreamOpBatch,
1226     DynamicTerminationFilterChannelData::StartTransportOp,
1227     sizeof(DynamicTerminationFilterCallData),
1228     DynamicTerminationFilterCallData::Init,
1229     DynamicTerminationFilterCallData::SetPollent,
1230     DynamicTerminationFilterCallData::Destroy,
1231     sizeof(DynamicTerminationFilterChannelData),
1232     DynamicTerminationFilterChannelData::Init,
1233     DynamicTerminationFilterChannelData::Destroy,
1234     DynamicTerminationFilterChannelData::GetChannelInfo,
1235     "dynamic_filter_termination",
1236 };
1237 
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1238 grpc_error* DynamicTerminationFilterChannelData::Init(
1239     grpc_channel_element* elem, grpc_channel_element_args* args) {
1240   GPR_ASSERT(args->is_last);
1241   GPR_ASSERT(elem->filter == &kDynamicTerminationFilterVtable);
1242   new (elem->channel_data)
1243       DynamicTerminationFilterChannelData(args->channel_args);
1244   return GRPC_ERROR_NONE;
1245 }
1246 
1247 //
1248 // ChannelData::SubchannelWrapper
1249 //
1250 
1251 // This class is a wrapper for Subchannel that hides details of the
1252 // channel's implementation (such as the health check service name and
1253 // connected subchannel) from the LB policy API.
1254 //
1255 // Note that no synchronization is needed here, because even if the
1256 // underlying subchannel is shared between channels, this wrapper will only
1257 // be used within one channel, so it will always be synchronized by the
1258 // control plane work_serializer.
1259 class ChannelData::SubchannelWrapper : public SubchannelInterface {
1260  public:
// Creates a wrapper around subchannel on behalf of chand.
// Takes a ref on the channel's owning stack, registers the subchannel as a
// channelz child the first time it is wrapped (counting wrappers per
// subchannel afterwards), and adds this wrapper to the channel's set of
// subchannel wrappers.
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
                  absl::optional<std::string> health_check_service_name)
    : SubchannelInterface(
          GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
              ? "SubchannelWrapper"
              : nullptr),
      chand_(chand),
      subchannel_(subchannel),
      health_check_service_name_(std::move(health_check_service_name)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p: creating subchannel wrapper %p for subchannel %p",
            chand, this, subchannel_);
  }
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
  // Refcount bookkeeping only happens for subchannels with a channelz node.
  auto* channelz_subchannel = subchannel_->channelz_node();
  if (channelz_subchannel != nullptr) {
    auto& refcounts = chand_->subchannel_refcount_map_;
    auto entry = refcounts.find(subchannel_);
    if (entry == refcounts.end()) {
      // First wrapper for this subchannel: expose it as a channelz child and
      // start counting from zero.
      chand_->channelz_node_->AddChildSubchannel(channelz_subchannel->uuid());
      entry = refcounts.emplace(subchannel_, 0).first;
    }
    ++entry->second;
  }
  chand_->subchannel_wrappers_.insert(this);
}
1287 
// Undoes the constructor's bookkeeping: removes this wrapper from the
// channel's set, decrements the per-subchannel wrapper count (removing the
// channelz child when it reaches zero), and finally drops the refs on the
// subchannel and on the channel's owning stack.
~SubchannelWrapper() override {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p: destroying subchannel wrapper %p for subchannel %p",
            chand_, this, subchannel_);
  }
  chand_->subchannel_wrappers_.erase(this);
  auto* channelz_subchannel = subchannel_->channelz_node();
  if (channelz_subchannel != nullptr) {
    auto& refcounts = chand_->subchannel_refcount_map_;
    auto entry = refcounts.find(subchannel_);
    GPR_ASSERT(entry != refcounts.end());
    if (--entry->second == 0) {
      // Last wrapper for this subchannel is going away.
      chand_->channelz_node_->RemoveChildSubchannel(
          channelz_subchannel->uuid());
      refcounts.erase(entry);
    }
  }
  GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
}
1308 
CheckConnectivityState()1309   grpc_connectivity_state CheckConnectivityState() override {
1310     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
1311     grpc_connectivity_state connectivity_state =
1312         subchannel_->CheckConnectivityState(health_check_service_name_,
1313                                             &connected_subchannel);
1314     MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
1315     return connectivity_state;
1316   }
1317 
WatchConnectivityState(grpc_connectivity_state initial_state,std::unique_ptr<ConnectivityStateWatcherInterface> watcher)1318   void WatchConnectivityState(
1319       grpc_connectivity_state initial_state,
1320       std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override {
1321     auto& watcher_wrapper = watcher_map_[watcher.get()];
1322     GPR_ASSERT(watcher_wrapper == nullptr);
1323     watcher_wrapper = new WatcherWrapper(std::move(watcher),
1324                                          Ref(DEBUG_LOCATION, "WatcherWrapper"),
1325                                          initial_state);
1326     subchannel_->WatchConnectivityState(
1327         initial_state, health_check_service_name_,
1328         RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
1329             watcher_wrapper));
1330   }
1331 
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)1332   void CancelConnectivityStateWatch(
1333       ConnectivityStateWatcherInterface* watcher) override {
1334     auto it = watcher_map_.find(watcher);
1335     GPR_ASSERT(it != watcher_map_.end());
1336     subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
1337                                               it->second);
1338     watcher_map_.erase(it);
1339   }
1340 
AttemptToConnect()1341   void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
1342 
ResetBackoff()1343   void ResetBackoff() override { subchannel_->ResetBackoff(); }
1344 
channel_args()1345   const grpc_channel_args* channel_args() override {
1346     return subchannel_->channel_args();
1347   }
1348 
ThrottleKeepaliveTime(int new_keepalive_time)1349   void ThrottleKeepaliveTime(int new_keepalive_time) {
1350     subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
1351   }
1352 
UpdateHealthCheckServiceName(absl::optional<std::string> health_check_service_name)1353   void UpdateHealthCheckServiceName(
1354       absl::optional<std::string> health_check_service_name) {
1355     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1356       gpr_log(GPR_INFO,
1357               "chand=%p: subchannel wrapper %p: updating health check service "
1358               "name from \"%s\" to \"%s\"",
1359               chand_, this, health_check_service_name_->c_str(),
1360               health_check_service_name->c_str());
1361     }
1362     for (auto& p : watcher_map_) {
1363       WatcherWrapper*& watcher_wrapper = p.second;
1364       // Cancel the current watcher and create a new one using the new
1365       // health check service name.
1366       // TODO(roth): If there is not already an existing health watch
1367       // call for the new name, then the watcher will initially report
1368       // state CONNECTING.  If the LB policy is currently reporting
1369       // state READY, this may cause it to switch to CONNECTING before
1370       // switching back to READY.  This could cause a small delay for
1371       // RPCs being started on the channel.  If/when this becomes a
1372       // problem, we may be able to handle it by waiting for the new
1373       // watcher to report READY before we use it to replace the old one.
1374       WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
1375       subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
1376                                                 watcher_wrapper);
1377       watcher_wrapper = replacement;
1378       subchannel_->WatchConnectivityState(
1379           replacement->last_seen_state(), health_check_service_name,
1380           RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
1381               replacement));
1382     }
1383     // Save the new health check service name.
1384     health_check_service_name_ = std::move(health_check_service_name);
1385   }
1386 
1387   // Caller must be holding the control-plane work_serializer.
connected_subchannel() const1388   ConnectedSubchannel* connected_subchannel() const {
1389     return connected_subchannel_.get();
1390   }
1391 
1392   // Caller must be holding the data-plane mutex.
connected_subchannel_in_data_plane() const1393   ConnectedSubchannel* connected_subchannel_in_data_plane() const {
1394     return connected_subchannel_in_data_plane_.get();
1395   }
set_connected_subchannel_in_data_plane(RefCountedPtr<ConnectedSubchannel> connected_subchannel)1396   void set_connected_subchannel_in_data_plane(
1397       RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
1398     connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
1399   }
1400 
1401  private:
1402   // Subchannel and SubchannelInterface have different interfaces for
1403   // their respective ConnectivityStateWatcherInterface classes.
1404   // The one in Subchannel updates the ConnectedSubchannel along with
1405   // the state, whereas the one in SubchannelInterface does not expose
1406   // the ConnectedSubchannel.
1407   //
1408   // This wrapper provides a bridge between the two.  It implements
1409   // Subchannel::ConnectivityStateWatcherInterface and wraps
1410   // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
1411   // that was passed in by the LB policy.  We pass an instance of this
1412   // class to the underlying Subchannel, and when we get updates from
1413   // the subchannel, we pass those on to the wrapped watcher to return
1414   // the update to the LB policy.  This allows us to set the connected
1415   // subchannel before passing the result back to the LB policy.
1416   class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
1417    public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> parent,grpc_connectivity_state initial_state)1418     WatcherWrapper(
1419         std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
1420             watcher,
1421         RefCountedPtr<SubchannelWrapper> parent,
1422         grpc_connectivity_state initial_state)
1423         : watcher_(std::move(watcher)),
1424           parent_(std::move(parent)),
1425           last_seen_state_(initial_state) {}
1426 
~WatcherWrapper()1427     ~WatcherWrapper() override {
1428       auto* parent = parent_.release();  // ref owned by lambda
1429       parent->chand_->work_serializer_->Run(
1430           [parent]() { parent->Unref(DEBUG_LOCATION, "WatcherWrapper"); },
1431           DEBUG_LOCATION);
1432     }
1433 
OnConnectivityStateChange()1434     void OnConnectivityStateChange() override {
1435       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1436         gpr_log(GPR_INFO,
1437                 "chand=%p: connectivity change for subchannel wrapper %p "
1438                 "subchannel %p; hopping into work_serializer",
1439                 parent_->chand_, parent_.get(), parent_->subchannel_);
1440       }
1441       Ref().release();  // ref owned by lambda
1442       parent_->chand_->work_serializer_->Run(
1443           [this]() {
1444             ApplyUpdateInControlPlaneWorkSerializer();
1445             Unref();
1446           },
1447           DEBUG_LOCATION);
1448     }
1449 
interested_parties()1450     grpc_pollset_set* interested_parties() override {
1451       SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
1452           watcher_.get();
1453       if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
1454       return watcher->interested_parties();
1455     }
1456 
MakeReplacement()1457     WatcherWrapper* MakeReplacement() {
1458       auto* replacement =
1459           new WatcherWrapper(std::move(watcher_), parent_, last_seen_state_);
1460       replacement_ = replacement;
1461       return replacement;
1462     }
1463 
last_seen_state() const1464     grpc_connectivity_state last_seen_state() const { return last_seen_state_; }
1465 
1466    private:
ApplyUpdateInControlPlaneWorkSerializer()1467     void ApplyUpdateInControlPlaneWorkSerializer() {
1468       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1469         gpr_log(GPR_INFO,
1470                 "chand=%p: processing connectivity change in work serializer "
1471                 "for subchannel wrapper %p subchannel %p "
1472                 "watcher=%p",
1473                 parent_->chand_, parent_.get(), parent_->subchannel_,
1474                 watcher_.get());
1475       }
1476       ConnectivityStateChange state_change = PopConnectivityStateChange();
1477       absl::optional<absl::Cord> keepalive_throttling =
1478           state_change.status.GetPayload(kKeepaliveThrottlingKey);
1479       if (keepalive_throttling.has_value()) {
1480         int new_keepalive_time = -1;
1481         if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
1482                              &new_keepalive_time)) {
1483           if (new_keepalive_time > parent_->chand_->keepalive_time_) {
1484             parent_->chand_->keepalive_time_ = new_keepalive_time;
1485             if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1486               gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
1487                       parent_->chand_, parent_->chand_->keepalive_time_);
1488             }
1489             // Propagate the new keepalive time to all subchannels. This is so
1490             // that new transports created by any subchannel (and not just the
1491             // subchannel that received the GOAWAY), use the new keepalive time.
1492             for (auto* subchannel_wrapper :
1493                  parent_->chand_->subchannel_wrappers_) {
1494               subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
1495             }
1496           }
1497         } else {
1498           gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
1499                   parent_->chand_,
1500                   std::string(keepalive_throttling.value()).c_str());
1501         }
1502       }
1503       // Ignore update if the parent WatcherWrapper has been replaced
1504       // since this callback was scheduled.
1505       if (watcher_ != nullptr) {
1506         last_seen_state_ = state_change.state;
1507         parent_->MaybeUpdateConnectedSubchannel(
1508             std::move(state_change.connected_subchannel));
1509         watcher_->OnConnectivityStateChange(state_change.state);
1510       }
1511     }
1512 
1513     std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
1514         watcher_;
1515     RefCountedPtr<SubchannelWrapper> parent_;
1516     grpc_connectivity_state last_seen_state_;
1517     WatcherWrapper* replacement_ = nullptr;
1518   };
1519 
MaybeUpdateConnectedSubchannel(RefCountedPtr<ConnectedSubchannel> connected_subchannel)1520   void MaybeUpdateConnectedSubchannel(
1521       RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
1522     // Update the connected subchannel only if the channel is not shutting
1523     // down.  This is because once the channel is shutting down, we
1524     // ignore picker updates from the LB policy, which means that
1525     // UpdateStateAndPickerLocked() will never process the entries
1526     // in chand_->pending_subchannel_updates_.  So we don't want to add
1527     // entries there that will never be processed, since that would
1528     // leave dangling refs to the channel and prevent its destruction.
1529     grpc_error* disconnect_error = chand_->disconnect_error();
1530     if (disconnect_error != GRPC_ERROR_NONE) return;
1531     // Not shutting down, so do the update.
1532     if (connected_subchannel_ != connected_subchannel) {
1533       connected_subchannel_ = std::move(connected_subchannel);
1534       // Record the new connected subchannel so that it can be updated
1535       // in the data plane mutex the next time the picker is updated.
1536       chand_->pending_subchannel_updates_[Ref(
1537           DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
1538     }
1539   }
1540 
1541   ChannelData* chand_;
1542   Subchannel* subchannel_;
1543   absl::optional<std::string> health_check_service_name_;
// Maps from the address of the watcher passed to us by the LB policy
// to the address of the WatcherWrapper that we passed to the underlying
// subchannel.  This is needed so that when the LB policy calls
// CancelConnectivityStateWatch() with its watcher, we know the
// corresponding WatcherWrapper to cancel on the underlying subchannel.
1549   std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
1550   // To be accessed only in the control plane work_serializer.
1551   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
1552   // To be accessed only in the data plane mutex.
1553   RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
1554 };
1555 
1556 //
1557 // ChannelData::ExternalConnectivityWatcher
1558 //
1559 
ExternalConnectivityWatcher(ChannelData * chand,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)1560 ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
1561     ChannelData* chand, grpc_polling_entity pollent,
1562     grpc_connectivity_state* state, grpc_closure* on_complete,
1563     grpc_closure* watcher_timer_init)
1564     : chand_(chand),
1565       pollent_(pollent),
1566       initial_state_(*state),
1567       state_(state),
1568       on_complete_(on_complete),
1569       watcher_timer_init_(watcher_timer_init) {
1570   grpc_polling_entity_add_to_pollset_set(&pollent_,
1571                                          chand_->interested_parties_);
1572   GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
1573   {
1574     MutexLock lock(&chand_->external_watchers_mu_);
1575     // Will be deleted when the watch is complete.
1576     GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
1577     // Store a ref to the watcher in the external_watchers_ map.
1578     chand->external_watchers_[on_complete] =
1579         Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
1580   }
1581   // Pass the ref from creating the object to Start().
1582   chand_->work_serializer_->Run(
1583       [this]() {
1584         // The ref is passed to AddWatcherLocked().
1585         AddWatcherLocked();
1586       },
1587       DEBUG_LOCATION);
1588 }
1589 
~ExternalConnectivityWatcher()1590 ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
1591   grpc_polling_entity_del_from_pollset_set(&pollent_,
1592                                            chand_->interested_parties_);
1593   GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1594                            "ExternalConnectivityWatcher");
1595 }
1596 
1597 void ChannelData::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ChannelData * chand,grpc_closure * on_complete,bool cancel)1598     RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
1599                                          grpc_closure* on_complete,
1600                                          bool cancel) {
1601   RefCountedPtr<ExternalConnectivityWatcher> watcher;
1602   {
1603     MutexLock lock(&chand->external_watchers_mu_);
1604     auto it = chand->external_watchers_.find(on_complete);
1605     if (it != chand->external_watchers_.end()) {
1606       watcher = std::move(it->second);
1607       chand->external_watchers_.erase(it);
1608     }
1609   }
1610   // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
1611   // the mutex before calling it.
1612   if (watcher != nullptr && cancel) watcher->Cancel();
1613 }
1614 
Notify(grpc_connectivity_state state,const absl::Status &)1615 void ChannelData::ExternalConnectivityWatcher::Notify(
1616     grpc_connectivity_state state, const absl::Status& /* status */) {
1617   bool done = false;
1618   if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
1619                                    MemoryOrder::RELAXED)) {
1620     return;  // Already done.
1621   }
1622   // Remove external watcher.
1623   chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false);
1624   // Report new state to the user.
1625   *state_ = state;
1626   ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
1627   // Hop back into the work_serializer to clean up.
1628   // Not needed in state SHUTDOWN, because the tracker will
1629   // automatically remove all watchers in that case.
1630   if (state != GRPC_CHANNEL_SHUTDOWN) {
1631     chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
1632                                   DEBUG_LOCATION);
1633   }
1634 }
1635 
Cancel()1636 void ChannelData::ExternalConnectivityWatcher::Cancel() {
1637   bool done = false;
1638   if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
1639                                    MemoryOrder::RELAXED)) {
1640     return;  // Already done.
1641   }
1642   ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
1643   // Hop back into the work_serializer to clean up.
1644   chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
1645                                 DEBUG_LOCATION);
1646 }
1647 
AddWatcherLocked()1648 void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() {
1649   Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
1650   // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
1651   chand_->state_tracker_.AddWatcher(
1652       initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
1653 }
1654 
RemoveWatcherLocked()1655 void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() {
1656   chand_->state_tracker_.RemoveWatcher(this);
1657 }
1658 
1659 //
1660 // ChannelData::ConnectivityWatcherAdder
1661 //
1662 
1663 class ChannelData::ConnectivityWatcherAdder {
1664  public:
ConnectivityWatcherAdder(ChannelData * chand,grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)1665   ConnectivityWatcherAdder(
1666       ChannelData* chand, grpc_connectivity_state initial_state,
1667       OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
1668       : chand_(chand),
1669         initial_state_(initial_state),
1670         watcher_(std::move(watcher)) {
1671     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
1672     chand_->work_serializer_->Run([this]() { AddWatcherLocked(); },
1673                                   DEBUG_LOCATION);
1674   }
1675 
1676  private:
AddWatcherLocked()1677   void AddWatcherLocked() {
1678     chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
1679     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
1680     delete this;
1681   }
1682 
1683   ChannelData* chand_;
1684   grpc_connectivity_state initial_state_;
1685   OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
1686 };
1687 
1688 //
1689 // ChannelData::ConnectivityWatcherRemover
1690 //
1691 
1692 class ChannelData::ConnectivityWatcherRemover {
1693  public:
ConnectivityWatcherRemover(ChannelData * chand,AsyncConnectivityStateWatcherInterface * watcher)1694   ConnectivityWatcherRemover(ChannelData* chand,
1695                              AsyncConnectivityStateWatcherInterface* watcher)
1696       : chand_(chand), watcher_(watcher) {
1697     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
1698     chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
1699                                   DEBUG_LOCATION);
1700   }
1701 
1702  private:
RemoveWatcherLocked()1703   void RemoveWatcherLocked() {
1704     chand_->state_tracker_.RemoveWatcher(watcher_);
1705     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1706                              "ConnectivityWatcherRemover");
1707     delete this;
1708   }
1709 
1710   ChannelData* chand_;
1711   AsyncConnectivityStateWatcherInterface* watcher_;
1712 };
1713 
1714 //
1715 // ChannelData::ClientChannelControlHelper
1716 //
1717 
1718 class ChannelData::ClientChannelControlHelper
1719     : public LoadBalancingPolicy::ChannelControlHelper {
1720  public:
ClientChannelControlHelper(ChannelData * chand)1721   explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
1722     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
1723   }
1724 
~ClientChannelControlHelper()1725   ~ClientChannelControlHelper() override {
1726     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1727                              "ClientChannelControlHelper");
1728   }
1729 
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)1730   RefCountedPtr<SubchannelInterface> CreateSubchannel(
1731       ServerAddress address, const grpc_channel_args& args) override {
1732     if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
1733     // Determine health check service name.
1734     bool inhibit_health_checking = grpc_channel_arg_get_bool(
1735         grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
1736     absl::optional<std::string> health_check_service_name;
1737     if (!inhibit_health_checking) {
1738       health_check_service_name = chand_->health_check_service_name_;
1739     }
1740     // Remove channel args that should not affect subchannel uniqueness.
1741     static const char* args_to_remove[] = {
1742         GRPC_ARG_INHIBIT_HEALTH_CHECKING,
1743         GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1744     };
1745     // Add channel args needed for the subchannel.
1746     absl::InlinedVector<grpc_arg, 3> args_to_add = {
1747         Subchannel::CreateSubchannelAddressArg(&address.address()),
1748         SubchannelPoolInterface::CreateChannelArg(
1749             chand_->subchannel_pool_.get()),
1750     };
1751     if (address.args() != nullptr) {
1752       for (size_t j = 0; j < address.args()->num_args; ++j) {
1753         args_to_add.emplace_back(address.args()->args[j]);
1754       }
1755     }
1756     grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1757         &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove),
1758         args_to_add.data(), args_to_add.size());
1759     gpr_free(args_to_add[0].value.string);
1760     // Create subchannel.
1761     Subchannel* subchannel =
1762         chand_->client_channel_factory_->CreateSubchannel(new_args);
1763     grpc_channel_args_destroy(new_args);
1764     if (subchannel == nullptr) return nullptr;
1765     // Make sure the subchannel has updated keepalive time.
1766     subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
1767     // Create and return wrapper for the subchannel.
1768     return MakeRefCounted<SubchannelWrapper>(
1769         chand_, subchannel, std::move(health_check_service_name));
1770   }
1771 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)1772   void UpdateState(
1773       grpc_connectivity_state state, const absl::Status& status,
1774       std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override {
1775     if (chand_->resolver_ == nullptr) return;  // Shutting down.
1776     grpc_error* disconnect_error = chand_->disconnect_error();
1777     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1778       const char* extra = disconnect_error == GRPC_ERROR_NONE
1779                               ? ""
1780                               : " (ignoring -- channel shutting down)";
1781       gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
1782               chand_, ConnectivityStateName(state), status.ToString().c_str(),
1783               picker.get(), extra);
1784     }
1785     // Do update only if not shutting down.
1786     if (disconnect_error == GRPC_ERROR_NONE) {
1787       chand_->UpdateStateAndPickerLocked(state, status, "helper",
1788                                          std::move(picker));
1789     }
1790   }
1791 
RequestReresolution()1792   void RequestReresolution() override {
1793     if (chand_->resolver_ == nullptr) return;  // Shutting down.
1794     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1795       gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
1796     }
1797     chand_->resolver_->RequestReresolutionLocked();
1798   }
1799 
AddTraceEvent(TraceSeverity severity,absl::string_view message)1800   void AddTraceEvent(TraceSeverity severity,
1801                      absl::string_view message) override {
1802     if (chand_->resolver_ == nullptr) return;  // Shutting down.
1803     if (chand_->channelz_node_ != nullptr) {
1804       chand_->channelz_node_->AddTraceEvent(
1805           ConvertSeverityEnum(severity),
1806           grpc_slice_from_copied_buffer(message.data(), message.size()));
1807     }
1808   }
1809 
1810  private:
ConvertSeverityEnum(TraceSeverity severity)1811   static channelz::ChannelTrace::Severity ConvertSeverityEnum(
1812       TraceSeverity severity) {
1813     if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
1814     if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1815     return channelz::ChannelTrace::Error;
1816   }
1817 
1818   ChannelData* chand_;
1819 };
1820 
1821 //
1822 // ChannelData implementation
1823 //
1824 
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1825 grpc_error* ChannelData::Init(grpc_channel_element* elem,
1826                               grpc_channel_element_args* args) {
1827   GPR_ASSERT(args->is_last);
1828   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
1829   grpc_error* error = GRPC_ERROR_NONE;
1830   new (elem->channel_data) ChannelData(args, &error);
1831   return error;
1832 }
1833 
Destroy(grpc_channel_element * elem)1834 void ChannelData::Destroy(grpc_channel_element* elem) {
1835   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1836   chand->~ChannelData();
1837 }
1838 
GetEnableRetries(const grpc_channel_args * args)1839 bool GetEnableRetries(const grpc_channel_args* args) {
1840   return grpc_channel_arg_get_bool(
1841       grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
1842 }
1843 
GetMaxPerRpcRetryBufferSize(const grpc_channel_args * args)1844 size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
1845   return static_cast<size_t>(grpc_channel_arg_get_integer(
1846       grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
1847       {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
1848 }
1849 
GetSubchannelPool(const grpc_channel_args * args)1850 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1851     const grpc_channel_args* args) {
1852   const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
1853       grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
1854   if (use_local_subchannel_pool) {
1855     return MakeRefCounted<LocalSubchannelPool>();
1856   }
1857   return GlobalSubchannelPool::instance();
1858 }
1859 
GetChannelzNode(const grpc_channel_args * args)1860 channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
1861   const grpc_arg* arg =
1862       grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1863   if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
1864     return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1865   }
1866   return nullptr;
1867 }
1868 
ChannelData(grpc_channel_element_args * args,grpc_error ** error)1869 ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
1870     : deadline_checking_enabled_(
1871           grpc_deadline_checking_enabled(args->channel_args)),
1872       enable_retries_(GetEnableRetries(args->channel_args)),
1873       per_rpc_retry_buffer_size_(
1874           GetMaxPerRpcRetryBufferSize(args->channel_args)),
1875       owning_stack_(args->channel_stack),
1876       client_channel_factory_(
1877           ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
1878       channelz_node_(GetChannelzNode(args->channel_args)),
1879       work_serializer_(std::make_shared<WorkSerializer>()),
1880       interested_parties_(grpc_pollset_set_create()),
1881       state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
1882       subchannel_pool_(GetSubchannelPool(args->channel_args)),
1883       disconnect_error_(GRPC_ERROR_NONE) {
1884   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1885     gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
1886             this, owning_stack_);
1887   }
1888   // Start backup polling.
1889   grpc_client_channel_start_backup_polling(interested_parties_);
1890   // Check client channel factory.
1891   if (client_channel_factory_ == nullptr) {
1892     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1893         "Missing client channel factory in args for client channel filter");
1894     return;
1895   }
1896   // Get server name to resolve, using proxy mapper if needed.
1897   const char* server_uri = grpc_channel_arg_get_string(
1898       grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
1899   if (server_uri == nullptr) {
1900     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1901         "server URI channel arg missing or wrong type in client channel "
1902         "filter");
1903     return;
1904   }
1905   // Get default service config.  If none is specified via the client API,
1906   // we use an empty config.
1907   const char* service_config_json = grpc_channel_arg_get_string(
1908       grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
1909   if (service_config_json == nullptr) service_config_json = "{}";
1910   *error = GRPC_ERROR_NONE;
1911   default_service_config_ =
1912       ServiceConfig::Create(args->channel_args, service_config_json, error);
1913   if (*error != GRPC_ERROR_NONE) {
1914     default_service_config_.reset();
1915     return;
1916   }
1917   absl::StatusOr<URI> uri = URI::Parse(server_uri);
1918   if (uri.ok() && !uri->path().empty()) {
1919     server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
1920   }
1921   char* proxy_name = nullptr;
1922   grpc_channel_args* new_args = nullptr;
1923   ProxyMapperRegistry::MapName(server_uri, args->channel_args, &proxy_name,
1924                                &new_args);
1925   target_uri_.reset(proxy_name != nullptr ? proxy_name
1926                                           : gpr_strdup(server_uri));
1927   // Strip out service config channel arg, so that it doesn't affect
1928   // subchannel uniqueness when the args flow down to that layer.
1929   const char* arg_to_remove = GRPC_ARG_SERVICE_CONFIG;
1930   channel_args_ = grpc_channel_args_copy_and_remove(
1931       new_args != nullptr ? new_args : args->channel_args, &arg_to_remove, 1);
1932   grpc_channel_args_destroy(new_args);
1933   keepalive_time_ = grpc_channel_args_find_integer(
1934       channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS,
1935       {-1 /* default value, unset */, 1, INT_MAX});
1936   if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
1937     std::string error_message =
1938         absl::StrCat("the target uri is not valid: ", target_uri_.get());
1939     *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
1940     return;
1941   }
1942   *error = GRPC_ERROR_NONE;
1943 }
1944 
~ChannelData()1945 ChannelData::~ChannelData() {
1946   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1947     gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
1948   }
1949   DestroyResolverAndLbPolicyLocked();
1950   grpc_channel_args_destroy(channel_args_);
1951   GRPC_ERROR_UNREF(resolver_transient_failure_error_);
1952   // Stop backup polling.
1953   grpc_client_channel_stop_backup_polling(interested_parties_);
1954   grpc_pollset_set_destroy(interested_parties_);
1955   GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
1956 }
1957 
ChooseLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config)1958 RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
1959     const Resolver::Result& resolver_result,
1960     const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
1961   // Prefer the LB policy config found in the service config.
1962   if (parsed_service_config->parsed_lb_config() != nullptr) {
1963     return parsed_service_config->parsed_lb_config();
1964   }
1965   // Try the deprecated LB policy name from the service config.
1966   // If not, try the setting from channel args.
1967   const char* policy_name = nullptr;
1968   if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
1969     policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
1970   } else {
1971     const grpc_arg* channel_arg =
1972         grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
1973     policy_name = grpc_channel_arg_get_string(channel_arg);
1974   }
1975   // Use pick_first if nothing was specified and we didn't select grpclb
1976   // above.
1977   if (policy_name == nullptr) policy_name = "pick_first";
1978   // Now that we have the policy name, construct an empty config for it.
1979   Json config_json = Json::Array{Json::Object{
1980       {policy_name, Json::Object{}},
1981   }};
1982   grpc_error* parse_error = GRPC_ERROR_NONE;
1983   auto lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1984       config_json, &parse_error);
1985   // The policy name came from one of three places:
1986   // - The deprecated loadBalancingPolicy field in the service config,
1987   //   in which case the code in ClientChannelServiceConfigParser
1988   //   already verified that the policy does not require a config.
1989   // - One of the hard-coded values here, all of which are known to not
1990   //   require a config.
1991   // - A channel arg, in which case the application did something that
1992   //   is a misuse of our API.
1993   // In the first two cases, these assertions will always be true.  In
1994   // the last case, this is probably fine for now.
1995   // TODO(roth): If the last case becomes a problem, add better error
1996   // handling here.
1997   GPR_ASSERT(lb_policy_config != nullptr);
1998   GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
1999   return lb_policy_config;
2000 }
2001 
OnResolverResultChangedLocked(Resolver::Result result)2002 void ChannelData::OnResolverResultChangedLocked(Resolver::Result result) {
2003   // Handle race conditions.
2004   if (resolver_ == nullptr) return;
2005   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2006     gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
2007   }
2008   // We only want to trace the address resolution in the follow cases:
2009   // (a) Address resolution resulted in service config change.
2010   // (b) Address resolution that causes number of backends to go from
2011   //     zero to non-zero.
2012   // (c) Address resolution that causes number of backends to go from
2013   //     non-zero to zero.
2014   // (d) Address resolution that causes a new LB policy to be created.
2015   //
2016   // We track a list of strings to eventually be concatenated and traced.
2017   absl::InlinedVector<const char*, 3> trace_strings;
2018   if (result.addresses.empty() && previous_resolution_contained_addresses_) {
2019     trace_strings.push_back("Address list became empty");
2020   } else if (!result.addresses.empty() &&
2021              !previous_resolution_contained_addresses_) {
2022     trace_strings.push_back("Address list became non-empty");
2023   }
2024   previous_resolution_contained_addresses_ = !result.addresses.empty();
2025   // The result of grpc_error_string() is owned by the error itself.
2026   // We're storing that string in trace_strings, so we need to make sure
2027   // that the error lives until we're done with the string.
2028   grpc_error* service_config_error =
2029       GRPC_ERROR_REF(result.service_config_error);
2030   if (service_config_error != GRPC_ERROR_NONE) {
2031     trace_strings.push_back(grpc_error_string(service_config_error));
2032   }
2033   // Choose the service config.
2034   RefCountedPtr<ServiceConfig> service_config;
2035   RefCountedPtr<ConfigSelector> config_selector;
2036   if (service_config_error != GRPC_ERROR_NONE) {
2037     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2038       gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
2039               this, grpc_error_string(service_config_error));
2040     }
2041     // If the service config was invalid, then fallback to the
2042     // previously returned service config.
2043     if (saved_service_config_ != nullptr) {
2044       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2045         gpr_log(GPR_INFO,
2046                 "chand=%p: resolver returned invalid service config. "
2047                 "Continuing to use previous service config.",
2048                 this);
2049       }
2050       service_config = saved_service_config_;
2051       config_selector = saved_config_selector_;
2052     } else {
2053       // We received an invalid service config and we don't have a
2054       // previous service config to fall back to.  Put the channel into
2055       // TRANSIENT_FAILURE.
2056       OnResolverErrorLocked(GRPC_ERROR_REF(service_config_error));
2057       trace_strings.push_back("no valid service config");
2058     }
2059   } else if (result.service_config == nullptr) {
2060     // Resolver did not return any service config.
2061     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2062       gpr_log(GPR_INFO,
2063               "chand=%p: resolver returned no service config. Using default "
2064               "service config for channel.",
2065               this);
2066     }
2067     service_config = default_service_config_;
2068   } else {
2069     // Use ServiceConfig and ConfigSelector returned by resolver.
2070     service_config = result.service_config;
2071     config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
2072   }
2073   if (service_config != nullptr) {
2074     // Extract global config for client channel.
2075     const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
2076         static_cast<const internal::ClientChannelGlobalParsedConfig*>(
2077             service_config->GetGlobalParsedConfig(
2078                 internal::ClientChannelServiceConfigParser::ParserIndex()));
2079     // Choose LB policy config.
2080     RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
2081         ChooseLbPolicy(result, parsed_service_config);
2082     // Check if the ServiceConfig has changed.
2083     const bool service_config_changed =
2084         saved_service_config_ == nullptr ||
2085         service_config->json_string() != saved_service_config_->json_string();
2086     // Check if the ConfigSelector has changed.
2087     const bool config_selector_changed = !ConfigSelector::Equals(
2088         saved_config_selector_.get(), config_selector.get());
2089     // If either has changed, apply the global parameters now.
2090     if (service_config_changed || config_selector_changed) {
2091       // Update service config in control plane.
2092       UpdateServiceConfigInControlPlaneLocked(
2093           std::move(service_config), std::move(config_selector),
2094           parsed_service_config, lb_policy_config->name());
2095     } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2096       gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
2097     }
2098     // Create or update LB policy, as needed.
2099     CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
2100                                  std::move(result));
2101     if (service_config_changed || config_selector_changed) {
2102       // Start using new service config for calls.
2103       // This needs to happen after the LB policy has been updated, since
2104       // the ConfigSelector may need the LB policy to know about new
2105       // destinations before it can send RPCs to those destinations.
2106       UpdateServiceConfigInDataPlaneLocked();
2107       // TODO(ncteisen): might be worth somehow including a snippet of the
2108       // config in the trace, at the risk of bloating the trace logs.
2109       trace_strings.push_back("Service config changed");
2110     }
2111   }
2112   // Add channel trace event.
2113   if (!trace_strings.empty()) {
2114     std::string message =
2115         absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
2116     if (channelz_node_ != nullptr) {
2117       channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
2118                                     grpc_slice_from_cpp_string(message));
2119     }
2120   }
2121   GRPC_ERROR_UNREF(service_config_error);
2122 }
2123 
OnResolverErrorLocked(grpc_error * error)2124 void ChannelData::OnResolverErrorLocked(grpc_error* error) {
2125   if (resolver_ == nullptr) {
2126     GRPC_ERROR_UNREF(error);
2127     return;
2128   }
2129   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2130     gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
2131             grpc_error_string(error));
2132   }
2133   // If we already have an LB policy from a previous resolution
2134   // result, then we continue to let it set the connectivity state.
2135   // Otherwise, we go into TRANSIENT_FAILURE.
2136   if (lb_policy_ == nullptr) {
2137     grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2138         "Resolver transient failure", &error, 1);
2139     {
2140       MutexLock lock(&resolution_mu_);
2141       // Update resolver transient failure.
2142       GRPC_ERROR_UNREF(resolver_transient_failure_error_);
2143       resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error);
2144       // Process calls that were queued waiting for the resolver result.
2145       for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
2146            call = call->next) {
2147         grpc_call_element* elem = call->elem;
2148         CallData* calld = static_cast<CallData*>(elem->call_data);
2149         grpc_error* error = GRPC_ERROR_NONE;
2150         if (calld->CheckResolutionLocked(elem, &error)) {
2151           calld->AsyncResolutionDone(elem, error);
2152         }
2153       }
2154     }
2155     // Update connectivity state.
2156     UpdateStateAndPickerLocked(
2157         GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
2158         "resolver failure",
2159         absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
2160             state_error));
2161   }
2162   GRPC_ERROR_UNREF(error);
2163 }
2164 
CreateOrUpdateLbPolicyLocked(RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,Resolver::Result result)2165 void ChannelData::CreateOrUpdateLbPolicyLocked(
2166     RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
2167     Resolver::Result result) {
2168   // Construct update.
2169   LoadBalancingPolicy::UpdateArgs update_args;
2170   update_args.addresses = std::move(result.addresses);
2171   update_args.config = std::move(lb_policy_config);
2172   // Remove the config selector from channel args so that we're not holding
2173   // unnecessary refs that cause it to be destroyed somewhere other than in the
2174   // WorkSerializer.
2175   const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
2176   update_args.args =
2177       grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
2178   // Create policy if needed.
2179   if (lb_policy_ == nullptr) {
2180     lb_policy_ = CreateLbPolicyLocked(*update_args.args);
2181   }
2182   // Update the policy.
2183   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2184     gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
2185             lb_policy_.get());
2186   }
2187   lb_policy_->UpdateLocked(std::move(update_args));
2188 }
2189 
2190 // Creates a new LB policy.
CreateLbPolicyLocked(const grpc_channel_args & args)2191 OrphanablePtr<LoadBalancingPolicy> ChannelData::CreateLbPolicyLocked(
2192     const grpc_channel_args& args) {
2193   LoadBalancingPolicy::Args lb_policy_args;
2194   lb_policy_args.work_serializer = work_serializer_;
2195   lb_policy_args.channel_control_helper =
2196       absl::make_unique<ClientChannelControlHelper>(this);
2197   lb_policy_args.args = &args;
2198   OrphanablePtr<LoadBalancingPolicy> lb_policy =
2199       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
2200                                          &grpc_client_channel_routing_trace);
2201   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2202     gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
2203             lb_policy.get());
2204   }
2205   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
2206                                    interested_parties_);
2207   return lb_policy;
2208 }
2209 
AddResolverQueuedCall(ResolverQueuedCall * call,grpc_polling_entity * pollent)2210 void ChannelData::AddResolverQueuedCall(ResolverQueuedCall* call,
2211                                         grpc_polling_entity* pollent) {
2212   // Add call to queued calls list.
2213   call->next = resolver_queued_calls_;
2214   resolver_queued_calls_ = call;
2215   // Add call's pollent to channel's interested_parties, so that I/O
2216   // can be done under the call's CQ.
2217   grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
2218 }
2219 
RemoveResolverQueuedCall(ResolverQueuedCall * to_remove,grpc_polling_entity * pollent)2220 void ChannelData::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
2221                                            grpc_polling_entity* pollent) {
2222   // Remove call's pollent from channel's interested_parties.
2223   grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
2224   // Remove from queued calls list.
2225   for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
2226        call = &(*call)->next) {
2227     if (*call == to_remove) {
2228       *call = to_remove->next;
2229       return;
2230     }
2231   }
2232 }
2233 
UpdateServiceConfigInControlPlaneLocked(RefCountedPtr<ServiceConfig> service_config,RefCountedPtr<ConfigSelector> config_selector,const internal::ClientChannelGlobalParsedConfig * parsed_service_config,const char * lb_policy_name)2234 void ChannelData::UpdateServiceConfigInControlPlaneLocked(
2235     RefCountedPtr<ServiceConfig> service_config,
2236     RefCountedPtr<ConfigSelector> config_selector,
2237     const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
2238     const char* lb_policy_name) {
2239   UniquePtr<char> service_config_json(
2240       gpr_strdup(service_config->json_string().c_str()));
2241   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2242     gpr_log(GPR_INFO,
2243             "chand=%p: resolver returned updated service config: \"%s\"", this,
2244             service_config_json.get());
2245   }
2246   // Save service config.
2247   saved_service_config_ = std::move(service_config);
2248   // Update health check service name if needed.
2249   if (health_check_service_name_ !=
2250       parsed_service_config->health_check_service_name()) {
2251     health_check_service_name_ =
2252         parsed_service_config->health_check_service_name();
2253     // Update health check service name used by existing subchannel wrappers.
2254     for (auto* subchannel_wrapper : subchannel_wrappers_) {
2255       subchannel_wrapper->UpdateHealthCheckServiceName(
2256           health_check_service_name_);
2257     }
2258   }
2259   // Swap out the data used by GetChannelInfo().
2260   UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
2261   {
2262     MutexLock lock(&info_mu_);
2263     info_lb_policy_name_ = std::move(lb_policy_name_owned);
2264     info_service_config_json_ = std::move(service_config_json);
2265   }
2266   // Save config selector.
2267   saved_config_selector_ = std::move(config_selector);
2268   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2269     gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
2270             saved_config_selector_.get());
2271   }
2272 }
2273 
UpdateServiceConfigInDataPlaneLocked()2274 void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
2275   // Grab ref to service config.
2276   RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
2277   // Grab ref to config selector.  Use default if resolver didn't supply one.
2278   RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
2279   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2280     gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
2281             saved_config_selector_.get());
2282   }
2283   if (config_selector == nullptr) {
2284     config_selector =
2285         MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
2286   }
2287   // Get retry throttle data from service config.
2288   const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
2289       static_cast<const internal::ClientChannelGlobalParsedConfig*>(
2290           saved_service_config_->GetGlobalParsedConfig(
2291               internal::ClientChannelServiceConfigParser::ParserIndex()));
2292   absl::optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
2293       retry_throttle_config = parsed_service_config->retry_throttling();
2294   RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
2295   if (retry_throttle_config.has_value()) {
2296     retry_throttle_data = internal::ServerRetryThrottleMap::GetDataForServer(
2297         server_name_, retry_throttle_config.value().max_milli_tokens,
2298         retry_throttle_config.value().milli_token_ratio);
2299   }
2300   // Construct per-LB filter stack.
2301   std::vector<const grpc_channel_filter*> filters =
2302       config_selector->GetFilters();
2303   filters.push_back(&kDynamicTerminationFilterVtable);
2304   absl::InlinedVector<grpc_arg, 2> args_to_add;
2305   args_to_add.push_back(grpc_channel_arg_pointer_create(
2306       const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL_DATA), this,
2307       &kChannelDataArgPointerVtable));
2308   if (retry_throttle_data != nullptr) {
2309     args_to_add.push_back(grpc_channel_arg_pointer_create(
2310         const_cast<char*>(GRPC_ARG_RETRY_THROTTLE_DATA),
2311         retry_throttle_data.get(), &kRetryThrottleDataArgPointerVtable));
2312   }
2313   grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
2314       channel_args_, args_to_add.data(), args_to_add.size());
2315   RefCountedPtr<DynamicFilters> dynamic_filters =
2316       DynamicFilters::Create(new_args, std::move(filters));
2317   GPR_ASSERT(dynamic_filters != nullptr);
2318   grpc_channel_args_destroy(new_args);
2319   // Grab data plane lock to update service config.
2320   //
2321   // We defer unreffing the old values (and deallocating memory) until
2322   // after releasing the lock to keep the critical section small.
2323   std::set<grpc_call_element*> calls_pending_resolver_result;
2324   {
2325     MutexLock lock(&resolution_mu_);
2326     GRPC_ERROR_UNREF(resolver_transient_failure_error_);
2327     resolver_transient_failure_error_ = GRPC_ERROR_NONE;
2328     // Update service config.
2329     received_service_config_data_ = true;
2330     // Old values will be unreffed after lock is released.
2331     service_config_.swap(service_config);
2332     config_selector_.swap(config_selector);
2333     dynamic_filters_.swap(dynamic_filters);
2334     // Process calls that were queued waiting for the resolver result.
2335     for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
2336          call = call->next) {
2337       grpc_call_element* elem = call->elem;
2338       CallData* calld = static_cast<CallData*>(elem->call_data);
2339       grpc_error* error = GRPC_ERROR_NONE;
2340       if (calld->CheckResolutionLocked(elem, &error)) {
2341         calld->AsyncResolutionDone(elem, error);
2342       }
2343     }
2344   }
2345   // Old values will be unreffed after lock is released when they go out
2346   // of scope.
2347 }
2348 
CreateResolverLocked()2349 void ChannelData::CreateResolverLocked() {
2350   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2351     gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
2352   }
2353   resolver_ = ResolverRegistry::CreateResolver(
2354       target_uri_.get(), channel_args_, interested_parties_, work_serializer_,
2355       absl::make_unique<ResolverResultHandler>(this));
2356   // Since the validity of the args was checked when the channel was created,
2357   // CreateResolver() must return a non-null result.
2358   GPR_ASSERT(resolver_ != nullptr);
2359   UpdateStateAndPickerLocked(
2360       GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
2361       absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
2362   resolver_->StartLocked();
2363   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2364     gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
2365   }
2366 }
2367 
DestroyResolverAndLbPolicyLocked()2368 void ChannelData::DestroyResolverAndLbPolicyLocked() {
2369   if (resolver_ != nullptr) {
2370     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2371       gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
2372               resolver_.get());
2373     }
2374     resolver_.reset();
2375     if (lb_policy_ != nullptr) {
2376       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2377         gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
2378                 lb_policy_.get());
2379       }
2380       grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
2381                                        interested_parties_);
2382       lb_policy_.reset();
2383     }
2384   }
2385 }
2386 
UpdateStateAndPickerLocked(grpc_connectivity_state state,const absl::Status & status,const char * reason,std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)2387 void ChannelData::UpdateStateAndPickerLocked(
2388     grpc_connectivity_state state, const absl::Status& status,
2389     const char* reason,
2390     std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
2391   // Special case for IDLE and SHUTDOWN states.
2392   if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
2393     saved_service_config_.reset();
2394     saved_config_selector_.reset();
2395     // Acquire resolution lock to update config selector and associated state.
2396     // To minimize lock contention, we wait to unref these objects until
2397     // after we release the lock.
2398     RefCountedPtr<ServiceConfig> service_config_to_unref;
2399     RefCountedPtr<ConfigSelector> config_selector_to_unref;
2400     RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
2401     {
2402       MutexLock lock(&resolution_mu_);
2403       received_service_config_data_ = false;
2404       service_config_to_unref = std::move(service_config_);
2405       config_selector_to_unref = std::move(config_selector_);
2406       dynamic_filters_to_unref = std::move(dynamic_filters_);
2407     }
2408   }
2409   // Update connectivity state.
2410   state_tracker_.SetState(state, status, reason);
2411   if (channelz_node_ != nullptr) {
2412     channelz_node_->SetConnectivityState(state);
2413     channelz_node_->AddTraceEvent(
2414         channelz::ChannelTrace::Severity::Info,
2415         grpc_slice_from_static_string(
2416             channelz::ChannelNode::GetChannelConnectivityStateChangeString(
2417                 state)));
2418   }
2419   // Grab data plane lock to do subchannel updates and update the picker.
2420   //
2421   // Note that we want to minimize the work done while holding the data
2422   // plane lock, to keep the critical section small.  So, for all of the
2423   // objects that we might wind up unreffing here, we actually hold onto
2424   // the refs until after we release the lock, and then unref them at
2425   // that point.  This includes the following:
2426   // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
2427   // - ownership of the existing picker in picker_
2428   {
2429     MutexLock lock(&data_plane_mu_);
2430     // Handle subchannel updates.
2431     for (auto& p : pending_subchannel_updates_) {
2432       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2433         gpr_log(GPR_INFO,
2434                 "chand=%p: updating subchannel wrapper %p data plane "
2435                 "connected_subchannel to %p",
2436                 this, p.first.get(), p.second.get());
2437       }
2438       // Note: We do not remove the entry from pending_subchannel_updates_
2439       // here, since this would unref the subchannel wrapper; instead,
2440       // we wait until we've released the lock to clear the map.
2441       p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
2442     }
2443     // Swap out the picker.
2444     // Note: Original value will be destroyed after the lock is released.
2445     picker_.swap(picker);
2446     // Re-process queued picks.
2447     for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
2448          call = call->next) {
2449       grpc_error* error = GRPC_ERROR_NONE;
2450       if (call->lb_call->PickSubchannelLocked(&error)) {
2451         call->lb_call->AsyncPickDone(error);
2452       }
2453     }
2454   }
2455   // Clear the pending update map after releasing the lock, to keep the
2456   // critical section small.
2457   pending_subchannel_updates_.clear();
2458 }
2459 
DoPingLocked(grpc_transport_op * op)2460 grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
2461   if (state_tracker_.state() != GRPC_CHANNEL_READY) {
2462     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
2463   }
2464   LoadBalancingPolicy::PickResult result =
2465       picker_->Pick(LoadBalancingPolicy::PickArgs());
2466   ConnectedSubchannel* connected_subchannel = nullptr;
2467   if (result.subchannel != nullptr) {
2468     SubchannelWrapper* subchannel =
2469         static_cast<SubchannelWrapper*>(result.subchannel.get());
2470     connected_subchannel = subchannel->connected_subchannel();
2471   }
2472   if (connected_subchannel != nullptr) {
2473     connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
2474   } else {
2475     if (result.error == GRPC_ERROR_NONE) {
2476       result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2477           "LB policy dropped call on ping");
2478     }
2479   }
2480   return result.error;
2481 }
2482 
StartTransportOpLocked(grpc_transport_op * op)2483 void ChannelData::StartTransportOpLocked(grpc_transport_op* op) {
2484   // Connectivity watch.
2485   if (op->start_connectivity_watch != nullptr) {
2486     state_tracker_.AddWatcher(op->start_connectivity_watch_state,
2487                               std::move(op->start_connectivity_watch));
2488   }
2489   if (op->stop_connectivity_watch != nullptr) {
2490     state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
2491   }
2492   // Ping.
2493   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
2494     grpc_error* error = DoPingLocked(op);
2495     if (error != GRPC_ERROR_NONE) {
2496       ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
2497                    GRPC_ERROR_REF(error));
2498       ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
2499     }
2500     op->bind_pollset = nullptr;
2501     op->send_ping.on_initiate = nullptr;
2502     op->send_ping.on_ack = nullptr;
2503   }
2504   // Reset backoff.
2505   if (op->reset_connect_backoff) {
2506     if (lb_policy_ != nullptr) {
2507       lb_policy_->ResetBackoffLocked();
2508     }
2509   }
2510   // Disconnect or enter IDLE.
2511   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
2512     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2513       gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
2514               grpc_error_string(op->disconnect_with_error));
2515     }
2516     DestroyResolverAndLbPolicyLocked();
2517     intptr_t value;
2518     if (grpc_error_get_int(op->disconnect_with_error,
2519                            GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
2520         static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
2521       if (disconnect_error() == GRPC_ERROR_NONE) {
2522         // Enter IDLE state.
2523         UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
2524                                    "channel entering IDLE", nullptr);
2525       }
2526       GRPC_ERROR_UNREF(op->disconnect_with_error);
2527     } else {
2528       // Disconnect.
2529       GPR_ASSERT(disconnect_error_.Load(MemoryOrder::RELAXED) ==
2530                  GRPC_ERROR_NONE);
2531       disconnect_error_.Store(op->disconnect_with_error, MemoryOrder::RELEASE);
2532       UpdateStateAndPickerLocked(
2533           GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
2534           absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
2535               GRPC_ERROR_REF(op->disconnect_with_error)));
2536     }
2537   }
2538   GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
2539   ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
2540 }
2541 
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)2542 void ChannelData::StartTransportOp(grpc_channel_element* elem,
2543                                    grpc_transport_op* op) {
2544   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2545   GPR_ASSERT(op->set_accept_stream == false);
2546   // Handle bind_pollset.
2547   if (op->bind_pollset != nullptr) {
2548     grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
2549   }
2550   // Pop into control plane work_serializer for remaining ops.
2551   GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
2552   chand->work_serializer_->Run(
2553       [chand, op]() { chand->StartTransportOpLocked(op); }, DEBUG_LOCATION);
2554 }
2555 
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)2556 void ChannelData::GetChannelInfo(grpc_channel_element* elem,
2557                                  const grpc_channel_info* info) {
2558   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2559   MutexLock lock(&chand->info_mu_);
2560   if (info->lb_policy_name != nullptr) {
2561     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
2562   }
2563   if (info->service_config_json != nullptr) {
2564     *info->service_config_json =
2565         gpr_strdup(chand->info_service_config_json_.get());
2566   }
2567 }
2568 
AddLbQueuedCall(LbQueuedCall * call,grpc_polling_entity * pollent)2569 void ChannelData::AddLbQueuedCall(LbQueuedCall* call,
2570                                   grpc_polling_entity* pollent) {
2571   // Add call to queued picks list.
2572   call->next = lb_queued_calls_;
2573   lb_queued_calls_ = call;
2574   // Add call's pollent to channel's interested_parties, so that I/O
2575   // can be done under the call's CQ.
2576   grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
2577 }
2578 
RemoveLbQueuedCall(LbQueuedCall * to_remove,grpc_polling_entity * pollent)2579 void ChannelData::RemoveLbQueuedCall(LbQueuedCall* to_remove,
2580                                      grpc_polling_entity* pollent) {
2581   // Remove call's pollent from channel's interested_parties.
2582   grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
2583   // Remove from queued picks list.
2584   for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
2585        call = &(*call)->next) {
2586     if (*call == to_remove) {
2587       *call = to_remove->next;
2588       return;
2589     }
2590   }
2591 }
2592 
2593 RefCountedPtr<ConnectedSubchannel>
GetConnectedSubchannelInDataPlane(SubchannelInterface * subchannel) const2594 ChannelData::GetConnectedSubchannelInDataPlane(
2595     SubchannelInterface* subchannel) const {
2596   SubchannelWrapper* subchannel_wrapper =
2597       static_cast<SubchannelWrapper*>(subchannel);
2598   ConnectedSubchannel* connected_subchannel =
2599       subchannel_wrapper->connected_subchannel_in_data_plane();
2600   if (connected_subchannel == nullptr) return nullptr;
2601   return connected_subchannel->Ref();
2602 }
2603 
TryToConnectLocked()2604 void ChannelData::TryToConnectLocked() {
2605   if (lb_policy_ != nullptr) {
2606     lb_policy_->ExitIdleLocked();
2607   } else if (resolver_ == nullptr) {
2608     CreateResolverLocked();
2609   }
2610   GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
2611 }
2612 
CheckConnectivityState(bool try_to_connect)2613 grpc_connectivity_state ChannelData::CheckConnectivityState(
2614     bool try_to_connect) {
2615   grpc_connectivity_state out = state_tracker_.state();
2616   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
2617     GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
2618     work_serializer_->Run([this]() { TryToConnectLocked(); }, DEBUG_LOCATION);
2619   }
2620   return out;
2621 }
2622 
AddConnectivityWatcher(grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)2623 void ChannelData::AddConnectivityWatcher(
2624     grpc_connectivity_state initial_state,
2625     OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
2626   new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
2627 }
2628 
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)2629 void ChannelData::RemoveConnectivityWatcher(
2630     AsyncConnectivityStateWatcherInterface* watcher) {
2631   new ConnectivityWatcherRemover(this, watcher);
2632 }
2633 
2634 //
2635 // CallData implementation
2636 //
2637 
CallData(grpc_call_element * elem,const ChannelData & chand,const grpc_call_element_args & args)2638 CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
2639                    const grpc_call_element_args& args)
2640     : deadline_state_(elem, args,
2641                       GPR_LIKELY(chand.deadline_checking_enabled())
2642                           ? args.deadline
2643                           : GRPC_MILLIS_INF_FUTURE),
2644       path_(grpc_slice_ref_internal(args.path)),
2645       call_start_time_(args.start_time),
2646       deadline_(args.deadline),
2647       arena_(args.arena),
2648       owning_call_(args.call_stack),
2649       call_combiner_(args.call_combiner),
2650       call_context_(args.context) {
2651   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2652     gpr_log(GPR_INFO, "chand=%p calld=%p: created call", &chand, this);
2653   }
2654 }
2655 
~CallData()2656 CallData::~CallData() {
2657   grpc_slice_unref_internal(path_);
2658   GRPC_ERROR_UNREF(cancel_error_);
2659   // Make sure there are no remaining pending batches.
2660   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2661     GPR_ASSERT(pending_batches_[i] == nullptr);
2662   }
2663 }
2664 
Init(grpc_call_element * elem,const grpc_call_element_args * args)2665 grpc_error* CallData::Init(grpc_call_element* elem,
2666                            const grpc_call_element_args* args) {
2667   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2668   new (elem->call_data) CallData(elem, *chand, *args);
2669   return GRPC_ERROR_NONE;
2670 }
2671 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)2672 void CallData::Destroy(grpc_call_element* elem,
2673                        const grpc_call_final_info* /*final_info*/,
2674                        grpc_closure* then_schedule_closure) {
2675   CallData* calld = static_cast<CallData*>(elem->call_data);
2676   RefCountedPtr<DynamicFilters::Call> dynamic_call =
2677       std::move(calld->dynamic_call_);
2678   calld->~CallData();
2679   if (GPR_LIKELY(dynamic_call != nullptr)) {
2680     dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
2681   } else {
2682     // TODO(yashkt) : This can potentially be a Closure::Run
2683     ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
2684   }
2685 }
2686 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)2687 void CallData::StartTransportStreamOpBatch(
2688     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2689   GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
2690   CallData* calld = static_cast<CallData*>(elem->call_data);
2691   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2692   if (GPR_LIKELY(chand->deadline_checking_enabled())) {
2693     grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
2694   }
2695   // Intercept recv_initial_metadata for config selector on-committed callback.
2696   if (batch->recv_initial_metadata) {
2697     calld->InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(batch);
2698   }
2699   // If we've previously been cancelled, immediately fail any new batches.
2700   if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
2701     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2702       gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
2703               chand, calld, grpc_error_string(calld->cancel_error_));
2704     }
2705     // Note: This will release the call combiner.
2706     grpc_transport_stream_op_batch_finish_with_failure(
2707         batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
2708     return;
2709   }
2710   // Handle cancellation.
2711   if (GPR_UNLIKELY(batch->cancel_stream)) {
2712     // Stash a copy of cancel_error in our call data, so that we can use
2713     // it for subsequent operations.  This ensures that if the call is
2714     // cancelled before any batches are passed down (e.g., if the deadline
2715     // is in the past when the call starts), we can return the right
2716     // error to the caller when the first batch does get passed down.
2717     GRPC_ERROR_UNREF(calld->cancel_error_);
2718     calld->cancel_error_ =
2719         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
2720     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2721       gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
2722               calld, grpc_error_string(calld->cancel_error_));
2723     }
2724     // If we do not have a dynamic call (i.e., name resolution has not
2725     // yet completed), fail all pending batches.  Otherwise, send the
2726     // cancellation down to the dynamic call.
2727     if (calld->dynamic_call_ == nullptr) {
2728       calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
2729                                 NoYieldCallCombiner);
2730       // Note: This will release the call combiner.
2731       grpc_transport_stream_op_batch_finish_with_failure(
2732           batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
2733     } else {
2734       // Note: This will release the call combiner.
2735       calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2736     }
2737     return;
2738   }
2739   // Add the batch to the pending list.
2740   calld->PendingBatchesAdd(elem, batch);
2741   // Check if we've already created a dynamic call.
2742   // Note that once we have done so, we do not need to acquire the channel's
2743   // resolution mutex, which is more efficient (especially for streaming calls).
2744   if (calld->dynamic_call_ != nullptr) {
2745     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2746       gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
2747               chand, calld, calld->dynamic_call_.get());
2748     }
2749     calld->PendingBatchesResume(elem);
2750     return;
2751   }
2752   // We do not yet have a dynamic call.
2753   // For batches containing a send_initial_metadata op, acquire the
2754   // channel's resolution mutex to apply the service config to the call,
2755   // after which we will create a dynamic call.
2756   if (GPR_LIKELY(batch->send_initial_metadata)) {
2757     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2758       gpr_log(GPR_INFO,
2759               "chand=%p calld=%p: grabbing resolution mutex to apply service "
2760               "config",
2761               chand, calld);
2762     }
2763     CheckResolution(elem, GRPC_ERROR_NONE);
2764   } else {
2765     // For all other batches, release the call combiner.
2766     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2767       gpr_log(GPR_INFO,
2768               "chand=%p calld=%p: saved batch, yielding call combiner", chand,
2769               calld);
2770     }
2771     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2772                             "batch does not include send_initial_metadata");
2773   }
2774 }
2775 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2776 void CallData::SetPollent(grpc_call_element* elem,
2777                           grpc_polling_entity* pollent) {
2778   CallData* calld = static_cast<CallData*>(elem->call_data);
2779   calld->pollent_ = pollent;
2780 }
2781 
2782 //
2783 // pending_batches management
2784 //
2785 
GetBatchIndex(grpc_transport_stream_op_batch * batch)2786 size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
2787   // Note: It is important the send_initial_metadata be the first entry
2788   // here, since the code in pick_subchannel_locked() assumes it will be.
2789   if (batch->send_initial_metadata) return 0;
2790   if (batch->send_message) return 1;
2791   if (batch->send_trailing_metadata) return 2;
2792   if (batch->recv_initial_metadata) return 3;
2793   if (batch->recv_message) return 4;
2794   if (batch->recv_trailing_metadata) return 5;
2795   GPR_UNREACHABLE_CODE(return (size_t)-1);
2796 }
2797 
2798 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)2799 void CallData::PendingBatchesAdd(grpc_call_element* elem,
2800                                  grpc_transport_stream_op_batch* batch) {
2801   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2802   const size_t idx = GetBatchIndex(batch);
2803   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2804     gpr_log(GPR_INFO,
2805             "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
2806             this, idx);
2807   }
2808   grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
2809   GPR_ASSERT(pending == nullptr);
2810   pending = batch;
2811 }
2812 
2813 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error * error)2814 void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
2815   grpc_transport_stream_op_batch* batch =
2816       static_cast<grpc_transport_stream_op_batch*>(arg);
2817   CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2818   // Note: This will release the call combiner.
2819   grpc_transport_stream_op_batch_finish_with_failure(
2820       batch, GRPC_ERROR_REF(error), calld->call_combiner_);
2821 }
2822 
2823 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_call_element * elem,grpc_error * error,YieldCallCombinerPredicate yield_call_combiner_predicate)2824 void CallData::PendingBatchesFail(
2825     grpc_call_element* elem, grpc_error* error,
2826     YieldCallCombinerPredicate yield_call_combiner_predicate) {
2827   GPR_ASSERT(error != GRPC_ERROR_NONE);
2828   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2829     size_t num_batches = 0;
2830     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2831       if (pending_batches_[i] != nullptr) ++num_batches;
2832     }
2833     gpr_log(GPR_INFO,
2834             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2835             elem->channel_data, this, num_batches, grpc_error_string(error));
2836   }
2837   CallCombinerClosureList closures;
2838   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2839     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2840     if (batch != nullptr) {
2841       batch->handler_private.extra_arg = this;
2842       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2843                         FailPendingBatchInCallCombiner, batch,
2844                         grpc_schedule_on_exec_ctx);
2845       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2846                    "PendingBatchesFail");
2847       batch = nullptr;
2848     }
2849   }
2850   if (yield_call_combiner_predicate(closures)) {
2851     closures.RunClosures(call_combiner_);
2852   } else {
2853     closures.RunClosuresWithoutYielding(call_combiner_);
2854   }
2855   GRPC_ERROR_UNREF(error);
2856 }
2857 
2858 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error *)2859 void CallData::ResumePendingBatchInCallCombiner(void* arg,
2860                                                 grpc_error* /*ignored*/) {
2861   grpc_transport_stream_op_batch* batch =
2862       static_cast<grpc_transport_stream_op_batch*>(arg);
2863   auto* elem =
2864       static_cast<grpc_call_element*>(batch->handler_private.extra_arg);
2865   auto* calld = static_cast<CallData*>(elem->call_data);
2866   // Note: This will release the call combiner.
2867   calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2868 }
2869 
2870 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume(grpc_call_element * elem)2871 void CallData::PendingBatchesResume(grpc_call_element* elem) {
2872   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2873   // Retries not enabled; send down batches as-is.
2874   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2875     size_t num_batches = 0;
2876     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2877       if (pending_batches_[i] != nullptr) ++num_batches;
2878     }
2879     gpr_log(GPR_INFO,
2880             "chand=%p calld=%p: starting %" PRIuPTR
2881             " pending batches on dynamic_call=%p",
2882             chand, this, num_batches, dynamic_call_.get());
2883   }
2884   CallCombinerClosureList closures;
2885   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2886     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2887     if (batch != nullptr) {
2888       batch->handler_private.extra_arg = elem;
2889       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2890                         ResumePendingBatchInCallCombiner, batch, nullptr);
2891       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2892                    "PendingBatchesResume");
2893       batch = nullptr;
2894     }
2895   }
2896   // Note: This will release the call combiner.
2897   closures.RunClosures(call_combiner_);
2898 }
2899 
2900 //
2901 // name resolution
2902 //
2903 
2904 // A class to handle the call combiner cancellation callback for a
2905 // queued pick.
2906 class CallData::ResolverQueuedCallCanceller {
2907  public:
ResolverQueuedCallCanceller(grpc_call_element * elem)2908   explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) {
2909     auto* calld = static_cast<CallData*>(elem->call_data);
2910     GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller");
2911     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2912                       grpc_schedule_on_exec_ctx);
2913     calld->call_combiner_->SetNotifyOnCancel(&closure_);
2914   }
2915 
2916  private:
CancelLocked(void * arg,grpc_error * error)2917   static void CancelLocked(void* arg, grpc_error* error) {
2918     auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2919     auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
2920     auto* calld = static_cast<CallData*>(self->elem_->call_data);
2921     {
2922       MutexLock lock(chand->resolution_mu());
2923       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2924         gpr_log(GPR_INFO,
2925                 "chand=%p calld=%p: cancelling resolver queued pick: "
2926                 "error=%s self=%p calld->resolver_pick_canceller=%p",
2927                 chand, calld, grpc_error_string(error), self,
2928                 calld->resolver_call_canceller_);
2929       }
2930       if (calld->resolver_call_canceller_ == self && error != GRPC_ERROR_NONE) {
2931         // Remove pick from list of queued picks.
2932         calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
2933         // Fail pending batches on the call.
2934         calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
2935                                   YieldCallCombinerIfPendingBatchesFound);
2936       }
2937     }
2938     GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolvingQueuedCallCanceller");
2939     delete self;
2940   }
2941 
2942   grpc_call_element* elem_;
2943   grpc_closure closure_;
2944 };
2945 
MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element * elem)2946 void CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
2947     grpc_call_element* elem) {
2948   if (!queued_pending_resolver_result_) return;
2949   auto* chand = static_cast<ChannelData*>(elem->channel_data);
2950   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2951     gpr_log(GPR_INFO,
2952             "chand=%p calld=%p: removing from resolver queued picks list",
2953             chand, this);
2954   }
2955   chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
2956   queued_pending_resolver_result_ = false;
2957   // Lame the call combiner canceller.
2958   resolver_call_canceller_ = nullptr;
2959 }
2960 
MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element * elem)2961 void CallData::MaybeAddCallToResolverQueuedCallsLocked(
2962     grpc_call_element* elem) {
2963   if (queued_pending_resolver_result_) return;
2964   auto* chand = static_cast<ChannelData*>(elem->channel_data);
2965   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2966     gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
2967             chand, this);
2968   }
2969   queued_pending_resolver_result_ = true;
2970   resolver_queued_call_.elem = elem;
2971   chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
2972   // Register call combiner cancellation callback.
2973   resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
2974 }
2975 
ApplyServiceConfigToCallLocked(grpc_call_element * elem,grpc_metadata_batch * initial_metadata)2976 grpc_error* CallData::ApplyServiceConfigToCallLocked(
2977     grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
2978   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2979   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2980     gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2981             chand, this);
2982   }
2983   ConfigSelector* config_selector = chand->config_selector();
2984   if (config_selector != nullptr) {
2985     // Use the ConfigSelector to determine the config for the call.
2986     ConfigSelector::CallConfig call_config =
2987         config_selector->GetCallConfig({&path_, initial_metadata, arena_});
2988     if (call_config.error != GRPC_ERROR_NONE) return call_config.error;
2989     on_call_committed_ = std::move(call_config.on_call_committed);
2990     // Create a ServiceConfigCallData for the call.  This stores a ref to the
2991     // ServiceConfig and caches the right set of parsed configs to use for
2992     // the call.  The MethodConfig will store itself in the call context,
2993     // so that it can be accessed by filters in the subchannel, and it
2994     // will be cleaned up when the call ends.
2995     auto* service_config_call_data = arena_->New<ServiceConfigCallData>(
2996         std::move(call_config.service_config), call_config.method_configs,
2997         std::move(call_config.call_attributes), call_context_);
2998     // Apply our own method params to the call.
2999     auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
3000         service_config_call_data->GetMethodParsedConfig(
3001             internal::ClientChannelServiceConfigParser::ParserIndex()));
3002     if (method_params != nullptr) {
3003       // If the deadline from the service config is shorter than the one
3004       // from the client API, reset the deadline timer.
3005       if (chand->deadline_checking_enabled() && method_params->timeout() != 0) {
3006         const grpc_millis per_method_deadline =
3007             grpc_cycle_counter_to_millis_round_up(call_start_time_) +
3008             method_params->timeout();
3009         if (per_method_deadline < deadline_) {
3010           deadline_ = per_method_deadline;
3011           grpc_deadline_state_reset(elem, deadline_);
3012         }
3013       }
3014       // If the service config set wait_for_ready and the application
3015       // did not explicitly set it, use the value from the service config.
3016       uint32_t* send_initial_metadata_flags =
3017           &pending_batches_[0]
3018                ->payload->send_initial_metadata.send_initial_metadata_flags;
3019       if (method_params->wait_for_ready().has_value() &&
3020           !(*send_initial_metadata_flags &
3021             GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
3022         if (method_params->wait_for_ready().value()) {
3023           *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3024         } else {
3025           *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3026         }
3027       }
3028     }
3029     // Set the dynamic filter stack.
3030     dynamic_filters_ = chand->dynamic_filters();
3031   }
3032   return GRPC_ERROR_NONE;
3033 }
3034 
RecvInitialMetadataReadyForConfigSelectorCommitCallback(void * arg,grpc_error * error)3035 void CallData::RecvInitialMetadataReadyForConfigSelectorCommitCallback(
3036     void* arg, grpc_error* error) {
3037   auto* self = static_cast<CallData*>(arg);
3038   if (self->on_call_committed_ != nullptr) {
3039     self->on_call_committed_();
3040     self->on_call_committed_ = nullptr;
3041   }
3042   // Chain to original callback.
3043   Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
3044                GRPC_ERROR_REF(error));
3045 }
3046 
3047 // TODO(roth): Consider not intercepting this callback unless we
3048 // actually need to, if this causes a performance problem.
InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(grpc_transport_stream_op_batch * batch)3049 void CallData::InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
3050     grpc_transport_stream_op_batch* batch) {
3051   original_recv_initial_metadata_ready_ =
3052       batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
3053   GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_,
3054                     RecvInitialMetadataReadyForConfigSelectorCommitCallback,
3055                     this, nullptr);
3056   batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
3057       &recv_initial_metadata_ready_;
3058 }
3059 
AsyncResolutionDone(grpc_call_element * elem,grpc_error * error)3060 void CallData::AsyncResolutionDone(grpc_call_element* elem, grpc_error* error) {
3061   GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr);
3062   ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
3063 }
3064 
ResolutionDone(void * arg,grpc_error * error)3065 void CallData::ResolutionDone(void* arg, grpc_error* error) {
3066   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3067   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3068   CallData* calld = static_cast<CallData*>(elem->call_data);
3069   if (error != GRPC_ERROR_NONE) {
3070     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3071       gpr_log(GPR_INFO,
3072               "chand=%p calld=%p: error applying config to call: error=%s",
3073               chand, calld, grpc_error_string(error));
3074     }
3075     calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
3076     return;
3077   }
3078   calld->CreateDynamicCall(elem);
3079 }
3080 
CheckResolution(void * arg,grpc_error * error)3081 void CallData::CheckResolution(void* arg, grpc_error* error) {
3082   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3083   CallData* calld = static_cast<CallData*>(elem->call_data);
3084   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3085   bool resolution_complete;
3086   {
3087     MutexLock lock(chand->resolution_mu());
3088     resolution_complete = calld->CheckResolutionLocked(elem, &error);
3089   }
3090   if (resolution_complete) {
3091     ResolutionDone(elem, error);
3092     GRPC_ERROR_UNREF(error);
3093   }
3094 }
3095 
CheckResolutionLocked(grpc_call_element * elem,grpc_error ** error)3096 bool CallData::CheckResolutionLocked(grpc_call_element* elem,
3097                                      grpc_error** error) {
3098   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3099   // If we're still in IDLE, we need to start resolving.
3100   if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
3101     // Bounce into the control plane work serializer to start resolving,
3102     // in case we are still in IDLE state.  Since we are holding on to the
3103     // resolution mutex here, we offload it on the ExecCtx so that we don't
3104     // deadlock with ourselves.
3105     GRPC_CHANNEL_STACK_REF(chand->owning_stack(), "CheckResolutionLocked");
3106     ExecCtx::Run(
3107         DEBUG_LOCATION,
3108         GRPC_CLOSURE_CREATE(
3109             [](void* arg, grpc_error* /*error*/) {
3110               auto* chand = static_cast<ChannelData*>(arg);
3111               chand->work_serializer()->Run(
3112                   [chand]() {
3113                     chand->CheckConnectivityState(/*try_to_connect=*/true);
3114                     GRPC_CHANNEL_STACK_UNREF(chand->owning_stack(),
3115                                              "CheckResolutionLocked");
3116                   },
3117                   DEBUG_LOCATION);
3118             },
3119             chand, nullptr),
3120         GRPC_ERROR_NONE);
3121   }
3122   // Get send_initial_metadata batch and flags.
3123   auto& send_initial_metadata =
3124       pending_batches_[0]->payload->send_initial_metadata;
3125   grpc_metadata_batch* initial_metadata_batch =
3126       send_initial_metadata.send_initial_metadata;
3127   const uint32_t send_initial_metadata_flags =
3128       send_initial_metadata.send_initial_metadata_flags;
3129   // If we don't yet have a resolver result, we need to queue the call
3130   // until we get one.
3131   if (GPR_UNLIKELY(!chand->received_service_config_data())) {
3132     // If the resolver returned transient failure before returning the
3133     // first service config, fail any non-wait_for_ready calls.
3134     grpc_error* resolver_error = chand->resolver_transient_failure_error();
3135     if (resolver_error != GRPC_ERROR_NONE &&
3136         (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
3137             0) {
3138       MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
3139       *error = GRPC_ERROR_REF(resolver_error);
3140       return true;
3141     }
3142     // Either the resolver has not yet returned a result, or it has
3143     // returned transient failure but the call is wait_for_ready.  In
3144     // either case, queue the call.
3145     MaybeAddCallToResolverQueuedCallsLocked(elem);
3146     return false;
3147   }
3148   // Apply service config to call if not yet applied.
3149   if (GPR_LIKELY(!service_config_applied_)) {
3150     service_config_applied_ = true;
3151     *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
3152   }
3153   MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
3154   return true;
3155 }
3156 
CreateDynamicCall(grpc_call_element * elem)3157 void CallData::CreateDynamicCall(grpc_call_element* elem) {
3158   auto* chand = static_cast<ChannelData*>(elem->channel_data);
3159   DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
3160                                      pollent_,
3161                                      path_,
3162                                      call_start_time_,
3163                                      deadline_,
3164                                      arena_,
3165                                      call_context_,
3166                                      call_combiner_};
3167   grpc_error* error = GRPC_ERROR_NONE;
3168   DynamicFilters* channel_stack = args.channel_stack.get();
3169   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3170     gpr_log(
3171         GPR_INFO,
3172         "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
3173         chand, this, channel_stack);
3174   }
3175   dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
3176   if (error != GRPC_ERROR_NONE) {
3177     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3178       gpr_log(GPR_INFO,
3179               "chand=%p calld=%p: failed to create dynamic call: error=%s",
3180               chand, this, grpc_error_string(error));
3181     }
3182     PendingBatchesFail(elem, error, YieldCallCombiner);
3183     return;
3184   }
3185   PendingBatchesResume(elem);
3186 }
3187 
3188 //
3189 // RetryingCall implementation
3190 //
3191 
3192 // Retry support:
3193 //
3194 // In order to support retries, we act as a proxy for stream op batches.
3195 // When we get a batch from the surface, we add it to our list of pending
3196 // batches, and we then use those batches to construct separate "child"
3197 // batches to be started on the subchannel call.  When the child batches
3198 // return, we then decide which pending batches have been completed and
3199 // schedule their callbacks accordingly.  If a subchannel call fails and
3200 // we want to retry it, we do a new pick and start again, constructing
3201 // new "child" batches for the new subchannel call.
3202 //
3203 // Note that retries are committed when receiving data from the server
3204 // (except for Trailers-Only responses).  However, there may be many
3205 // send ops started before receiving any data, so we may have already
3206 // completed some number of send ops (and returned the completions up to
3207 // the surface) by the time we realize that we need to retry.  To deal
3208 // with this, we cache data for send ops, so that we can replay them on a
3209 // different subchannel call even after we have completed the original
3210 // batches.
3211 //
3212 // There are two sets of data to maintain:
3213 // - In call_data (in the parent channel), we maintain a list of pending
3214 //   ops and cached data for send ops.
3215 // - In the subchannel call, we maintain state to indicate what ops have
3216 //   already been sent down to that call.
3217 //
3218 // When constructing the "child" batches, we compare those two sets of
3219 // data to see which batches need to be sent to the subchannel call.
3220 
3221 // TODO(roth): In subsequent PRs:
3222 // - add support for transparent retries (including initial metadata)
3223 // - figure out how to record stats in census for retries
3224 //   (census filter is on top of this one)
3225 // - add census stats for retries
3226 
RetryingCall(ChannelData * chand,const grpc_call_element_args & args,grpc_polling_entity * pollent,RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,const ClientChannelMethodParsedConfig::RetryPolicy * retry_policy)3227 RetryingCall::RetryingCall(
3228     ChannelData* chand, const grpc_call_element_args& args,
3229     grpc_polling_entity* pollent,
3230     RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
3231     const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy)
3232     : chand_(chand),
3233       pollent_(pollent),
3234       retry_throttle_data_(std::move(retry_throttle_data)),
3235       retry_policy_(retry_policy),
3236       retry_backoff_(
3237           BackOff::Options()
3238               .set_initial_backoff(
3239                   retry_policy_ == nullptr ? 0 : retry_policy_->initial_backoff)
3240               .set_multiplier(retry_policy_ == nullptr
3241                                   ? 0
3242                                   : retry_policy_->backoff_multiplier)
3243               .set_jitter(RETRY_BACKOFF_JITTER)
3244               .set_max_backoff(
3245                   retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff)),
3246       path_(grpc_slice_ref_internal(args.path)),
3247       call_start_time_(args.start_time),
3248       deadline_(args.deadline),
3249       arena_(args.arena),
3250       owning_call_(args.call_stack),
3251       call_combiner_(args.call_combiner),
3252       call_context_(args.context),
3253       pending_send_initial_metadata_(false),
3254       pending_send_message_(false),
3255       pending_send_trailing_metadata_(false),
3256       enable_retries_(true),
3257       retry_committed_(false),
3258       last_attempt_got_server_pushback_(false) {}
3259 
~RetryingCall()3260 RetryingCall::~RetryingCall() {
3261   grpc_slice_unref_internal(path_);
3262   GRPC_ERROR_UNREF(cancel_error_);
3263   // Make sure there are no remaining pending batches.
3264   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3265     GPR_ASSERT(pending_batches_[i].batch == nullptr);
3266   }
3267 }
3268 
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)3269 void RetryingCall::StartTransportStreamOpBatch(
3270     grpc_transport_stream_op_batch* batch) {
3271   // If we've previously been cancelled, immediately fail any new batches.
3272   if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
3273     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3274       gpr_log(GPR_INFO,
3275               "chand=%p retrying_call=%p: failing batch with error: %s", chand_,
3276               this, grpc_error_string(cancel_error_));
3277     }
3278     // Note: This will release the call combiner.
3279     grpc_transport_stream_op_batch_finish_with_failure(
3280         batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
3281     return;
3282   }
3283   // Handle cancellation.
3284   if (GPR_UNLIKELY(batch->cancel_stream)) {
3285     // Stash a copy of cancel_error in our call data, so that we can use
3286     // it for subsequent operations.  This ensures that if the call is
3287     // cancelled before any batches are passed down (e.g., if the deadline
3288     // is in the past when the call starts), we can return the right
3289     // error to the caller when the first batch does get passed down.
3290     GRPC_ERROR_UNREF(cancel_error_);
3291     cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
3292     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3293       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: recording cancel_error=%s",
3294               chand_, this, grpc_error_string(cancel_error_));
3295     }
3296     // If we do not have an LB call (i.e., a pick has not yet been started),
3297     // fail all pending batches.  Otherwise, send the cancellation down to the
3298     // LB call.
3299     if (lb_call_ == nullptr) {
3300       // TODO(roth): If there is a pending retry callback, do we need to
3301       // cancel it here?
3302       PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
3303       // Note: This will release the call combiner.
3304       grpc_transport_stream_op_batch_finish_with_failure(
3305           batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
3306     } else {
3307       // Note: This will release the call combiner.
3308       lb_call_->StartTransportStreamOpBatch(batch);
3309     }
3310     return;
3311   }
3312   // Add the batch to the pending list.
3313   PendingBatchesAdd(batch);
3314   // Create LB call if needed.
3315   // TODO(roth): If we get a new batch from the surface after the
3316   // initial retry attempt has failed, while the retry timer is pending,
3317   // we should queue the batch and not try to send it immediately.
3318   if (lb_call_ == nullptr) {
3319     // We do not yet have an LB call, so create one.
3320     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3321       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: creating LB call", chand_,
3322               this);
3323     }
3324     CreateLbCall(this, GRPC_ERROR_NONE);
3325     return;
3326   }
3327   // Send batches to LB call.
3328   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3329     gpr_log(GPR_INFO, "chand=%p retrying_call=%p: starting batch on lb_call=%p",
3330             chand_, this, lb_call_.get());
3331   }
3332   PendingBatchesResume();
3333 }
3334 
subchannel_call() const3335 RefCountedPtr<SubchannelCall> RetryingCall::subchannel_call() const {
3336   if (lb_call_ == nullptr) return nullptr;
3337   return lb_call_->subchannel_call();
3338 }
3339 
3340 //
3341 // send op data caching
3342 //
3343 
MaybeCacheSendOpsForBatch(PendingBatch * pending)3344 void RetryingCall::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
3345   if (pending->send_ops_cached) return;
3346   pending->send_ops_cached = true;
3347   grpc_transport_stream_op_batch* batch = pending->batch;
3348   // Save a copy of metadata for send_initial_metadata ops.
3349   if (batch->send_initial_metadata) {
3350     seen_send_initial_metadata_ = true;
3351     GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
3352     grpc_metadata_batch* send_initial_metadata =
3353         batch->payload->send_initial_metadata.send_initial_metadata;
3354     send_initial_metadata_storage_ =
3355         static_cast<grpc_linked_mdelem*>(arena_->Alloc(
3356             sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
3357     grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
3358                              send_initial_metadata_storage_);
3359     send_initial_metadata_flags_ =
3360         batch->payload->send_initial_metadata.send_initial_metadata_flags;
3361     peer_string_ = batch->payload->send_initial_metadata.peer_string;
3362   }
3363   // Set up cache for send_message ops.
3364   if (batch->send_message) {
3365     ByteStreamCache* cache = arena_->New<ByteStreamCache>(
3366         std::move(batch->payload->send_message.send_message));
3367     send_messages_.push_back(cache);
3368   }
3369   // Save metadata batch for send_trailing_metadata ops.
3370   if (batch->send_trailing_metadata) {
3371     seen_send_trailing_metadata_ = true;
3372     GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
3373     grpc_metadata_batch* send_trailing_metadata =
3374         batch->payload->send_trailing_metadata.send_trailing_metadata;
3375     send_trailing_metadata_storage_ =
3376         static_cast<grpc_linked_mdelem*>(arena_->Alloc(
3377             sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
3378     grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
3379                              send_trailing_metadata_storage_);
3380   }
3381 }
3382 
FreeCachedSendInitialMetadata()3383 void RetryingCall::FreeCachedSendInitialMetadata() {
3384   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3385     gpr_log(GPR_INFO,
3386             "chand=%p retrying_call=%p: destroying send_initial_metadata",
3387             chand_, this);
3388   }
3389   grpc_metadata_batch_destroy(&send_initial_metadata_);
3390 }
3391 
FreeCachedSendMessage(size_t idx)3392 void RetryingCall::FreeCachedSendMessage(size_t idx) {
3393   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3394     gpr_log(GPR_INFO,
3395             "chand=%p retrying_call=%p: destroying send_messages[%" PRIuPTR "]",
3396             chand_, this, idx);
3397   }
3398   send_messages_[idx]->Destroy();
3399 }
3400 
FreeCachedSendTrailingMetadata()3401 void RetryingCall::FreeCachedSendTrailingMetadata() {
3402   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3403     gpr_log(GPR_INFO,
3404             "chand_=%p retrying_call=%p: destroying send_trailing_metadata",
3405             chand_, this);
3406   }
3407   grpc_metadata_batch_destroy(&send_trailing_metadata_);
3408 }
3409 
FreeCachedSendOpDataAfterCommit(SubchannelCallRetryState * retry_state)3410 void RetryingCall::FreeCachedSendOpDataAfterCommit(
3411     SubchannelCallRetryState* retry_state) {
3412   if (retry_state->completed_send_initial_metadata) {
3413     FreeCachedSendInitialMetadata();
3414   }
3415   for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
3416     FreeCachedSendMessage(i);
3417   }
3418   if (retry_state->completed_send_trailing_metadata) {
3419     FreeCachedSendTrailingMetadata();
3420   }
3421 }
3422 
FreeCachedSendOpDataForCompletedBatch(SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state)3423 void RetryingCall::FreeCachedSendOpDataForCompletedBatch(
3424     SubchannelCallBatchData* batch_data,
3425     SubchannelCallRetryState* retry_state) {
3426   if (batch_data->batch.send_initial_metadata) {
3427     FreeCachedSendInitialMetadata();
3428   }
3429   if (batch_data->batch.send_message) {
3430     FreeCachedSendMessage(retry_state->completed_send_message_count - 1);
3431   }
3432   if (batch_data->batch.send_trailing_metadata) {
3433     FreeCachedSendTrailingMetadata();
3434   }
3435 }
3436 
3437 //
3438 // pending_batches management
3439 //
3440 
GetBatchIndex(grpc_transport_stream_op_batch * batch)3441 size_t RetryingCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
3442   // Note: It is important the send_initial_metadata be the first entry
3443   // here, since the code in pick_subchannel_locked() assumes it will be.
3444   if (batch->send_initial_metadata) return 0;
3445   if (batch->send_message) return 1;
3446   if (batch->send_trailing_metadata) return 2;
3447   if (batch->recv_initial_metadata) return 3;
3448   if (batch->recv_message) return 4;
3449   if (batch->recv_trailing_metadata) return 5;
3450   GPR_UNREACHABLE_CODE(return (size_t)-1);
3451 }
3452 
3453 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)3454 void RetryingCall::PendingBatchesAdd(grpc_transport_stream_op_batch* batch) {
3455   const size_t idx = GetBatchIndex(batch);
3456   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3457     gpr_log(
3458         GPR_INFO,
3459         "chand_=%p retrying_call=%p: adding pending batch at index %" PRIuPTR,
3460         chand_, this, idx);
3461   }
3462   PendingBatch* pending = &pending_batches_[idx];
3463   GPR_ASSERT(pending->batch == nullptr);
3464   pending->batch = batch;
3465   pending->send_ops_cached = false;
3466   if (enable_retries_) {
3467     // Update state in calld about pending batches.
3468     // Also check if the batch takes us over the retry buffer limit.
3469     // Note: We don't check the size of trailing metadata here, because
3470     // gRPC clients do not send trailing metadata.
3471     if (batch->send_initial_metadata) {
3472       pending_send_initial_metadata_ = true;
3473       bytes_buffered_for_retry_ += grpc_metadata_batch_size(
3474           batch->payload->send_initial_metadata.send_initial_metadata);
3475     }
3476     if (batch->send_message) {
3477       pending_send_message_ = true;
3478       bytes_buffered_for_retry_ +=
3479           batch->payload->send_message.send_message->length();
3480     }
3481     if (batch->send_trailing_metadata) {
3482       pending_send_trailing_metadata_ = true;
3483     }
3484     if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
3485                      chand_->per_rpc_retry_buffer_size())) {
3486       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3487         gpr_log(GPR_INFO,
3488                 "chand=%p retrying_call=%p: exceeded retry buffer size, "
3489                 "committing",
3490                 chand_, this);
3491       }
3492       SubchannelCallRetryState* retry_state =
3493           lb_call_ == nullptr ? nullptr
3494                               : static_cast<SubchannelCallRetryState*>(
3495                                     lb_call_->GetParentData());
3496       RetryCommit(retry_state);
3497       // If we are not going to retry and have not yet started, pretend
3498       // retries are disabled so that we don't bother with retry overhead.
3499       if (num_attempts_completed_ == 0) {
3500         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3501           gpr_log(GPR_INFO,
3502                   "chand=%p retrying_call=%p: disabling retries before first "
3503                   "attempt",
3504                   chand_, this);
3505         }
3506         // TODO(roth): Treat this as a commit?
3507         enable_retries_ = false;
3508       }
3509     }
3510   }
3511 }
3512 
PendingBatchClear(PendingBatch * pending)3513 void RetryingCall::PendingBatchClear(PendingBatch* pending) {
3514   if (enable_retries_) {
3515     if (pending->batch->send_initial_metadata) {
3516       pending_send_initial_metadata_ = false;
3517     }
3518     if (pending->batch->send_message) {
3519       pending_send_message_ = false;
3520     }
3521     if (pending->batch->send_trailing_metadata) {
3522       pending_send_trailing_metadata_ = false;
3523     }
3524   }
3525   pending->batch = nullptr;
3526 }
3527 
MaybeClearPendingBatch(PendingBatch * pending)3528 void RetryingCall::MaybeClearPendingBatch(PendingBatch* pending) {
3529   grpc_transport_stream_op_batch* batch = pending->batch;
3530   // We clear the pending batch if all of its callbacks have been
3531   // scheduled and reset to nullptr.
3532   if (batch->on_complete == nullptr &&
3533       (!batch->recv_initial_metadata ||
3534        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
3535            nullptr) &&
3536       (!batch->recv_message ||
3537        batch->payload->recv_message.recv_message_ready == nullptr) &&
3538       (!batch->recv_trailing_metadata ||
3539        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
3540            nullptr)) {
3541     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3542       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: clearing pending batch",
3543               chand_, this);
3544     }
3545     PendingBatchClear(pending);
3546   }
3547 }
3548 
3549 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error * error)3550 void RetryingCall::FailPendingBatchInCallCombiner(void* arg,
3551                                                   grpc_error* error) {
3552   grpc_transport_stream_op_batch* batch =
3553       static_cast<grpc_transport_stream_op_batch*>(arg);
3554   RetryingCall* call =
3555       static_cast<RetryingCall*>(batch->handler_private.extra_arg);
3556   // Note: This will release the call combiner.
3557   grpc_transport_stream_op_batch_finish_with_failure(
3558       batch, GRPC_ERROR_REF(error), call->call_combiner_);
3559 }
3560 
3561 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error * error,YieldCallCombinerPredicate yield_call_combiner_predicate)3562 void RetryingCall::PendingBatchesFail(
3563     grpc_error* error,
3564     YieldCallCombinerPredicate yield_call_combiner_predicate) {
3565   GPR_ASSERT(error != GRPC_ERROR_NONE);
3566   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3567     size_t num_batches = 0;
3568     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3569       if (pending_batches_[i].batch != nullptr) ++num_batches;
3570     }
3571     gpr_log(GPR_INFO,
3572             "chand=%p retrying_call=%p: failing %" PRIuPTR
3573             " pending batches: %s",
3574             chand_, this, num_batches, grpc_error_string(error));
3575   }
3576   CallCombinerClosureList closures;
3577   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3578     PendingBatch* pending = &pending_batches_[i];
3579     grpc_transport_stream_op_batch* batch = pending->batch;
3580     if (batch != nullptr) {
3581       batch->handler_private.extra_arg = this;
3582       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
3583                         FailPendingBatchInCallCombiner, batch,
3584                         grpc_schedule_on_exec_ctx);
3585       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
3586                    "PendingBatchesFail");
3587       PendingBatchClear(pending);
3588     }
3589   }
3590   if (yield_call_combiner_predicate(closures)) {
3591     closures.RunClosures(call_combiner_);
3592   } else {
3593     closures.RunClosuresWithoutYielding(call_combiner_);
3594   }
3595   GRPC_ERROR_UNREF(error);
3596 }
3597 
3598 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error *)3599 void RetryingCall::ResumePendingBatchInCallCombiner(void* arg,
3600                                                     grpc_error* /*ignored*/) {
3601   grpc_transport_stream_op_batch* batch =
3602       static_cast<grpc_transport_stream_op_batch*>(arg);
3603   auto* lb_call =
3604       static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
3605   // Note: This will release the call combiner.
3606   lb_call->StartTransportStreamOpBatch(batch);
3607 }
3608 
3609 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()3610 void RetryingCall::PendingBatchesResume() {
3611   if (enable_retries_) {
3612     StartRetriableSubchannelBatches(this, GRPC_ERROR_NONE);
3613     return;
3614   }
3615   // Retries not enabled; send down batches as-is.
3616   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3617     size_t num_batches = 0;
3618     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3619       if (pending_batches_[i].batch != nullptr) ++num_batches;
3620     }
3621     gpr_log(GPR_INFO,
3622             "chand=%p retrying_call=%p: starting %" PRIuPTR
3623             " pending batches on lb_call=%p",
3624             chand_, this, num_batches, lb_call_.get());
3625   }
3626   CallCombinerClosureList closures;
3627   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3628     PendingBatch* pending = &pending_batches_[i];
3629     grpc_transport_stream_op_batch* batch = pending->batch;
3630     if (batch != nullptr) {
3631       batch->handler_private.extra_arg = lb_call_.get();
3632       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
3633                         ResumePendingBatchInCallCombiner, batch, nullptr);
3634       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
3635                    "PendingBatchesResume");
3636       PendingBatchClear(pending);
3637     }
3638   }
3639   // Note: This will release the call combiner.
3640   closures.RunClosures(call_combiner_);
3641 }
3642 
3643 template <typename Predicate>
PendingBatchFind(const char * log_message,Predicate predicate)3644 RetryingCall::PendingBatch* RetryingCall::PendingBatchFind(
3645     const char* log_message, Predicate predicate) {
3646   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3647     PendingBatch* pending = &pending_batches_[i];
3648     grpc_transport_stream_op_batch* batch = pending->batch;
3649     if (batch != nullptr && predicate(batch)) {
3650       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3651         gpr_log(
3652             GPR_INFO,
3653             "chand=%p retrying_call=%p: %s pending batch at index %" PRIuPTR,
3654             chand_, this, log_message, i);
3655       }
3656       return pending;
3657     }
3658   }
3659   return nullptr;
3660 }
3661 
3662 //
3663 // retry code
3664 //
3665 
RetryCommit(SubchannelCallRetryState * retry_state)3666 void RetryingCall::RetryCommit(SubchannelCallRetryState* retry_state) {
3667   if (retry_committed_) return;
3668   retry_committed_ = true;
3669   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3670     gpr_log(GPR_INFO, "chand=%p retrying_call=%p: committing retries", chand_,
3671             this);
3672   }
3673   if (retry_state != nullptr) {
3674     FreeCachedSendOpDataAfterCommit(retry_state);
3675   }
3676 }
3677 
DoRetry(SubchannelCallRetryState * retry_state,grpc_millis server_pushback_ms)3678 void RetryingCall::DoRetry(SubchannelCallRetryState* retry_state,
3679                            grpc_millis server_pushback_ms) {
3680   GPR_ASSERT(retry_policy_ != nullptr);
3681   // Reset LB call.
3682   lb_call_.reset();
3683   // Compute backoff delay.
3684   grpc_millis next_attempt_time;
3685   if (server_pushback_ms >= 0) {
3686     next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
3687     last_attempt_got_server_pushback_ = true;
3688   } else {
3689     if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
3690       last_attempt_got_server_pushback_ = false;
3691     }
3692     next_attempt_time = retry_backoff_.NextAttemptTime();
3693   }
3694   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3695     gpr_log(GPR_INFO,
3696             "chand=%p retrying_call=%p: retrying failed call in %" PRId64 " ms",
3697             chand_, this, next_attempt_time - ExecCtx::Get()->Now());
3698   }
3699   // Schedule retry after computed delay.
3700   GRPC_CLOSURE_INIT(&retry_closure_, CreateLbCall, this, nullptr);
3701   grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
3702   // Update bookkeeping.
3703   if (retry_state != nullptr) retry_state->retry_dispatched = true;
3704 }
3705 
MaybeRetry(SubchannelCallBatchData * batch_data,grpc_status_code status,grpc_mdelem * server_pushback_md)3706 bool RetryingCall::MaybeRetry(SubchannelCallBatchData* batch_data,
3707                               grpc_status_code status,
3708                               grpc_mdelem* server_pushback_md) {
3709   // Get retry policy.
3710   if (retry_policy_ == nullptr) return false;
3711   // If we've already dispatched a retry from this call, return true.
3712   // This catches the case where the batch has multiple callbacks
3713   // (i.e., it includes either recv_message or recv_initial_metadata).
3714   SubchannelCallRetryState* retry_state = nullptr;
3715   if (batch_data != nullptr) {
3716     retry_state = static_cast<SubchannelCallRetryState*>(
3717         batch_data->lb_call->GetParentData());
3718     if (retry_state->retry_dispatched) {
3719       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3720         gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retry already dispatched",
3721                 chand_, this);
3722       }
3723       return true;
3724     }
3725   }
3726   // Check status.
3727   if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
3728     if (retry_throttle_data_ != nullptr) {
3729       retry_throttle_data_->RecordSuccess();
3730     }
3731     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3732       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: call succeeded", chand_,
3733               this);
3734     }
3735     return false;
3736   }
3737   // Status is not OK.  Check whether the status is retryable.
3738   if (!retry_policy_->retryable_status_codes.Contains(status)) {
3739     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3740       gpr_log(
3741           GPR_INFO,
3742           "chand=%p retrying_call=%p: status %s not configured as retryable",
3743           chand_, this, grpc_status_code_to_string(status));
3744     }
3745     return false;
3746   }
3747   // Record the failure and check whether retries are throttled.
3748   // Note that it's important for this check to come after the status
3749   // code check above, since we should only record failures whose statuses
3750   // match the configured retryable status codes, so that we don't count
3751   // things like failures due to malformed requests (INVALID_ARGUMENT).
3752   // Conversely, it's important for this to come before the remaining
3753   // checks, so that we don't fail to record failures due to other factors.
3754   if (retry_throttle_data_ != nullptr &&
3755       !retry_throttle_data_->RecordFailure()) {
3756     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3757       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retries throttled", chand_,
3758               this);
3759     }
3760     return false;
3761   }
3762   // Check whether the call is committed.
3763   if (retry_committed_) {
3764     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3765       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retries already committed",
3766               chand_, this);
3767     }
3768     return false;
3769   }
3770   // Check whether we have retries remaining.
3771   ++num_attempts_completed_;
3772   if (num_attempts_completed_ >= retry_policy_->max_attempts) {
3773     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3774       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: exceeded %d retry attempts",
3775               chand_, this, retry_policy_->max_attempts);
3776     }
3777     return false;
3778   }
3779   // If the call was cancelled from the surface, don't retry.
3780   if (cancel_error_ != GRPC_ERROR_NONE) {
3781     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3782       gpr_log(GPR_INFO,
3783               "chand=%p retrying_call=%p: call cancelled from surface, not "
3784               "retrying",
3785               chand_, this);
3786     }
3787     return false;
3788   }
3789   // Check server push-back.
3790   grpc_millis server_pushback_ms = -1;
3791   if (server_pushback_md != nullptr) {
3792     // If the value is "-1" or any other unparseable string, we do not retry.
3793     uint32_t ms;
3794     if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
3795       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3796         gpr_log(
3797             GPR_INFO,
3798             "chand=%p retrying_call=%p: not retrying due to server push-back",
3799             chand_, this);
3800       }
3801       return false;
3802     } else {
3803       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3804         gpr_log(GPR_INFO,
3805                 "chand=%p retrying_call=%p: server push-back: retry in %u ms",
3806                 chand_, this, ms);
3807       }
3808       server_pushback_ms = static_cast<grpc_millis>(ms);
3809     }
3810   }
3811   DoRetry(retry_state, server_pushback_ms);
3812   return true;
3813 }
3814 
3815 //
3816 // RetryingCall::SubchannelCallBatchData
3817 //
3818 
3819 RetryingCall::SubchannelCallBatchData*
Create(RetryingCall * call,int refcount,bool set_on_complete)3820 RetryingCall::SubchannelCallBatchData::Create(RetryingCall* call, int refcount,
3821                                               bool set_on_complete) {
3822   return call->arena_->New<SubchannelCallBatchData>(call, refcount,
3823                                                     set_on_complete);
3824 }
3825 
SubchannelCallBatchData(RetryingCall * call,int refcount,bool set_on_complete)3826 RetryingCall::SubchannelCallBatchData::SubchannelCallBatchData(
3827     RetryingCall* call, int refcount, bool set_on_complete)
3828     : call(call), lb_call(call->lb_call_) {
3829   SubchannelCallRetryState* retry_state =
3830       static_cast<SubchannelCallRetryState*>(lb_call->GetParentData());
3831   batch.payload = &retry_state->batch_payload;
3832   gpr_ref_init(&refs, refcount);
3833   if (set_on_complete) {
3834     GRPC_CLOSURE_INIT(&on_complete, RetryingCall::OnComplete, this,
3835                       grpc_schedule_on_exec_ctx);
3836     batch.on_complete = &on_complete;
3837   }
3838   GRPC_CALL_STACK_REF(call->owning_call_, "batch_data");
3839 }
3840 
Destroy()3841 void RetryingCall::SubchannelCallBatchData::Destroy() {
3842   SubchannelCallRetryState* retry_state =
3843       static_cast<SubchannelCallRetryState*>(lb_call->GetParentData());
3844   if (batch.send_initial_metadata) {
3845     grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
3846   }
3847   if (batch.send_trailing_metadata) {
3848     grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
3849   }
3850   if (batch.recv_initial_metadata) {
3851     grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
3852   }
3853   if (batch.recv_trailing_metadata) {
3854     grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
3855   }
3856   lb_call.reset();
3857   GRPC_CALL_STACK_UNREF(call->owning_call_, "batch_data");
3858 }
3859 
3860 //
3861 // recv_initial_metadata callback handling
3862 //
3863 
InvokeRecvInitialMetadataCallback(void * arg,grpc_error * error)3864 void RetryingCall::InvokeRecvInitialMetadataCallback(void* arg,
3865                                                      grpc_error* error) {
3866   SubchannelCallBatchData* batch_data =
3867       static_cast<SubchannelCallBatchData*>(arg);
3868   // Find pending batch.
3869   PendingBatch* pending = batch_data->call->PendingBatchFind(
3870       "invoking recv_initial_metadata_ready for",
3871       [](grpc_transport_stream_op_batch* batch) {
3872         return batch->recv_initial_metadata &&
3873                batch->payload->recv_initial_metadata
3874                        .recv_initial_metadata_ready != nullptr;
3875       });
3876   GPR_ASSERT(pending != nullptr);
3877   // Return metadata.
3878   SubchannelCallRetryState* retry_state =
3879       static_cast<SubchannelCallRetryState*>(
3880           batch_data->lb_call->GetParentData());
3881   grpc_metadata_batch_move(
3882       &retry_state->recv_initial_metadata,
3883       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
3884   // Update bookkeeping.
3885   // Note: Need to do this before invoking the callback, since invoking
3886   // the callback will result in yielding the call combiner.
3887   grpc_closure* recv_initial_metadata_ready =
3888       pending->batch->payload->recv_initial_metadata
3889           .recv_initial_metadata_ready;
3890   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
3891       nullptr;
3892   batch_data->call->MaybeClearPendingBatch(pending);
3893   batch_data->Unref();
3894   // Invoke callback.
3895   Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready,
3896                GRPC_ERROR_REF(error));
3897 }
3898 
RecvInitialMetadataReady(void * arg,grpc_error * error)3899 void RetryingCall::RecvInitialMetadataReady(void* arg, grpc_error* error) {
3900   SubchannelCallBatchData* batch_data =
3901       static_cast<SubchannelCallBatchData*>(arg);
3902   RetryingCall* call = batch_data->call;
3903   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3904     gpr_log(
3905         GPR_INFO,
3906         "chand=%p retrying_call=%p: got recv_initial_metadata_ready, error=%s",
3907         call->chand_, call, grpc_error_string(error));
3908   }
3909   SubchannelCallRetryState* retry_state =
3910       static_cast<SubchannelCallRetryState*>(
3911           batch_data->lb_call->GetParentData());
3912   retry_state->completed_recv_initial_metadata = true;
3913   // If a retry was already dispatched, then we're not going to use the
3914   // result of this recv_initial_metadata op, so do nothing.
3915   if (retry_state->retry_dispatched) {
3916     GRPC_CALL_COMBINER_STOP(
3917         call->call_combiner_,
3918         "recv_initial_metadata_ready after retry dispatched");
3919     return;
3920   }
3921   // If we got an error or a Trailers-Only response and have not yet gotten
3922   // the recv_trailing_metadata_ready callback, then defer propagating this
3923   // callback back to the surface.  We can evaluate whether to retry when
3924   // recv_trailing_metadata comes back.
3925   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
3926                     error != GRPC_ERROR_NONE) &&
3927                    !retry_state->completed_recv_trailing_metadata)) {
3928     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3929       gpr_log(
3930           GPR_INFO,
3931           "chand=%p retrying_call=%p: deferring recv_initial_metadata_ready "
3932           "(Trailers-Only)",
3933           call->chand_, call);
3934     }
3935     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
3936     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
3937     if (!retry_state->started_recv_trailing_metadata) {
3938       // recv_trailing_metadata not yet started by application; start it
3939       // ourselves to get status.
3940       call->StartInternalRecvTrailingMetadata();
3941     } else {
3942       GRPC_CALL_COMBINER_STOP(
3943           call->call_combiner_,
3944           "recv_initial_metadata_ready trailers-only or error");
3945     }
3946     return;
3947   }
3948   // Received valid initial metadata, so commit the call.
3949   call->RetryCommit(retry_state);
3950   // Invoke the callback to return the result to the surface.
3951   // Manually invoking a callback function; it does not take ownership of error.
3952   call->InvokeRecvInitialMetadataCallback(batch_data, error);
3953 }
3954 
3955 //
3956 // recv_message callback handling
3957 //
3958 
InvokeRecvMessageCallback(void * arg,grpc_error * error)3959 void RetryingCall::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
3960   SubchannelCallBatchData* batch_data =
3961       static_cast<SubchannelCallBatchData*>(arg);
3962   RetryingCall* call = batch_data->call;
3963   // Find pending op.
3964   PendingBatch* pending = call->PendingBatchFind(
3965       "invoking recv_message_ready for",
3966       [](grpc_transport_stream_op_batch* batch) {
3967         return batch->recv_message &&
3968                batch->payload->recv_message.recv_message_ready != nullptr;
3969       });
3970   GPR_ASSERT(pending != nullptr);
3971   // Return payload.
3972   SubchannelCallRetryState* retry_state =
3973       static_cast<SubchannelCallRetryState*>(
3974           batch_data->lb_call->GetParentData());
3975   *pending->batch->payload->recv_message.recv_message =
3976       std::move(retry_state->recv_message);
3977   // Update bookkeeping.
3978   // Note: Need to do this before invoking the callback, since invoking
3979   // the callback will result in yielding the call combiner.
3980   grpc_closure* recv_message_ready =
3981       pending->batch->payload->recv_message.recv_message_ready;
3982   pending->batch->payload->recv_message.recv_message_ready = nullptr;
3983   call->MaybeClearPendingBatch(pending);
3984   batch_data->Unref();
3985   // Invoke callback.
3986   Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error));
3987 }
3988 
RecvMessageReady(void * arg,grpc_error * error)3989 void RetryingCall::RecvMessageReady(void* arg, grpc_error* error) {
3990   SubchannelCallBatchData* batch_data =
3991       static_cast<SubchannelCallBatchData*>(arg);
3992   RetryingCall* call = batch_data->call;
3993   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3994     gpr_log(GPR_INFO,
3995             "chand=%p retrying_call=%p: got recv_message_ready, error=%s",
3996             call->chand_, call, grpc_error_string(error));
3997   }
3998   SubchannelCallRetryState* retry_state =
3999       static_cast<SubchannelCallRetryState*>(
4000           batch_data->lb_call->GetParentData());
4001   ++retry_state->completed_recv_message_count;
4002   // If a retry was already dispatched, then we're not going to use the
4003   // result of this recv_message op, so do nothing.
4004   if (retry_state->retry_dispatched) {
4005     GRPC_CALL_COMBINER_STOP(call->call_combiner_,
4006                             "recv_message_ready after retry dispatched");
4007     return;
4008   }
4009   // If we got an error or the payload was nullptr and we have not yet gotten
4010   // the recv_trailing_metadata_ready callback, then defer propagating this
4011   // callback back to the surface.  We can evaluate whether to retry when
4012   // recv_trailing_metadata comes back.
4013   if (GPR_UNLIKELY(
4014           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
4015           !retry_state->completed_recv_trailing_metadata)) {
4016     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4017       gpr_log(
4018           GPR_INFO,
4019           "chand=%p retrying_call=%p: deferring recv_message_ready (nullptr "
4020           "message and recv_trailing_metadata pending)",
4021           call->chand_, call);
4022     }
4023     retry_state->recv_message_ready_deferred_batch = batch_data;
4024     retry_state->recv_message_error = GRPC_ERROR_REF(error);
4025     if (!retry_state->started_recv_trailing_metadata) {
4026       // recv_trailing_metadata not yet started by application; start it
4027       // ourselves to get status.
4028       call->StartInternalRecvTrailingMetadata();
4029     } else {
4030       GRPC_CALL_COMBINER_STOP(call->call_combiner_, "recv_message_ready null");
4031     }
4032     return;
4033   }
4034   // Received a valid message, so commit the call.
4035   call->RetryCommit(retry_state);
4036   // Invoke the callback to return the result to the surface.
4037   // Manually invoking a callback function; it does not take ownership of error.
4038   call->InvokeRecvMessageCallback(batch_data, error);
4039 }
4040 
4041 //
4042 // recv_trailing_metadata handling
4043 //
4044 
GetCallStatus(grpc_metadata_batch * md_batch,grpc_error * error,grpc_status_code * status,grpc_mdelem ** server_pushback_md)4045 void RetryingCall::GetCallStatus(grpc_metadata_batch* md_batch,
4046                                  grpc_error* error, grpc_status_code* status,
4047                                  grpc_mdelem** server_pushback_md) {
4048   if (error != GRPC_ERROR_NONE) {
4049     grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
4050   } else {
4051     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
4052     *status =
4053         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
4054     if (server_pushback_md != nullptr &&
4055         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
4056       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
4057     }
4058   }
4059   GRPC_ERROR_UNREF(error);
4060 }
4061 
AddClosureForRecvTrailingMetadataReady(SubchannelCallBatchData * batch_data,grpc_error * error,CallCombinerClosureList * closures)4062 void RetryingCall::AddClosureForRecvTrailingMetadataReady(
4063     SubchannelCallBatchData* batch_data, grpc_error* error,
4064     CallCombinerClosureList* closures) {
4065   // Find pending batch.
4066   PendingBatch* pending = PendingBatchFind(
4067       "invoking recv_trailing_metadata for",
4068       [](grpc_transport_stream_op_batch* batch) {
4069         return batch->recv_trailing_metadata &&
4070                batch->payload->recv_trailing_metadata
4071                        .recv_trailing_metadata_ready != nullptr;
4072       });
4073   // If we generated the recv_trailing_metadata op internally via
4074   // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
4075   if (pending == nullptr) {
4076     GRPC_ERROR_UNREF(error);
4077     return;
4078   }
4079   // Return metadata.
4080   SubchannelCallRetryState* retry_state =
4081       static_cast<SubchannelCallRetryState*>(
4082           batch_data->lb_call->GetParentData());
4083   grpc_metadata_batch_move(
4084       &retry_state->recv_trailing_metadata,
4085       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
4086   // Add closure.
4087   closures->Add(pending->batch->payload->recv_trailing_metadata
4088                     .recv_trailing_metadata_ready,
4089                 error, "recv_trailing_metadata_ready for pending batch");
4090   // Update bookkeeping.
4091   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
4092       nullptr;
4093   MaybeClearPendingBatch(pending);
4094 }
4095 
AddClosuresForDeferredRecvCallbacks(SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)4096 void RetryingCall::AddClosuresForDeferredRecvCallbacks(
4097     SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
4098     CallCombinerClosureList* closures) {
4099   if (batch_data->batch.recv_trailing_metadata) {
4100     // Add closure for deferred recv_initial_metadata_ready.
4101     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
4102                      nullptr)) {
4103       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
4104                         InvokeRecvInitialMetadataCallback,
4105                         retry_state->recv_initial_metadata_ready_deferred_batch,
4106                         grpc_schedule_on_exec_ctx);
4107       closures->Add(&retry_state->recv_initial_metadata_ready,
4108                     retry_state->recv_initial_metadata_error,
4109                     "resuming recv_initial_metadata_ready");
4110       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
4111     }
4112     // Add closure for deferred recv_message_ready.
4113     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
4114                      nullptr)) {
4115       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
4116                         InvokeRecvMessageCallback,
4117                         retry_state->recv_message_ready_deferred_batch,
4118                         grpc_schedule_on_exec_ctx);
4119       closures->Add(&retry_state->recv_message_ready,
4120                     retry_state->recv_message_error,
4121                     "resuming recv_message_ready");
4122       retry_state->recv_message_ready_deferred_batch = nullptr;
4123     }
4124   }
4125 }
4126 
PendingBatchIsUnstarted(PendingBatch * pending,SubchannelCallRetryState * retry_state)4127 bool RetryingCall::PendingBatchIsUnstarted(
4128     PendingBatch* pending, SubchannelCallRetryState* retry_state) {
4129   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
4130     return false;
4131   }
4132   if (pending->batch->send_initial_metadata &&
4133       !retry_state->started_send_initial_metadata) {
4134     return true;
4135   }
4136   if (pending->batch->send_message &&
4137       retry_state->started_send_message_count < send_messages_.size()) {
4138     return true;
4139   }
4140   if (pending->batch->send_trailing_metadata &&
4141       !retry_state->started_send_trailing_metadata) {
4142     return true;
4143   }
4144   return false;
4145 }
4146 
AddClosuresToFailUnstartedPendingBatches(SubchannelCallRetryState * retry_state,grpc_error * error,CallCombinerClosureList * closures)4147 void RetryingCall::AddClosuresToFailUnstartedPendingBatches(
4148     SubchannelCallRetryState* retry_state, grpc_error* error,
4149     CallCombinerClosureList* closures) {
4150   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4151     PendingBatch* pending = &pending_batches_[i];
4152     if (PendingBatchIsUnstarted(pending, retry_state)) {
4153       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4154         gpr_log(GPR_INFO,
4155                 "chand=%p retrying_call=%p: failing unstarted pending batch at "
4156                 "index "
4157                 "%" PRIuPTR,
4158                 chand_, this, i);
4159       }
4160       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
4161                     "failing on_complete for pending batch");
4162       pending->batch->on_complete = nullptr;
4163       MaybeClearPendingBatch(pending);
4164     }
4165   }
4166   GRPC_ERROR_UNREF(error);
4167 }
4168 
RunClosuresForCompletedCall(SubchannelCallBatchData * batch_data,grpc_error * error)4169 void RetryingCall::RunClosuresForCompletedCall(
4170     SubchannelCallBatchData* batch_data, grpc_error* error) {
4171   SubchannelCallRetryState* retry_state =
4172       static_cast<SubchannelCallRetryState*>(
4173           batch_data->lb_call->GetParentData());
4174   // Construct list of closures to execute.
4175   CallCombinerClosureList closures;
4176   // First, add closure for recv_trailing_metadata_ready.
4177   AddClosureForRecvTrailingMetadataReady(batch_data, GRPC_ERROR_REF(error),
4178                                          &closures);
4179   // If there are deferred recv_initial_metadata_ready or recv_message_ready
4180   // callbacks, add them to closures.
4181   AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
4182   // Add closures to fail any pending batches that have not yet been started.
4183   AddClosuresToFailUnstartedPendingBatches(retry_state, GRPC_ERROR_REF(error),
4184                                            &closures);
4185   // Don't need batch_data anymore.
4186   batch_data->Unref();
4187   // Schedule all of the closures identified above.
4188   // Note: This will release the call combiner.
4189   closures.RunClosures(call_combiner_);
4190   GRPC_ERROR_UNREF(error);
4191 }
4192 
RecvTrailingMetadataReady(void * arg,grpc_error * error)4193 void RetryingCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
4194   SubchannelCallBatchData* batch_data =
4195       static_cast<SubchannelCallBatchData*>(arg);
4196   RetryingCall* call = batch_data->call;
4197   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4198     gpr_log(
4199         GPR_INFO,
4200         "chand=%p retrying_call=%p: got recv_trailing_metadata_ready, error=%s",
4201         call->chand_, call, grpc_error_string(error));
4202   }
4203   SubchannelCallRetryState* retry_state =
4204       static_cast<SubchannelCallRetryState*>(
4205           batch_data->lb_call->GetParentData());
4206   retry_state->completed_recv_trailing_metadata = true;
4207   // Get the call's status and check for server pushback metadata.
4208   grpc_status_code status = GRPC_STATUS_OK;
4209   grpc_mdelem* server_pushback_md = nullptr;
4210   grpc_metadata_batch* md_batch =
4211       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
4212   call->GetCallStatus(md_batch, GRPC_ERROR_REF(error), &status,
4213                       &server_pushback_md);
4214   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4215     gpr_log(GPR_INFO, "chand=%p retrying_call=%p: call finished, status=%s",
4216             call->chand_, call, grpc_status_code_to_string(status));
4217   }
4218   // Check if we should retry.
4219   if (call->MaybeRetry(batch_data, status, server_pushback_md)) {
4220     // Unref batch_data for deferred recv_initial_metadata_ready or
4221     // recv_message_ready callbacks, if any.
4222     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
4223       batch_data->Unref();
4224       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
4225     }
4226     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
4227       batch_data->Unref();
4228       GRPC_ERROR_UNREF(retry_state->recv_message_error);
4229     }
4230     batch_data->Unref();
4231     return;
4232   }
4233   // Not retrying, so commit the call.
4234   call->RetryCommit(retry_state);
4235   // Run any necessary closures.
4236   call->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
4237 }
4238 
4239 //
4240 // on_complete callback handling
4241 //
4242 
AddClosuresForCompletedPendingBatch(SubchannelCallBatchData * batch_data,grpc_error * error,CallCombinerClosureList * closures)4243 void RetryingCall::AddClosuresForCompletedPendingBatch(
4244     SubchannelCallBatchData* batch_data, grpc_error* error,
4245     CallCombinerClosureList* closures) {
4246   PendingBatch* pending = PendingBatchFind(
4247       "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
4248         // Match the pending batch with the same set of send ops as the
4249         // subchannel batch we've just completed.
4250         return batch->on_complete != nullptr &&
4251                batch_data->batch.send_initial_metadata ==
4252                    batch->send_initial_metadata &&
4253                batch_data->batch.send_message == batch->send_message &&
4254                batch_data->batch.send_trailing_metadata ==
4255                    batch->send_trailing_metadata;
4256       });
4257   // If batch_data is a replay batch, then there will be no pending
4258   // batch to complete.
4259   if (pending == nullptr) {
4260     GRPC_ERROR_UNREF(error);
4261     return;
4262   }
4263   // Add closure.
4264   closures->Add(pending->batch->on_complete, error,
4265                 "on_complete for pending batch");
4266   pending->batch->on_complete = nullptr;
4267   MaybeClearPendingBatch(pending);
4268 }
4269 
AddClosuresForReplayOrPendingSendOps(SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)4270 void RetryingCall::AddClosuresForReplayOrPendingSendOps(
4271     SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
4272     CallCombinerClosureList* closures) {
4273   bool have_pending_send_message_ops =
4274       retry_state->started_send_message_count < send_messages_.size();
4275   bool have_pending_send_trailing_metadata_op =
4276       seen_send_trailing_metadata_ &&
4277       !retry_state->started_send_trailing_metadata;
4278   if (!have_pending_send_message_ops &&
4279       !have_pending_send_trailing_metadata_op) {
4280     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4281       PendingBatch* pending = &pending_batches_[i];
4282       grpc_transport_stream_op_batch* batch = pending->batch;
4283       if (batch == nullptr || pending->send_ops_cached) continue;
4284       if (batch->send_message) have_pending_send_message_ops = true;
4285       if (batch->send_trailing_metadata) {
4286         have_pending_send_trailing_metadata_op = true;
4287       }
4288     }
4289   }
4290   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
4291     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4292       gpr_log(GPR_INFO,
4293               "chand=%p retrying_call=%p: starting next batch for pending send "
4294               "op(s)",
4295               chand_, this);
4296     }
4297     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
4298                       StartRetriableSubchannelBatches, this,
4299                       grpc_schedule_on_exec_ctx);
4300     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
4301                   "starting next batch for send_* op(s)");
4302   }
4303 }
4304 
OnComplete(void * arg,grpc_error * error)4305 void RetryingCall::OnComplete(void* arg, grpc_error* error) {
4306   SubchannelCallBatchData* batch_data =
4307       static_cast<SubchannelCallBatchData*>(arg);
4308   RetryingCall* call = batch_data->call;
4309   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4310     gpr_log(GPR_INFO,
4311             "chand=%p retrying_call=%p: got on_complete, error=%s, batch=%s",
4312             call->chand_, call, grpc_error_string(error),
4313             grpc_transport_stream_op_batch_string(&batch_data->batch).c_str());
4314   }
4315   SubchannelCallRetryState* retry_state =
4316       static_cast<SubchannelCallRetryState*>(
4317           batch_data->lb_call->GetParentData());
4318   // Update bookkeeping in retry_state.
4319   if (batch_data->batch.send_initial_metadata) {
4320     retry_state->completed_send_initial_metadata = true;
4321   }
4322   if (batch_data->batch.send_message) {
4323     ++retry_state->completed_send_message_count;
4324   }
4325   if (batch_data->batch.send_trailing_metadata) {
4326     retry_state->completed_send_trailing_metadata = true;
4327   }
4328   // If the call is committed, free cached data for send ops that we've just
4329   // completed.
4330   if (call->retry_committed_) {
4331     call->FreeCachedSendOpDataForCompletedBatch(batch_data, retry_state);
4332   }
4333   // Construct list of closures to execute.
4334   CallCombinerClosureList closures;
4335   // If a retry was already dispatched, that means we saw
4336   // recv_trailing_metadata before this, so we do nothing here.
4337   // Otherwise, invoke the callback to return the result to the surface.
4338   if (!retry_state->retry_dispatched) {
4339     // Add closure for the completed pending batch, if any.
4340     call->AddClosuresForCompletedPendingBatch(batch_data, GRPC_ERROR_REF(error),
4341                                               &closures);
4342     // If needed, add a callback to start any replay or pending send ops on
4343     // the subchannel call.
4344     if (!retry_state->completed_recv_trailing_metadata) {
4345       call->AddClosuresForReplayOrPendingSendOps(batch_data, retry_state,
4346                                                  &closures);
4347     }
4348   }
4349   // Track number of pending subchannel send batches and determine if this
4350   // was the last one.
4351   --call->num_pending_retriable_subchannel_send_batches_;
4352   const bool last_send_batch_complete =
4353       call->num_pending_retriable_subchannel_send_batches_ == 0;
4354   // Don't need batch_data anymore.
4355   batch_data->Unref();
4356   // Schedule all of the closures identified above.
4357   // Note: This yeilds the call combiner.
4358   closures.RunClosures(call->call_combiner_);
4359   // If this was the last subchannel send batch, unref the call stack.
4360   if (last_send_batch_complete) {
4361     GRPC_CALL_STACK_UNREF(call->owning_call_, "subchannel_send_batches");
4362   }
4363 }
4364 
4365 //
4366 // subchannel batch construction
4367 //
4368 
StartBatchInCallCombiner(void * arg,grpc_error *)4369 void RetryingCall::StartBatchInCallCombiner(void* arg,
4370                                             grpc_error* /*ignored*/) {
4371   grpc_transport_stream_op_batch* batch =
4372       static_cast<grpc_transport_stream_op_batch*>(arg);
4373   auto* lb_call =
4374       static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
4375   // Note: This will release the call combiner.
4376   lb_call->StartTransportStreamOpBatch(batch);
4377 }
4378 
AddClosureForSubchannelBatch(grpc_transport_stream_op_batch * batch,CallCombinerClosureList * closures)4379 void RetryingCall::AddClosureForSubchannelBatch(
4380     grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) {
4381   batch->handler_private.extra_arg = lb_call_.get();
4382   GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
4383                     batch, grpc_schedule_on_exec_ctx);
4384   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4385     gpr_log(GPR_INFO,
4386             "chand=%p retrying_call=%p: starting subchannel batch: %s", chand_,
4387             this, grpc_transport_stream_op_batch_string(batch).c_str());
4388   }
4389   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
4390                 "start_subchannel_batch");
4391 }
4392 
AddRetriableSendInitialMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4393 void RetryingCall::AddRetriableSendInitialMetadataOp(
4394     SubchannelCallRetryState* retry_state,
4395     SubchannelCallBatchData* batch_data) {
4396   // Maps the number of retries to the corresponding metadata value slice.
4397   const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
4398                                              &GRPC_MDSTR_3, &GRPC_MDSTR_4};
4399   // We need to make a copy of the metadata batch for each attempt, since
4400   // the filters in the subchannel stack may modify this batch, and we don't
4401   // want those modifications to be passed forward to subsequent attempts.
4402   //
4403   // If we've already completed one or more attempts, add the
4404   // grpc-retry-attempts header.
4405   retry_state->send_initial_metadata_storage =
4406       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
4407           sizeof(grpc_linked_mdelem) *
4408           (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
4409   grpc_metadata_batch_copy(&send_initial_metadata_,
4410                            &retry_state->send_initial_metadata,
4411                            retry_state->send_initial_metadata_storage);
4412   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
4413                        .grpc_previous_rpc_attempts != nullptr)) {
4414     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
4415                                GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
4416   }
4417   if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
4418     grpc_mdelem retry_md = grpc_mdelem_create(
4419         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
4420         *retry_count_strings[num_attempts_completed_ - 1], nullptr);
4421     grpc_error* error = grpc_metadata_batch_add_tail(
4422         &retry_state->send_initial_metadata,
4423         &retry_state
4424              ->send_initial_metadata_storage[send_initial_metadata_.list.count],
4425         retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
4426     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
4427       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
4428               grpc_error_string(error));
4429       GPR_ASSERT(false);
4430     }
4431   }
4432   retry_state->started_send_initial_metadata = true;
4433   batch_data->batch.send_initial_metadata = true;
4434   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
4435       &retry_state->send_initial_metadata;
4436   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
4437       send_initial_metadata_flags_;
4438   batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
4439 }
4440 
AddRetriableSendMessageOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4441 void RetryingCall::AddRetriableSendMessageOp(
4442     SubchannelCallRetryState* retry_state,
4443     SubchannelCallBatchData* batch_data) {
4444   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4445     gpr_log(GPR_INFO,
4446             "chand=%p retrying_call=%p: starting calld->send_messages[%" PRIuPTR
4447             "]",
4448             chand_, this, retry_state->started_send_message_count);
4449   }
4450   ByteStreamCache* cache =
4451       send_messages_[retry_state->started_send_message_count];
4452   ++retry_state->started_send_message_count;
4453   retry_state->send_message.Init(cache);
4454   batch_data->batch.send_message = true;
4455   batch_data->batch.payload->send_message.send_message.reset(
4456       retry_state->send_message.get());
4457 }
4458 
AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4459 void RetryingCall::AddRetriableSendTrailingMetadataOp(
4460     SubchannelCallRetryState* retry_state,
4461     SubchannelCallBatchData* batch_data) {
4462   // We need to make a copy of the metadata batch for each attempt, since
4463   // the filters in the subchannel stack may modify this batch, and we don't
4464   // want those modifications to be passed forward to subsequent attempts.
4465   retry_state->send_trailing_metadata_storage =
4466       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
4467           sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
4468   grpc_metadata_batch_copy(&send_trailing_metadata_,
4469                            &retry_state->send_trailing_metadata,
4470                            retry_state->send_trailing_metadata_storage);
4471   retry_state->started_send_trailing_metadata = true;
4472   batch_data->batch.send_trailing_metadata = true;
4473   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
4474       &retry_state->send_trailing_metadata;
4475 }
4476 
AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4477 void RetryingCall::AddRetriableRecvInitialMetadataOp(
4478     SubchannelCallRetryState* retry_state,
4479     SubchannelCallBatchData* batch_data) {
4480   retry_state->started_recv_initial_metadata = true;
4481   batch_data->batch.recv_initial_metadata = true;
4482   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
4483   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
4484       &retry_state->recv_initial_metadata;
4485   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
4486       &retry_state->trailing_metadata_available;
4487   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
4488                     RecvInitialMetadataReady, batch_data,
4489                     grpc_schedule_on_exec_ctx);
4490   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
4491       &retry_state->recv_initial_metadata_ready;
4492 }
4493 
AddRetriableRecvMessageOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4494 void RetryingCall::AddRetriableRecvMessageOp(
4495     SubchannelCallRetryState* retry_state,
4496     SubchannelCallBatchData* batch_data) {
4497   ++retry_state->started_recv_message_count;
4498   batch_data->batch.recv_message = true;
4499   batch_data->batch.payload->recv_message.recv_message =
4500       &retry_state->recv_message;
4501   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
4502                     batch_data, grpc_schedule_on_exec_ctx);
4503   batch_data->batch.payload->recv_message.recv_message_ready =
4504       &retry_state->recv_message_ready;
4505 }
4506 
AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4507 void RetryingCall::AddRetriableRecvTrailingMetadataOp(
4508     SubchannelCallRetryState* retry_state,
4509     SubchannelCallBatchData* batch_data) {
4510   retry_state->started_recv_trailing_metadata = true;
4511   batch_data->batch.recv_trailing_metadata = true;
4512   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
4513   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
4514       &retry_state->recv_trailing_metadata;
4515   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
4516       &retry_state->collect_stats;
4517   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
4518                     RecvTrailingMetadataReady, batch_data,
4519                     grpc_schedule_on_exec_ctx);
4520   batch_data->batch.payload->recv_trailing_metadata
4521       .recv_trailing_metadata_ready =
4522       &retry_state->recv_trailing_metadata_ready;
4523 }
4524 
StartInternalRecvTrailingMetadata()4525 void RetryingCall::StartInternalRecvTrailingMetadata() {
4526   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4527     gpr_log(
4528         GPR_INFO,
4529         "chand=%p retrying_call=%p: call failed but recv_trailing_metadata not "
4530         "started; starting it internally",
4531         chand_, this);
4532   }
4533   SubchannelCallRetryState* retry_state =
4534       static_cast<SubchannelCallRetryState*>(lb_call_->GetParentData());
4535   // Create batch_data with 2 refs, since this batch will be unreffed twice:
4536   // once for the recv_trailing_metadata_ready callback when the subchannel
4537   // batch returns, and again when we actually get a recv_trailing_metadata
4538   // op from the surface.
4539   SubchannelCallBatchData* batch_data =
4540       SubchannelCallBatchData::Create(this, 2, false /* set_on_complete */);
4541   AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
4542   retry_state->recv_trailing_metadata_internal_batch = batch_data;
4543   // Note: This will release the call combiner.
4544   lb_call_->StartTransportStreamOpBatch(&batch_data->batch);
4545 }
4546 
4547 // If there are any cached send ops that need to be replayed on the
4548 // current subchannel call, creates and returns a new subchannel batch
4549 // to replay those ops.  Otherwise, returns nullptr.
4550 RetryingCall::SubchannelCallBatchData*
MaybeCreateSubchannelBatchForReplay(SubchannelCallRetryState * retry_state)4551 RetryingCall::MaybeCreateSubchannelBatchForReplay(
4552     SubchannelCallRetryState* retry_state) {
4553   SubchannelCallBatchData* replay_batch_data = nullptr;
4554   // send_initial_metadata.
4555   if (seen_send_initial_metadata_ &&
4556       !retry_state->started_send_initial_metadata &&
4557       !pending_send_initial_metadata_) {
4558     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4559       gpr_log(GPR_INFO,
4560               "chand=%p retrying_call=%p: replaying previously completed "
4561               "send_initial_metadata op",
4562               chand_, this);
4563     }
4564     replay_batch_data =
4565         SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4566     AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
4567   }
4568   // send_message.
4569   // Note that we can only have one send_message op in flight at a time.
4570   if (retry_state->started_send_message_count < send_messages_.size() &&
4571       retry_state->started_send_message_count ==
4572           retry_state->completed_send_message_count &&
4573       !pending_send_message_) {
4574     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4575       gpr_log(GPR_INFO,
4576               "chand=%p retrying_call=%p: replaying previously completed "
4577               "send_message op",
4578               chand_, this);
4579     }
4580     if (replay_batch_data == nullptr) {
4581       replay_batch_data =
4582           SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4583     }
4584     AddRetriableSendMessageOp(retry_state, replay_batch_data);
4585   }
4586   // send_trailing_metadata.
4587   // Note that we only add this op if we have no more send_message ops
4588   // to start, since we can't send down any more send_message ops after
4589   // send_trailing_metadata.
4590   if (seen_send_trailing_metadata_ &&
4591       retry_state->started_send_message_count == send_messages_.size() &&
4592       !retry_state->started_send_trailing_metadata &&
4593       !pending_send_trailing_metadata_) {
4594     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4595       gpr_log(GPR_INFO,
4596               "chand=%p retrying_call=%p: replaying previously completed "
4597               "send_trailing_metadata op",
4598               chand_, this);
4599     }
4600     if (replay_batch_data == nullptr) {
4601       replay_batch_data =
4602           SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4603     }
4604     AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
4605   }
4606   return replay_batch_data;
4607 }
4608 
AddSubchannelBatchesForPendingBatches(SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)4609 void RetryingCall::AddSubchannelBatchesForPendingBatches(
4610     SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
4611   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4612     PendingBatch* pending = &pending_batches_[i];
4613     grpc_transport_stream_op_batch* batch = pending->batch;
4614     if (batch == nullptr) continue;
4615     // Skip any batch that either (a) has already been started on this
4616     // subchannel call or (b) we can't start yet because we're still
4617     // replaying send ops that need to be completed first.
4618     // TODO(roth): Note that if any one op in the batch can't be sent
4619     // yet due to ops that we're replaying, we don't start any of the ops
4620     // in the batch.  This is probably okay, but it could conceivably
4621     // lead to increased latency in some cases -- e.g., we could delay
4622     // starting a recv op due to it being in the same batch with a send
4623     // op.  If/when we revamp the callback protocol in
4624     // transport_stream_op_batch, we may be able to fix this.
4625     if (batch->send_initial_metadata &&
4626         retry_state->started_send_initial_metadata) {
4627       continue;
4628     }
4629     if (batch->send_message && retry_state->completed_send_message_count <
4630                                    retry_state->started_send_message_count) {
4631       continue;
4632     }
4633     // Note that we only start send_trailing_metadata if we have no more
4634     // send_message ops to start, since we can't send down any more
4635     // send_message ops after send_trailing_metadata.
4636     if (batch->send_trailing_metadata &&
4637         (retry_state->started_send_message_count + batch->send_message <
4638              send_messages_.size() ||
4639          retry_state->started_send_trailing_metadata)) {
4640       continue;
4641     }
4642     if (batch->recv_initial_metadata &&
4643         retry_state->started_recv_initial_metadata) {
4644       continue;
4645     }
4646     if (batch->recv_message && retry_state->completed_recv_message_count <
4647                                    retry_state->started_recv_message_count) {
4648       continue;
4649     }
4650     if (batch->recv_trailing_metadata &&
4651         retry_state->started_recv_trailing_metadata) {
4652       // If we previously completed a recv_trailing_metadata op
4653       // initiated by StartInternalRecvTrailingMetadata(), use the
4654       // result of that instead of trying to re-start this op.
4655       if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
4656                         nullptr))) {
4657         // If the batch completed, then trigger the completion callback
4658         // directly, so that we return the previously returned results to
4659         // the application.  Otherwise, just unref the internally
4660         // started subchannel batch, since we'll propagate the
4661         // completion when it completes.
4662         if (retry_state->completed_recv_trailing_metadata) {
4663           // Batches containing recv_trailing_metadata always succeed.
4664           closures->Add(
4665               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
4666               "re-executing recv_trailing_metadata_ready to propagate "
4667               "internally triggered result");
4668         } else {
4669           retry_state->recv_trailing_metadata_internal_batch->Unref();
4670         }
4671         retry_state->recv_trailing_metadata_internal_batch = nullptr;
4672       }
4673       continue;
4674     }
4675     // If we're not retrying, just send the batch as-is.
4676     // TODO(roth): This condition doesn't seem exactly right -- maybe need a
4677     // notion of "draining" once we've committed and are done replaying?
4678     if (retry_policy_ == nullptr || retry_committed_) {
4679       AddClosureForSubchannelBatch(batch, closures);
4680       PendingBatchClear(pending);
4681       continue;
4682     }
4683     // Create batch with the right number of callbacks.
4684     const bool has_send_ops = batch->send_initial_metadata ||
4685                               batch->send_message ||
4686                               batch->send_trailing_metadata;
4687     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
4688                               batch->recv_message +
4689                               batch->recv_trailing_metadata;
4690     SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
4691         this, num_callbacks, has_send_ops /* set_on_complete */);
4692     // Cache send ops if needed.
4693     MaybeCacheSendOpsForBatch(pending);
4694     // send_initial_metadata.
4695     if (batch->send_initial_metadata) {
4696       AddRetriableSendInitialMetadataOp(retry_state, batch_data);
4697     }
4698     // send_message.
4699     if (batch->send_message) {
4700       AddRetriableSendMessageOp(retry_state, batch_data);
4701     }
4702     // send_trailing_metadata.
4703     if (batch->send_trailing_metadata) {
4704       AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
4705     }
4706     // recv_initial_metadata.
4707     if (batch->recv_initial_metadata) {
4708       // recv_flags is only used on the server side.
4709       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
4710       AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
4711     }
4712     // recv_message.
4713     if (batch->recv_message) {
4714       AddRetriableRecvMessageOp(retry_state, batch_data);
4715     }
4716     // recv_trailing_metadata.
4717     if (batch->recv_trailing_metadata) {
4718       AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
4719     }
4720     AddClosureForSubchannelBatch(&batch_data->batch, closures);
4721     // Track number of pending subchannel send batches.
4722     // If this is the first one, take a ref to the call stack.
4723     if (batch->send_initial_metadata || batch->send_message ||
4724         batch->send_trailing_metadata) {
4725       if (num_pending_retriable_subchannel_send_batches_ == 0) {
4726         GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
4727       }
4728       ++num_pending_retriable_subchannel_send_batches_;
4729     }
4730   }
4731 }
4732 
StartRetriableSubchannelBatches(void * arg,grpc_error *)4733 void RetryingCall::StartRetriableSubchannelBatches(void* arg,
4734                                                    grpc_error* /*ignored*/) {
4735   RetryingCall* call = static_cast<RetryingCall*>(arg);
4736   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4737     gpr_log(GPR_INFO,
4738             "chand=%p retrying_call=%p: constructing retriable batches",
4739             call->chand_, call);
4740   }
4741   SubchannelCallRetryState* retry_state =
4742       static_cast<SubchannelCallRetryState*>(call->lb_call_->GetParentData());
4743   // Construct list of closures to execute, one for each pending batch.
4744   CallCombinerClosureList closures;
4745   // Replay previously-returned send_* ops if needed.
4746   SubchannelCallBatchData* replay_batch_data =
4747       call->MaybeCreateSubchannelBatchForReplay(retry_state);
4748   if (replay_batch_data != nullptr) {
4749     call->AddClosureForSubchannelBatch(&replay_batch_data->batch, &closures);
4750     // Track number of pending subchannel send batches.
4751     // If this is the first one, take a ref to the call stack.
4752     if (call->num_pending_retriable_subchannel_send_batches_ == 0) {
4753       GRPC_CALL_STACK_REF(call->owning_call_, "subchannel_send_batches");
4754     }
4755     ++call->num_pending_retriable_subchannel_send_batches_;
4756   }
4757   // Now add pending batches.
4758   call->AddSubchannelBatchesForPendingBatches(retry_state, &closures);
4759   // Start batches on subchannel call.
4760   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4761     gpr_log(GPR_INFO,
4762             "chand=%p retrying_call=%p: starting %" PRIuPTR
4763             " retriable batches on lb_call=%p",
4764             call->chand_, call, closures.size(), call->lb_call_.get());
4765   }
4766   // Note: This will yield the call combiner.
4767   closures.RunClosures(call->call_combiner_);
4768 }
4769 
CreateLbCall(void * arg,grpc_error *)4770 void RetryingCall::CreateLbCall(void* arg, grpc_error* /*error*/) {
4771   auto* call = static_cast<RetryingCall*>(arg);
4772   const size_t parent_data_size =
4773       call->enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
4774   grpc_call_element_args args = {call->owning_call_,     nullptr,
4775                                  call->call_context_,    call->path_,
4776                                  call->call_start_time_, call->deadline_,
4777                                  call->arena_,           call->call_combiner_};
4778   call->lb_call_ = LoadBalancedCall::Create(call->chand_, args, call->pollent_,
4779                                             parent_data_size);
4780   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
4781     gpr_log(GPR_INFO, "chand=%p retrying_call=%p: create lb_call=%p",
4782             call->chand_, call, call->lb_call_.get());
4783   }
4784   if (parent_data_size > 0) {
4785     new (call->lb_call_->GetParentData())
4786         SubchannelCallRetryState(call->call_context_);
4787   }
4788   call->PendingBatchesResume();
4789 }
4790 
4791 //
4792 // LoadBalancedCall::Metadata
4793 //
4794 
4795 class LoadBalancedCall::Metadata
4796     : public LoadBalancingPolicy::MetadataInterface {
4797  public:
Metadata(LoadBalancedCall * lb_call,grpc_metadata_batch * batch)4798   Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch)
4799       : lb_call_(lb_call), batch_(batch) {}
4800 
Add(absl::string_view key,absl::string_view value)4801   void Add(absl::string_view key, absl::string_view value) override {
4802     grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
4803         lb_call_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
4804     linked_mdelem->md = grpc_mdelem_from_slices(
4805         ExternallyManagedSlice(key.data(), key.size()),
4806         ExternallyManagedSlice(value.data(), value.size()));
4807     GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
4808                GRPC_ERROR_NONE);
4809   }
4810 
begin() const4811   iterator begin() const override {
4812     static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
4813                   "iterator size too large");
4814     return iterator(
4815         this, reinterpret_cast<intptr_t>(MaybeSkipEntry(batch_->list.head)));
4816   }
end() const4817   iterator end() const override {
4818     static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
4819                   "iterator size too large");
4820     return iterator(this, 0);
4821   }
4822 
erase(iterator it)4823   iterator erase(iterator it) override {
4824     grpc_linked_mdelem* linked_mdelem =
4825         reinterpret_cast<grpc_linked_mdelem*>(GetIteratorHandle(it));
4826     intptr_t handle = reinterpret_cast<intptr_t>(linked_mdelem->next);
4827     grpc_metadata_batch_remove(batch_, linked_mdelem);
4828     return iterator(this, handle);
4829   }
4830 
4831  private:
MaybeSkipEntry(grpc_linked_mdelem * entry) const4832   grpc_linked_mdelem* MaybeSkipEntry(grpc_linked_mdelem* entry) const {
4833     if (entry != nullptr && batch_->idx.named.path == entry) {
4834       return entry->next;
4835     }
4836     return entry;
4837   }
4838 
IteratorHandleNext(intptr_t handle) const4839   intptr_t IteratorHandleNext(intptr_t handle) const override {
4840     grpc_linked_mdelem* linked_mdelem =
4841         reinterpret_cast<grpc_linked_mdelem*>(handle);
4842     return reinterpret_cast<intptr_t>(MaybeSkipEntry(linked_mdelem->next));
4843   }
4844 
IteratorHandleGet(intptr_t handle) const4845   std::pair<absl::string_view, absl::string_view> IteratorHandleGet(
4846       intptr_t handle) const override {
4847     grpc_linked_mdelem* linked_mdelem =
4848         reinterpret_cast<grpc_linked_mdelem*>(handle);
4849     return std::make_pair(StringViewFromSlice(GRPC_MDKEY(linked_mdelem->md)),
4850                           StringViewFromSlice(GRPC_MDVALUE(linked_mdelem->md)));
4851   }
4852 
4853   LoadBalancedCall* lb_call_;
4854   grpc_metadata_batch* batch_;
4855 };
4856 
4857 //
4858 // LoadBalancedCall::LbCallState
4859 //
4860 
4861 class LoadBalancedCall::LbCallState : public LoadBalancingPolicy::CallState {
4862  public:
LbCallState(LoadBalancedCall * lb_call)4863   explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}
4864 
Alloc(size_t size)4865   void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }
4866 
GetBackendMetricData()4867   const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
4868       override {
4869     if (lb_call_->backend_metric_data_ == nullptr) {
4870       grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->idx.named
4871                                    .x_endpoint_load_metrics_bin;
4872       if (md != nullptr) {
4873         lb_call_->backend_metric_data_ =
4874             ParseBackendMetricData(GRPC_MDVALUE(md->md), lb_call_->arena_);
4875       }
4876     }
4877     return lb_call_->backend_metric_data_;
4878   }
4879 
ExperimentalGetCallAttribute(const char * key)4880   absl::string_view ExperimentalGetCallAttribute(const char* key) override {
4881     auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
4882         lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
4883     auto& call_attributes = service_config_call_data->call_attributes();
4884     auto it = call_attributes.find(key);
4885     if (it == call_attributes.end()) return absl::string_view();
4886     return it->second;
4887   }
4888 
4889  private:
4890   LoadBalancedCall* lb_call_;
4891 };
4892 
4893 //
4894 // LoadBalancedCall
4895 //
4896 
Create(ChannelData * chand,const grpc_call_element_args & args,grpc_polling_entity * pollent,size_t parent_data_size)4897 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Create(
4898     ChannelData* chand, const grpc_call_element_args& args,
4899     grpc_polling_entity* pollent, size_t parent_data_size) {
4900   const size_t alloc_size =
4901       parent_data_size > 0
4902           ? (GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall)) +
4903              parent_data_size)
4904           : sizeof(LoadBalancedCall);
4905   auto* lb_call = static_cast<LoadBalancedCall*>(args.arena->Alloc(alloc_size));
4906   new (lb_call) LoadBalancedCall(chand, args, pollent);
4907   return lb_call;
4908 }
4909 
LoadBalancedCall(ChannelData * chand,const grpc_call_element_args & args,grpc_polling_entity * pollent)4910 LoadBalancedCall::LoadBalancedCall(ChannelData* chand,
4911                                    const grpc_call_element_args& args,
4912                                    grpc_polling_entity* pollent)
4913     : refs_(1, GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
4914                    ? "LoadBalancedCall"
4915                    : nullptr),
4916       chand_(chand),
4917       path_(grpc_slice_ref_internal(args.path)),
4918       call_start_time_(args.start_time),
4919       deadline_(args.deadline),
4920       arena_(args.arena),
4921       owning_call_(args.call_stack),
4922       call_combiner_(args.call_combiner),
4923       call_context_(args.context),
4924       pollent_(pollent) {}
4925 
~LoadBalancedCall()4926 LoadBalancedCall::~LoadBalancedCall() {
4927   grpc_slice_unref_internal(path_);
4928   GRPC_ERROR_UNREF(cancel_error_);
4929   if (backend_metric_data_ != nullptr) {
4930     backend_metric_data_
4931         ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
4932   }
4933   // Make sure there are no remaining pending batches.
4934   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4935     GPR_ASSERT(pending_batches_[i] == nullptr);
4936   }
4937 }
4938 
Ref()4939 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref() {
4940   IncrementRefCount();
4941   return RefCountedPtr<LoadBalancedCall>(this);
4942 }
4943 
Ref(const DebugLocation & location,const char * reason)4944 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref(
4945     const DebugLocation& location, const char* reason) {
4946   IncrementRefCount(location, reason);
4947   return RefCountedPtr<LoadBalancedCall>(this);
4948 }
4949 
Unref()4950 void LoadBalancedCall::Unref() {
4951   if (GPR_UNLIKELY(refs_.Unref())) {
4952     this->~LoadBalancedCall();
4953   }
4954 }
4955 
Unref(const DebugLocation & location,const char * reason)4956 void LoadBalancedCall::Unref(const DebugLocation& location,
4957                              const char* reason) {
4958   if (GPR_UNLIKELY(refs_.Unref(location, reason))) {
4959     this->~LoadBalancedCall();
4960   }
4961 }
4962 
IncrementRefCount()4963 void LoadBalancedCall::IncrementRefCount() { refs_.Ref(); }
4964 
IncrementRefCount(const DebugLocation & location,const char * reason)4965 void LoadBalancedCall::IncrementRefCount(const DebugLocation& location,
4966                                          const char* reason) {
4967   refs_.Ref(location, reason);
4968 }
4969 
GetParentData()4970 void* LoadBalancedCall::GetParentData() {
4971   return reinterpret_cast<char*>(this) +
4972          GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall));
4973 }
4974 
GetBatchIndex(grpc_transport_stream_op_batch * batch)4975 size_t LoadBalancedCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
4976   // Note: It is important the send_initial_metadata be the first entry
4977   // here, since the code in pick_subchannel_locked() assumes it will be.
4978   if (batch->send_initial_metadata) return 0;
4979   if (batch->send_message) return 1;
4980   if (batch->send_trailing_metadata) return 2;
4981   if (batch->recv_initial_metadata) return 3;
4982   if (batch->recv_message) return 4;
4983   if (batch->recv_trailing_metadata) return 5;
4984   GPR_UNREACHABLE_CODE(return (size_t)-1);
4985 }
4986 
4987 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)4988 void LoadBalancedCall::PendingBatchesAdd(
4989     grpc_transport_stream_op_batch* batch) {
4990   const size_t idx = GetBatchIndex(batch);
4991   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4992     gpr_log(GPR_INFO,
4993             "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
4994             chand_, this, idx);
4995   }
4996   GPR_ASSERT(pending_batches_[idx] == nullptr);
4997   pending_batches_[idx] = batch;
4998 }
4999 
5000 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error * error)5001 void LoadBalancedCall::FailPendingBatchInCallCombiner(void* arg,
5002                                                       grpc_error* error) {
5003   grpc_transport_stream_op_batch* batch =
5004       static_cast<grpc_transport_stream_op_batch*>(arg);
5005   auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
5006   // Note: This will release the call combiner.
5007   grpc_transport_stream_op_batch_finish_with_failure(
5008       batch, GRPC_ERROR_REF(error), self->call_combiner_);
5009 }
5010 
5011 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error * error,YieldCallCombinerPredicate yield_call_combiner_predicate)5012 void LoadBalancedCall::PendingBatchesFail(
5013     grpc_error* error,
5014     YieldCallCombinerPredicate yield_call_combiner_predicate) {
5015   GPR_ASSERT(error != GRPC_ERROR_NONE);
5016   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5017     size_t num_batches = 0;
5018     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5019       if (pending_batches_[i] != nullptr) ++num_batches;
5020     }
5021     gpr_log(GPR_INFO,
5022             "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
5023             chand_, this, num_batches, grpc_error_string(error));
5024   }
5025   CallCombinerClosureList closures;
5026   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5027     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
5028     if (batch != nullptr) {
5029       batch->handler_private.extra_arg = this;
5030       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
5031                         FailPendingBatchInCallCombiner, batch,
5032                         grpc_schedule_on_exec_ctx);
5033       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
5034                    "PendingBatchesFail");
5035       batch = nullptr;
5036     }
5037   }
5038   if (yield_call_combiner_predicate(closures)) {
5039     closures.RunClosures(call_combiner_);
5040   } else {
5041     closures.RunClosuresWithoutYielding(call_combiner_);
5042   }
5043   GRPC_ERROR_UNREF(error);
5044 }
5045 
5046 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error *)5047 void LoadBalancedCall::ResumePendingBatchInCallCombiner(
5048     void* arg, grpc_error* /*ignored*/) {
5049   grpc_transport_stream_op_batch* batch =
5050       static_cast<grpc_transport_stream_op_batch*>(arg);
5051   SubchannelCall* subchannel_call =
5052       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
5053   // Note: This will release the call combiner.
5054   subchannel_call->StartTransportStreamOpBatch(batch);
5055 }
5056 
5057 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()5058 void LoadBalancedCall::PendingBatchesResume() {
5059   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5060     size_t num_batches = 0;
5061     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5062       if (pending_batches_[i] != nullptr) ++num_batches;
5063     }
5064     gpr_log(GPR_INFO,
5065             "chand=%p lb_call=%p: starting %" PRIuPTR
5066             " pending batches on subchannel_call=%p",
5067             chand_, this, num_batches, subchannel_call_.get());
5068   }
5069   CallCombinerClosureList closures;
5070   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5071     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
5072     if (batch != nullptr) {
5073       batch->handler_private.extra_arg = subchannel_call_.get();
5074       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
5075                         ResumePendingBatchInCallCombiner, batch,
5076                         grpc_schedule_on_exec_ctx);
5077       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
5078                    "PendingBatchesResume");
5079       batch = nullptr;
5080     }
5081   }
5082   // Note: This will release the call combiner.
5083   closures.RunClosures(call_combiner_);
5084 }
5085 
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)5086 void LoadBalancedCall::StartTransportStreamOpBatch(
5087     grpc_transport_stream_op_batch* batch) {
5088   // Intercept recv_trailing_metadata_ready for LB callback.
5089   if (batch->recv_trailing_metadata) {
5090     InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
5091   }
5092   // If we've previously been cancelled, immediately fail any new batches.
5093   if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
5094     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5095       gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
5096               chand_, this, grpc_error_string(cancel_error_));
5097     }
5098     // Note: This will release the call combiner.
5099     grpc_transport_stream_op_batch_finish_with_failure(
5100         batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
5101     return;
5102   }
5103   // Handle cancellation.
5104   if (GPR_UNLIKELY(batch->cancel_stream)) {
5105     // Stash a copy of cancel_error in our call data, so that we can use
5106     // it for subsequent operations.  This ensures that if the call is
5107     // cancelled before any batches are passed down (e.g., if the deadline
5108     // is in the past when the call starts), we can return the right
5109     // error to the caller when the first batch does get passed down.
5110     GRPC_ERROR_UNREF(cancel_error_);
5111     cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
5112     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5113       gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
5114               chand_, this, grpc_error_string(cancel_error_));
5115     }
5116     // If we do not have a subchannel call (i.e., a pick has not yet
5117     // been started), fail all pending batches.  Otherwise, send the
5118     // cancellation down to the subchannel call.
5119     if (subchannel_call_ == nullptr) {
5120       PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
5121       // Note: This will release the call combiner.
5122       grpc_transport_stream_op_batch_finish_with_failure(
5123           batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
5124     } else {
5125       // Note: This will release the call combiner.
5126       subchannel_call_->StartTransportStreamOpBatch(batch);
5127     }
5128     return;
5129   }
5130   // Add the batch to the pending list.
5131   PendingBatchesAdd(batch);
5132   // Check if we've already gotten a subchannel call.
5133   // Note that once we have picked a subchannel, we do not need to acquire
5134   // the channel's data plane mutex, which is more efficient (especially for
5135   // streaming calls).
5136   if (subchannel_call_ != nullptr) {
5137     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5138       gpr_log(GPR_INFO,
5139               "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
5140               chand_, this, subchannel_call_.get());
5141     }
5142     PendingBatchesResume();
5143     return;
5144   }
5145   // We do not yet have a subchannel call.
5146   // For batches containing a send_initial_metadata op, acquire the
5147   // channel's data plane mutex to pick a subchannel.
5148   if (GPR_LIKELY(batch->send_initial_metadata)) {
5149     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5150       gpr_log(GPR_INFO,
5151               "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
5152               chand_, this);
5153     }
5154     PickSubchannel(this, GRPC_ERROR_NONE);
5155   } else {
5156     // For all other batches, release the call combiner.
5157     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5158       gpr_log(GPR_INFO,
5159               "chand=%p lb_call=%p: saved batch, yielding call combiner",
5160               chand_, this);
5161     }
5162     GRPC_CALL_COMBINER_STOP(call_combiner_,
5163                             "batch does not include send_initial_metadata");
5164   }
5165 }
5166 
RecvTrailingMetadataReadyForLoadBalancingPolicy(void * arg,grpc_error * error)5167 void LoadBalancedCall::RecvTrailingMetadataReadyForLoadBalancingPolicy(
5168     void* arg, grpc_error* error) {
5169   auto* self = static_cast<LoadBalancedCall*>(arg);
5170   if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
5171     // Set error if call did not succeed.
5172     grpc_error* error_for_lb = GRPC_ERROR_NONE;
5173     if (error != GRPC_ERROR_NONE) {
5174       error_for_lb = error;
5175     } else {
5176       const auto& fields = self->recv_trailing_metadata_->idx.named;
5177       GPR_ASSERT(fields.grpc_status != nullptr);
5178       grpc_status_code status =
5179           grpc_get_status_code_from_metadata(fields.grpc_status->md);
5180       std::string msg;
5181       if (status != GRPC_STATUS_OK) {
5182         error_for_lb = grpc_error_set_int(
5183             GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
5184             GRPC_ERROR_INT_GRPC_STATUS, status);
5185         if (fields.grpc_message != nullptr) {
5186           error_for_lb = grpc_error_set_str(
5187               error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
5188               grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
5189         }
5190       }
5191     }
5192     // Invoke callback to LB policy.
5193     Metadata trailing_metadata(self, self->recv_trailing_metadata_);
5194     LbCallState lb_call_state(self);
5195     self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
5196                                            &lb_call_state);
5197     if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
5198   }
5199   // Chain to original callback.
5200   Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
5201                GRPC_ERROR_REF(error));
5202 }
5203 
5204 // TODO(roth): Consider not intercepting this callback unless we
5205 // actually need to, if this causes a performance problem.
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(grpc_transport_stream_op_batch * batch)5206 void LoadBalancedCall::InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
5207     grpc_transport_stream_op_batch* batch) {
5208   recv_trailing_metadata_ =
5209       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
5210   original_recv_trailing_metadata_ready_ =
5211       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
5212   GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
5213                     RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
5214                     grpc_schedule_on_exec_ctx);
5215   batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
5216       &recv_trailing_metadata_ready_;
5217 }
5218 
CreateSubchannelCall()5219 void LoadBalancedCall::CreateSubchannelCall() {
5220   SubchannelCall::Args call_args = {
5221       std::move(connected_subchannel_), pollent_, path_, call_start_time_,
5222       deadline_, arena_,
5223       // TODO(roth): When we implement hedging support, we will probably
5224       // need to use a separate call context for each subchannel call.
5225       call_context_, call_combiner_};
5226   grpc_error* error = GRPC_ERROR_NONE;
5227   subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
5228   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5229     gpr_log(GPR_INFO,
5230             "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
5231             this, subchannel_call_.get(), grpc_error_string(error));
5232   }
5233   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
5234     PendingBatchesFail(error, YieldCallCombiner);
5235   } else {
5236     PendingBatchesResume();
5237   }
5238 }
5239 
5240 // A class to handle the call combiner cancellation callback for a
5241 // queued pick.
5242 // TODO(roth): When we implement hedging support, we won't be able to
5243 // register a call combiner cancellation closure for each LB pick,
5244 // because there may be multiple LB picks happening in parallel.
5245 // Instead, we will probably need to maintain a list in the CallData
5246 // object of pending LB picks to be cancelled when the closure runs.
5247 class LoadBalancedCall::LbQueuedCallCanceller {
5248  public:
LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)5249   explicit LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)
5250       : lb_call_(std::move(lb_call)) {
5251     GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
5252     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
5253     lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
5254   }
5255 
5256  private:
CancelLocked(void * arg,grpc_error * error)5257   static void CancelLocked(void* arg, grpc_error* error) {
5258     auto* self = static_cast<LbQueuedCallCanceller*>(arg);
5259     auto* lb_call = self->lb_call_.get();
5260     auto* chand = lb_call->chand_;
5261     {
5262       MutexLock lock(chand->data_plane_mu());
5263       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5264         gpr_log(GPR_INFO,
5265                 "chand=%p lb_call=%p: cancelling queued pick: "
5266                 "error=%s self=%p calld->pick_canceller=%p",
5267                 chand, lb_call, grpc_error_string(error), self,
5268                 lb_call->lb_call_canceller_);
5269       }
5270       if (lb_call->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
5271         // Remove pick from list of queued picks.
5272         lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
5273         // Fail pending batches on the call.
5274         lb_call->PendingBatchesFail(GRPC_ERROR_REF(error),
5275                                     YieldCallCombinerIfPendingBatchesFound);
5276       }
5277     }
5278     GRPC_CALL_STACK_UNREF(lb_call->owning_call_, "LbQueuedCallCanceller");
5279     delete self;
5280   }
5281 
5282   RefCountedPtr<LoadBalancedCall> lb_call_;
5283   grpc_closure closure_;
5284 };
5285 
MaybeRemoveCallFromLbQueuedCallsLocked()5286 void LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
5287   if (!queued_pending_lb_pick_) return;
5288   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5289     gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
5290             chand_, this);
5291   }
5292   chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
5293   queued_pending_lb_pick_ = false;
5294   // Lame the call combiner canceller.
5295   lb_call_canceller_ = nullptr;
5296 }
5297 
MaybeAddCallToLbQueuedCallsLocked()5298 void LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
5299   if (queued_pending_lb_pick_) return;
5300   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5301     gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
5302             chand_, this);
5303   }
5304   queued_pending_lb_pick_ = true;
5305   queued_call_.lb_call = this;
5306   chand_->AddLbQueuedCall(&queued_call_, pollent_);
5307   // Register call combiner cancellation callback.
5308   lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
5309 }
5310 
AsyncPickDone(grpc_error * error)5311 void LoadBalancedCall::AsyncPickDone(grpc_error* error) {
5312   GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
5313   ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
5314 }
5315 
PickDone(void * arg,grpc_error * error)5316 void LoadBalancedCall::PickDone(void* arg, grpc_error* error) {
5317   auto* self = static_cast<LoadBalancedCall*>(arg);
5318   if (error != GRPC_ERROR_NONE) {
5319     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5320       gpr_log(GPR_INFO,
5321               "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
5322               self->chand_, self, grpc_error_string(error));
5323     }
5324     self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
5325     return;
5326   }
5327   self->CreateSubchannelCall();
5328 }
5329 
PickResultTypeName(LoadBalancingPolicy::PickResult::ResultType type)5330 const char* PickResultTypeName(
5331     LoadBalancingPolicy::PickResult::ResultType type) {
5332   switch (type) {
5333     case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
5334       return "COMPLETE";
5335     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
5336       return "QUEUE";
5337     case LoadBalancingPolicy::PickResult::PICK_FAILED:
5338       return "FAILED";
5339   }
5340   GPR_UNREACHABLE_CODE(return "UNKNOWN");
5341 }
5342 
PickSubchannel(void * arg,grpc_error * error)5343 void LoadBalancedCall::PickSubchannel(void* arg, grpc_error* error) {
5344   auto* self = static_cast<LoadBalancedCall*>(arg);
5345   bool pick_complete;
5346   {
5347     MutexLock lock(self->chand_->data_plane_mu());
5348     pick_complete = self->PickSubchannelLocked(&error);
5349   }
5350   if (pick_complete) {
5351     PickDone(self, error);
5352     GRPC_ERROR_UNREF(error);
5353   }
5354 }
5355 
PickSubchannelLocked(grpc_error ** error)5356 bool LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
5357   GPR_ASSERT(connected_subchannel_ == nullptr);
5358   GPR_ASSERT(subchannel_call_ == nullptr);
5359   // Grab initial metadata.
5360   auto& send_initial_metadata =
5361       pending_batches_[0]->payload->send_initial_metadata;
5362   grpc_metadata_batch* initial_metadata_batch =
5363       send_initial_metadata.send_initial_metadata;
5364   const uint32_t send_initial_metadata_flags =
5365       send_initial_metadata.send_initial_metadata_flags;
5366   // Perform LB pick.
5367   LoadBalancingPolicy::PickArgs pick_args;
5368   pick_args.path = StringViewFromSlice(path_);
5369   LbCallState lb_call_state(this);
5370   pick_args.call_state = &lb_call_state;
5371   Metadata initial_metadata(this, initial_metadata_batch);
5372   pick_args.initial_metadata = &initial_metadata;
5373   auto result = chand_->picker()->Pick(pick_args);
5374   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5375     gpr_log(
5376         GPR_INFO,
5377         "chand=%p lb_call=%p: LB pick returned %s (subchannel=%p, error=%s)",
5378         chand_, this, PickResultTypeName(result.type), result.subchannel.get(),
5379         grpc_error_string(result.error));
5380   }
5381   switch (result.type) {
5382     case LoadBalancingPolicy::PickResult::PICK_FAILED: {
5383       // If we're shutting down, fail all RPCs.
5384       grpc_error* disconnect_error = chand_->disconnect_error();
5385       if (disconnect_error != GRPC_ERROR_NONE) {
5386         GRPC_ERROR_UNREF(result.error);
5387         MaybeRemoveCallFromLbQueuedCallsLocked();
5388         *error = GRPC_ERROR_REF(disconnect_error);
5389         return true;
5390       }
5391       // If wait_for_ready is false, then the error indicates the RPC
5392       // attempt's final status.
5393       if ((send_initial_metadata_flags &
5394            GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
5395         grpc_error* new_error =
5396             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
5397                 "Failed to pick subchannel", &result.error, 1);
5398         GRPC_ERROR_UNREF(result.error);
5399         *error = new_error;
5400         MaybeRemoveCallFromLbQueuedCallsLocked();
5401         return true;
5402       }
5403       // If wait_for_ready is true, then queue to retry when we get a new
5404       // picker.
5405       GRPC_ERROR_UNREF(result.error);
5406     }
5407     // Fallthrough
5408     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
5409       MaybeAddCallToLbQueuedCallsLocked();
5410       return false;
5411     default:  // PICK_COMPLETE
5412       MaybeRemoveCallFromLbQueuedCallsLocked();
5413       // Handle drops.
5414       if (GPR_UNLIKELY(result.subchannel == nullptr)) {
5415         result.error = grpc_error_set_int(
5416             GRPC_ERROR_CREATE_FROM_STATIC_STRING(
5417                 "Call dropped by load balancing policy"),
5418             GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
5419       } else {
5420         // Grab a ref to the connected subchannel while we're still
5421         // holding the data plane mutex.
5422         connected_subchannel_ =
5423             chand_->GetConnectedSubchannelInDataPlane(result.subchannel.get());
5424         GPR_ASSERT(connected_subchannel_ != nullptr);
5425       }
5426       lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
5427       *error = result.error;
5428       return true;
5429   }
5430 }
5431 
5432 }  // namespace
5433 }  // namespace grpc_core
5434 
5435 /*************************************************************************
5436  * EXPORTED SYMBOLS
5437  */
5438 
5439 using grpc_core::CallData;
5440 using grpc_core::ChannelData;
5441 
5442 const grpc_channel_filter grpc_client_channel_filter = {
5443     CallData::StartTransportStreamOpBatch,
5444     ChannelData::StartTransportOp,
5445     sizeof(CallData),
5446     CallData::Init,
5447     CallData::SetPollent,
5448     CallData::Destroy,
5449     sizeof(ChannelData),
5450     ChannelData::Init,
5451     ChannelData::Destroy,
5452     ChannelData::GetChannelInfo,
5453     "client-channel",
5454 };
5455 
grpc_client_channel_check_connectivity_state(grpc_channel_element * elem,int try_to_connect)5456 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
5457     grpc_channel_element* elem, int try_to_connect) {
5458   auto* chand = static_cast<ChannelData*>(elem->channel_data);
5459   return chand->CheckConnectivityState(try_to_connect);
5460 }
5461 
grpc_client_channel_num_external_connectivity_watchers(grpc_channel_element * elem)5462 int grpc_client_channel_num_external_connectivity_watchers(
5463     grpc_channel_element* elem) {
5464   auto* chand = static_cast<ChannelData*>(elem->channel_data);
5465   return chand->NumExternalConnectivityWatchers();
5466 }
5467 
grpc_client_channel_watch_connectivity_state(grpc_channel_element * elem,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)5468 void grpc_client_channel_watch_connectivity_state(
5469     grpc_channel_element* elem, grpc_polling_entity pollent,
5470     grpc_connectivity_state* state, grpc_closure* on_complete,
5471     grpc_closure* watcher_timer_init) {
5472   auto* chand = static_cast<ChannelData*>(elem->channel_data);
5473   if (state == nullptr) {
5474     // Handle cancellation.
5475     GPR_ASSERT(watcher_timer_init == nullptr);
5476     chand->RemoveExternalConnectivityWatcher(on_complete, /*cancel=*/true);
5477     return;
5478   }
5479   // Handle addition.
5480   return chand->AddExternalConnectivityWatcher(pollent, state, on_complete,
5481                                                watcher_timer_init);
5482 }
5483 
grpc_client_channel_start_connectivity_watch(grpc_channel_element * elem,grpc_connectivity_state initial_state,grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface> watcher)5484 void grpc_client_channel_start_connectivity_watch(
5485     grpc_channel_element* elem, grpc_connectivity_state initial_state,
5486     grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
5487         watcher) {
5488   auto* chand = static_cast<ChannelData*>(elem->channel_data);
5489   chand->AddConnectivityWatcher(initial_state, std::move(watcher));
5490 }
5491 
grpc_client_channel_stop_connectivity_watch(grpc_channel_element * elem,grpc_core::AsyncConnectivityStateWatcherInterface * watcher)5492 void grpc_client_channel_stop_connectivity_watch(
5493     grpc_channel_element* elem,
5494     grpc_core::AsyncConnectivityStateWatcherInterface* watcher) {
5495   auto* chand = static_cast<ChannelData*>(elem->channel_data);
5496   chand->RemoveConnectivityWatcher(watcher);
5497 }
5498