//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/client_channel.h"

#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <set>

#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>

#include "absl/container/inlined_vector.h"
#include "absl/types/optional.h"

#include "src/core/ext/filters/client_channel/backend_metric.h"
#include "src/core/ext/filters/client_channel/backup_poller.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
#include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/resolver_registry.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/service_config_call_data.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/deadline/deadline_filter.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/connected_channel.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"

//
// Client channel filter
//

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)
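
// For reference: (256 << 10) == 256 * 1024 == 262144 bytes, i.e. 256 KiB
// per call.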

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
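
// For example, with a jitter of 0.2, a nominal backoff of 1000 ms would be
// drawn from roughly the interval [800 ms, 1200 ms].  (Illustrative only;
// the exact computation lives in the BackOff implementation.)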

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6
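
// Note that a single batch may combine several of these ops (e.g., a unary
// call typically sends send_initial_metadata + send_message +
// send_trailing_metadata in one batch), and each batch occupies only one
// slot, so six slots is an upper bound.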

// Channel arg containing a pointer to the ChannelData object.
#define GRPC_ARG_CLIENT_CHANNEL_DATA "grpc.internal.client_channel_data"

// Channel arg containing a pointer to the RetryThrottleData object.
#define GRPC_ARG_RETRY_THROTTLE_DATA "grpc.internal.retry_throttle_data"

namespace grpc_core {

using internal::ClientChannelGlobalParsedConfig;
using internal::ClientChannelMethodParsedConfig;
using internal::ClientChannelServiceConfigParser;
using internal::ServerRetryThrottleData;

TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
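
// These tracers can be enabled at runtime via the GRPC_TRACE environment
// variable, e.g. GRPC_TRACE=client_channel_call,client_channel_routing.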

namespace {

//
// ChannelData definition
//

class LoadBalancedCall;

class ChannelData {
 public:
  struct ResolverQueuedCall {
    grpc_call_element* elem;
    ResolverQueuedCall* next = nullptr;
  };
  struct LbQueuedCall {
    LoadBalancedCall* lb_call;
    LbQueuedCall* next = nullptr;
  };

  static grpc_error* Init(grpc_channel_element* elem,
                          grpc_channel_element_args* args);
  static void Destroy(grpc_channel_element* elem);
  static void StartTransportOp(grpc_channel_element* elem,
                               grpc_transport_op* op);
  static void GetChannelInfo(grpc_channel_element* elem,
                             const grpc_channel_info* info);

  bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
  bool enable_retries() const { return enable_retries_; }
  size_t per_rpc_retry_buffer_size() const {
    return per_rpc_retry_buffer_size_;
  }
  grpc_channel_stack* owning_stack() const { return owning_stack_; }

  // Note: Does NOT return a new ref.
  grpc_error* disconnect_error() const {
    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
  }

  Mutex* resolution_mu() const { return &resolution_mu_; }
  // These methods all require holding resolution_mu_.
  void AddResolverQueuedCall(ResolverQueuedCall* call,
                             grpc_polling_entity* pollent);
  void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
                                grpc_polling_entity* pollent);
  bool received_service_config_data() const {
    return received_service_config_data_;
  }
  grpc_error* resolver_transient_failure_error() const {
    return resolver_transient_failure_error_;
  }
  RefCountedPtr<ServiceConfig> service_config() const {
    return service_config_;
  }
  ConfigSelector* config_selector() const { return config_selector_.get(); }
  RefCountedPtr<DynamicFilters> dynamic_filters() const {
    return dynamic_filters_;
  }

  Mutex* data_plane_mu() const { return &data_plane_mu_; }
  // These methods all require holding data_plane_mu_.
  LoadBalancingPolicy::SubchannelPicker* picker() const {
    return picker_.get();
  }
  void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent);
  void RemoveLbQueuedCall(LbQueuedCall* to_remove,
                          grpc_polling_entity* pollent);
  RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
      SubchannelInterface* subchannel) const;

  WorkSerializer* work_serializer() const { return work_serializer_.get(); }

  grpc_connectivity_state CheckConnectivityState(bool try_to_connect);

  void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
                                      grpc_connectivity_state* state,
                                      grpc_closure* on_complete,
                                      grpc_closure* watcher_timer_init) {
    new ExternalConnectivityWatcher(this, pollent, state, on_complete,
                                    watcher_timer_init);
  }

  void RemoveExternalConnectivityWatcher(grpc_closure* on_complete,
                                         bool cancel) {
    ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
        this, on_complete, cancel);
  }

  int NumExternalConnectivityWatchers() const {
    MutexLock lock(&external_watchers_mu_);
    return static_cast<int>(external_watchers_.size());
  }

  void AddConnectivityWatcher(
      grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
  void RemoveConnectivityWatcher(
      AsyncConnectivityStateWatcherInterface* watcher);

 private:
  class SubchannelWrapper;
  class ClientChannelControlHelper;
  class ConnectivityWatcherAdder;
  class ConnectivityWatcherRemover;

  // Represents a pending connectivity callback from an external caller
  // via grpc_client_channel_watch_connectivity_state().
  class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface {
   public:
    ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
                                grpc_connectivity_state* state,
                                grpc_closure* on_complete,
                                grpc_closure* watcher_timer_init);

    ~ExternalConnectivityWatcher() override;

    // Removes the watcher from the external_watchers_ map.
    static void RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
                                                     grpc_closure* on_complete,
                                                     bool cancel);

    void Notify(grpc_connectivity_state state,
                const absl::Status& /* status */) override;

    void Cancel();

   private:
    // Adds the watcher to state_tracker_. Consumes the ref that is passed to it
    // from Start().
    void AddWatcherLocked();
    void RemoveWatcherLocked();

    ChannelData* chand_;
    grpc_polling_entity pollent_;
    grpc_connectivity_state initial_state_;
    grpc_connectivity_state* state_;
    grpc_closure* on_complete_;
    grpc_closure* watcher_timer_init_;
    Atomic<bool> done_{false};
  };
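
  // A sketch of how an external watch typically originates, assuming the
  // usual C-core surface API (illustrative only):
  //
  //   grpc_connectivity_state state =
  //       grpc_channel_check_connectivity_state(channel, /*try_to_connect=*/1);
  //   grpc_channel_watch_connectivity_state(channel, state, deadline, cq, tag);
  //
  // The watch is routed to AddExternalConnectivityWatcher() above, which
  // constructs an ExternalConnectivityWatcher that fires on_complete when
  // the channel's state differs from the last observed state.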

  class ResolverResultHandler : public Resolver::ResultHandler {
   public:
    explicit ResolverResultHandler(ChannelData* chand) : chand_(chand) {
      GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
    }

    ~ResolverResultHandler() override {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
      }
      GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
    }

    void ReturnResult(Resolver::Result result) override {
      chand_->OnResolverResultChangedLocked(std::move(result));
    }

    void ReturnError(grpc_error* error) override {
      chand_->OnResolverErrorLocked(error);
    }

   private:
    ChannelData* chand_;
  };

  ChannelData(grpc_channel_element_args* args, grpc_error** error);
  ~ChannelData();

  // Note: All methods with "Locked" suffix must be invoked from within
  // work_serializer_.

  void OnResolverResultChangedLocked(Resolver::Result result);
  void OnResolverErrorLocked(grpc_error* error);

  void CreateOrUpdateLbPolicyLocked(
      RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
      Resolver::Result result);
  OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
      const grpc_channel_args& args);

  void UpdateStateAndPickerLocked(
      grpc_connectivity_state state, const absl::Status& status,
      const char* reason,
      std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker);

  void UpdateServiceConfigInControlPlaneLocked(
      RefCountedPtr<ServiceConfig> service_config,
      RefCountedPtr<ConfigSelector> config_selector,
      const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
      const char* lb_policy_name);

  void UpdateServiceConfigInDataPlaneLocked();

  void CreateResolverLocked();
  void DestroyResolverAndLbPolicyLocked();

  grpc_error* DoPingLocked(grpc_transport_op* op);

  void StartTransportOpLocked(grpc_transport_op* op);

  void TryToConnectLocked();

  //
  // Fields set at construction and never modified.
  //
  const bool deadline_checking_enabled_;
  const bool enable_retries_;
  const size_t per_rpc_retry_buffer_size_;
  grpc_channel_stack* owning_stack_;
  ClientChannelFactory* client_channel_factory_;
  const grpc_channel_args* channel_args_;
  RefCountedPtr<ServiceConfig> default_service_config_;
  std::string server_name_;
  UniquePtr<char> target_uri_;
  channelz::ChannelNode* channelz_node_;

  //
  // Fields related to name resolution.  Guarded by resolution_mu_.
  //
  mutable Mutex resolution_mu_;
  // Linked list of calls queued waiting for resolver result.
  ResolverQueuedCall* resolver_queued_calls_ = nullptr;
  // Data from service config.
  grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
  bool received_service_config_data_ = false;
  RefCountedPtr<ServiceConfig> service_config_;
  RefCountedPtr<ConfigSelector> config_selector_;
  RefCountedPtr<DynamicFilters> dynamic_filters_;

  //
  // Fields used in the data plane.  Guarded by data_plane_mu_.
  //
  mutable Mutex data_plane_mu_;
  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_;
  // Linked list of calls queued waiting for LB pick.
  LbQueuedCall* lb_queued_calls_ = nullptr;

  //
  // Fields used in the control plane.  Guarded by work_serializer.
  //
  std::shared_ptr<WorkSerializer> work_serializer_;
  grpc_pollset_set* interested_parties_;
  ConnectivityStateTracker state_tracker_;
  OrphanablePtr<Resolver> resolver_;
  bool previous_resolution_contained_addresses_ = false;
  RefCountedPtr<ServiceConfig> saved_service_config_;
  RefCountedPtr<ConfigSelector> saved_config_selector_;
  absl::optional<std::string> health_check_service_name_;
  OrphanablePtr<LoadBalancingPolicy> lb_policy_;
  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
  // The number of SubchannelWrapper instances referencing a given Subchannel.
  std::map<Subchannel*, int> subchannel_refcount_map_;
  // The set of SubchannelWrappers that currently exist.
  // No need to hold a ref, since the map is updated in the control-plane
  // work_serializer when the SubchannelWrappers are created and destroyed.
  std::set<SubchannelWrapper*> subchannel_wrappers_;
  // Pending ConnectedSubchannel updates for each SubchannelWrapper.
  // Updates are queued here in the control plane work_serializer and then
  // applied in the data plane mutex when the picker is updated.
  std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
      pending_subchannel_updates_;
  int keepalive_time_ = -1;

  //
  // Fields accessed from both data plane mutex and control plane
  // work_serializer.
  //
  Atomic<grpc_error*> disconnect_error_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via get_channel_info().
  //
  gpr_mu info_mu_;
  UniquePtr<char> info_lb_policy_name_;
  UniquePtr<char> info_service_config_json_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via grpc_channel_num_external_connectivity_watchers().
  //
  mutable Mutex external_watchers_mu_;
  std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
      external_watchers_;
};
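
// Locking summary, per the field comments above: resolution_mu_ guards the
// resolver-queued calls and the service config data; data_plane_mu_ guards
// the picker and the LB-queued calls; the remaining control-plane state is
// serialized via work_serializer_; and disconnect_error_ is an Atomic<>
// because it is read from both the data plane and the control plane.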

//
// CallData definition
//

class CallData {
 public:
  static grpc_error* Init(grpc_call_element* elem,
                          const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

  // Invoked by channel for queued calls when name resolution is completed.
  static void CheckResolution(void* arg, grpc_error* error);
  // Helper function for applying the service config to a call while
  // holding ChannelData::resolution_mu_.
  // Returns true if the service config has been applied to the call, in which
  // case the caller must invoke ResolutionDone() or AsyncResolutionDone()
  // with the returned error.
  bool CheckResolutionLocked(grpc_call_element* elem, grpc_error** error);
  // Schedules a callback to continue processing the call once
  // resolution is complete.  The callback will not run until after this
  // method returns.
  void AsyncResolutionDone(grpc_call_element* elem, grpc_error* error);

 private:
  class ResolverQueuedCallCanceller;

  CallData(grpc_call_element* elem, const ChannelData& chand,
           const grpc_call_element_args& args);
  ~CallData();

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_call_element* elem,
                         grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_call_element* elem, grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on lb_call_.
  void PendingBatchesResume(grpc_call_element* elem);

  // Applies service config to the call.  Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error* ApplyServiceConfigToCallLocked(
      grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
  // Invoked when the resolver result is applied to the caller, on both
  // success or failure.
  static void ResolutionDone(void* arg, grpc_error* error);
  // Removes the call (if present) from the channel's list of calls queued
  // for name resolution.
  void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem);
  // Adds the call (if not already present) to the channel's list of
  // calls queued for name resolution.
  void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem);

  static void RecvInitialMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error* error);
  void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
      grpc_transport_stream_op_batch* batch);

  void CreateDynamicCall(grpc_call_element* elem);

  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner.  If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  grpc_polling_entity* pollent_ = nullptr;

  grpc_closure pick_closure_;

  // Accessed while holding ChannelData::resolution_mu_.
  bool service_config_applied_ = false;
  bool queued_pending_resolver_result_ = false;
  ChannelData::ResolverQueuedCall resolver_queued_call_;
  ResolverQueuedCallCanceller* resolver_call_canceller_ = nullptr;

  std::function<void()> on_call_committed_;

  grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
  grpc_closure recv_initial_metadata_ready_;

  RefCountedPtr<DynamicFilters> dynamic_filters_;
  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;
};
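
// A plausible sketch of the slot assignment done by the GetBatchIndex()
// methods in this file (illustrative; the real mapping lives in the method
// definitions further below):
//
//   if (batch->send_initial_metadata) return 0;
//   if (batch->send_message) return 1;
//   if (batch->send_trailing_metadata) return 2;
//   if (batch->recv_initial_metadata) return 3;
//   if (batch->recv_message) return 4;
//   if (batch->recv_trailing_metadata) return 5;
//
// Each batch is filed under one op it contains, which is what bounds
// pending_batches_ at MAX_PENDING_BATCHES entries.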

//
// RetryingCall definition
//

class RetryingCall {
 public:
  RetryingCall(
      ChannelData* chand, const grpc_call_element_args& args,
      grpc_polling_entity* pollent,
      RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
      const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy);
  ~RetryingCall();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  RefCountedPtr<SubchannelCall> subchannel_call() const;

 private:
  // State used for starting a retryable batch on a subchannel call.
  // This provides its own grpc_transport_stream_op_batch and other data
  // structures needed to populate the ops in the batch.
  // We allocate one struct on the arena for each attempt at starting a
  // batch on a given subchannel call.
  struct SubchannelCallBatchData {
    // Creates a SubchannelCallBatchData object on the call's arena with the
    // specified refcount.  If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    static SubchannelCallBatchData* Create(RetryingCall* call, int refcount,
                                           bool set_on_complete);

    void Unref() {
      if (gpr_unref(&refs)) Destroy();
    }

    SubchannelCallBatchData(RetryingCall* call, int refcount,
                            bool set_on_complete);
    // All dtor code must be added in `Destroy()`. This is because we may
    // call closures in `SubchannelCallBatchData` after they are unrefed by
    // `Unref()`, and msan would complain about accessing this class
    // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
    // TODO(soheil): We should try to call the dtor in `Unref()`.
    ~SubchannelCallBatchData() { Destroy(); }
    void Destroy();

    gpr_refcount refs;
    grpc_call_element* elem;
    RetryingCall* call;
    RefCountedPtr<LoadBalancedCall> lb_call;
    // The batch to use in the subchannel call.
    // Its payload field points to SubchannelCallRetryState::batch_payload.
    grpc_transport_stream_op_batch batch;
    // For intercepting on_complete.
    grpc_closure on_complete;
  };

  // Retry state associated with a subchannel call.
  // Stored in the parent_data of the subchannel call object.
  struct SubchannelCallRetryState {
    explicit SubchannelCallRetryState(grpc_call_context_element* context)
        : batch_payload(context),
          started_send_initial_metadata(false),
          completed_send_initial_metadata(false),
          started_send_trailing_metadata(false),
          completed_send_trailing_metadata(false),
          started_recv_initial_metadata(false),
          completed_recv_initial_metadata(false),
          started_recv_trailing_metadata(false),
          completed_recv_trailing_metadata(false),
          retry_dispatched(false) {}

    // SubchannelCallBatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload;
    // For send_initial_metadata.
    // Note that we need to make a copy of the initial metadata for each
    // subchannel call instead of just referring to the copy in call_data,
    // because filters in the subchannel stack will probably add entries,
    // so we need to start in a pristine state for each attempt of the call.
    grpc_linked_mdelem* send_initial_metadata_storage;
    grpc_metadata_batch send_initial_metadata;
    // For send_message.
    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
    ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
    // For send_trailing_metadata.
    grpc_linked_mdelem* send_trailing_metadata_storage;
    grpc_metadata_batch send_trailing_metadata;
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata;
    grpc_closure recv_initial_metadata_ready;
    bool trailing_metadata_available = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready;
    OrphanablePtr<ByteStream> recv_message;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata;
    grpc_transport_stream_stats collect_stats;
    grpc_closure recv_trailing_metadata_ready;
    // These fields indicate which ops have been started and completed on
    // this subchannel call.
    size_t started_send_message_count = 0;
    size_t completed_send_message_count = 0;
    size_t started_recv_message_count = 0;
    size_t completed_recv_message_count = 0;
    bool started_send_initial_metadata : 1;
    bool completed_send_initial_metadata : 1;
    bool started_send_trailing_metadata : 1;
    bool completed_send_trailing_metadata : 1;
    bool started_recv_initial_metadata : 1;
    bool completed_recv_initial_metadata : 1;
    bool started_recv_trailing_metadata : 1;
    bool completed_recv_trailing_metadata : 1;
    // State for callback processing.
    SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
        nullptr;
    grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
    grpc_error* recv_message_error = GRPC_ERROR_NONE;
    SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
    // NOTE: Do not move this next to the metadata bitfields above. That would
    //       save space but will also result in a data race because compiler
    //       will generate a 2 byte store which overwrites the meta-data
    //       fields upon setting this field.
    bool retry_dispatched : 1;
  };

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch.  If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch = nullptr;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached = false;
  };

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata();
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(size_t idx);
  void FreeCachedSendTrailingMetadata();
  // Frees cached send ops that have already been completed after
  // committing the call.
  void FreeCachedSendOpDataAfterCommit(SubchannelCallRetryState* retry_state);
  // Frees cached send ops that were completed by the completed batch in
  // batch_data.  Used when batches are completed after the call is committed.
  void FreeCachedSendOpDataForCompletedBatch(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on lb_call_.
  void PendingBatchesResume();
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(SubchannelCallRetryState* retry_state);
  // Starts a retry after appropriate back-off.
  void DoRetry(SubchannelCallRetryState* retry_state,
               grpc_millis server_pushback_ms);
  // Returns true if the call is being retried.
  bool MaybeRetry(SubchannelCallBatchData* batch_data, grpc_status_code status,
                  grpc_mdelem* server_pushback_md);

  // Invokes recv_initial_metadata_ready for a subchannel batch.
  static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
  // Intercepts recv_initial_metadata_ready callback for retries.
  // Commits the call and returns the initial metadata up the stack.
  static void RecvInitialMetadataReady(void* arg, grpc_error* error);

  // Invokes recv_message_ready for a subchannel batch.
  static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
  // Intercepts recv_message_ready callback for retries.
  // Commits the call and returns the message up the stack.
  static void RecvMessageReady(void* arg, grpc_error* error);

  // Sets *status and *server_pushback_md based on md_batch and error.
  // Only sets *server_pushback_md if server_pushback_md != nullptr.
  void GetCallStatus(grpc_metadata_batch* md_batch, grpc_error* error,
                     grpc_status_code* status,
                     grpc_mdelem** server_pushback_md);
  // Adds recv_trailing_metadata_ready closure to closures.
  void AddClosureForRecvTrailingMetadataReady(
      SubchannelCallBatchData* batch_data, grpc_error* error,
      CallCombinerClosureList* closures);
  // Adds any necessary closures for deferred recv_initial_metadata and
  // recv_message callbacks to closures.
  static void AddClosuresForDeferredRecvCallbacks(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
  // Returns true if any op in the batch was not yet started.
  // Only looks at send ops, since recv ops are always started immediately.
  bool PendingBatchIsUnstarted(PendingBatch* pending,
                               SubchannelCallRetryState* retry_state);
  // For any pending batch containing an op that has not yet been started,
  // adds the pending batch's completion closures to closures.
  void AddClosuresToFailUnstartedPendingBatches(
      SubchannelCallRetryState* retry_state, grpc_error* error,
      CallCombinerClosureList* closures);
  // Runs necessary closures upon completion of a call attempt.
  void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
                                   grpc_error* error);
  // Intercepts recv_trailing_metadata_ready callback for retries.
  // Commits the call and returns the trailing metadata up the stack.
  static void RecvTrailingMetadataReady(void* arg, grpc_error* error);

  // Adds the on_complete closure for the pending batch completed in
  // batch_data to closures.
  void AddClosuresForCompletedPendingBatch(SubchannelCallBatchData* batch_data,
                                           grpc_error* error,
                                           CallCombinerClosureList* closures);

  // If there are any cached ops to replay or pending ops to start on the
  // subchannel call, adds a closure to closures to invoke
  // StartRetriableSubchannelBatches().
  void AddClosuresForReplayOrPendingSendOps(
      SubchannelCallBatchData* batch_data,
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);

  // Callback used to intercept on_complete from subchannel calls.
  // Called only when retries are enabled.
  static void OnComplete(void* arg, grpc_error* error);

  static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Adds a closure to closures that will execute batch in the call combiner.
  void AddClosureForSubchannelBatch(grpc_transport_stream_op_batch* batch,
                                    CallCombinerClosureList* closures);
  // Adds retriable send_initial_metadata op to batch_data.
  void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable send_message op to batch_data.
  void AddRetriableSendMessageOp(SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable send_trailing_metadata op to batch_data.
  void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Adds retriable recv_initial_metadata op to batch_data.
  void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
                                         SubchannelCallBatchData* batch_data);
  // Adds retriable recv_message op to batch_data.
  void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
                                 SubchannelCallBatchData* batch_data);
  // Adds retriable recv_trailing_metadata op to batch_data.
  void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
                                          SubchannelCallBatchData* batch_data);
  // Helper function used to start a recv_trailing_metadata batch.  This
  // is used in the case where a recv_initial_metadata or recv_message
  // op fails in a way that we know the call is over but when the application
  // has not yet started its own recv_trailing_metadata op.
  void StartInternalRecvTrailingMetadata();
  // If there are any cached send ops that need to be replayed on the
  // current subchannel call, creates and returns a new subchannel batch
  // to replay those ops.  Otherwise, returns nullptr.
  SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
      SubchannelCallRetryState* retry_state);
  // Adds subchannel batches for pending batches to closures.
  void AddSubchannelBatchesForPendingBatches(
      SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
  // Constructs and starts whatever subchannel batches are needed on the
  // subchannel call.
  static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);

  static void CreateLbCall(void* arg, grpc_error* error);

  ChannelData* chand_;
  grpc_polling_entity* pollent_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy_ = nullptr;
  BackOff retry_backoff_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  grpc_closure retry_closure_;

  RefCountedPtr<LoadBalancedCall> lb_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  // TODO(roth): Now that the retry code is split out into its own call
  // object, revamp this to work in a cleaner way, since we no longer need
  // for batches to ever wait for name resolution or LB picks.
  PendingBatch pending_batches_[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;

  // Retry state.
  bool enable_retries_ : 1;
  bool retry_committed_ : 1;
  bool last_attempt_got_server_pushback_ : 1;
  int num_attempts_completed_ = 0;
  size_t bytes_buffered_for_retry_ = 0;
  grpc_timer retry_timer_;

  // The number of pending retriable subchannel batches containing send ops.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_pending_retriable_subchannel_send_batches_ = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
  grpc_metadata_batch send_initial_metadata_;
  uint32_t send_initial_metadata_flags_;
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  absl::InlinedVector<ByteStreamCache*, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
  grpc_metadata_batch send_trailing_metadata_;
};
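
// A sketch of the backoff used between retry attempts, assuming the retry
// policy fields (initial_backoff, backoff_multiplier, max_backoff) parsed
// from the service config (illustrative only; DoRetry() above and the
// BackOff helper define the real behavior):
//
//   next_delay_ms = min(initial_backoff_ms * pow(backoff_multiplier, n - 1),
//                       max_backoff_ms);       // n = attempts completed
//   next_delay_ms *= (1 +/- RETRY_BACKOFF_JITTER * rand);  // jitter
//
// A server-supplied "grpc-retry-pushback-ms" value overrides this delay.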

//
// LoadBalancedCall definition
//

// This object is ref-counted, but it cannot inherit from RefCounted<>,
// because it is allocated on the arena and can't free its memory when
// its refcount goes to zero.  So instead, it manually implements the
// same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
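// A minimal sketch of that pattern, assuming grpc_core::RefCount semantics
// (illustrative only, not the actual member definitions):
//
//   void IncrementRefCount() { refs_.Ref(); }
//   void Unref() {
//     // Runs the dtor in place; the arena reclaims the storage later.
//     if (refs_.Unref()) this->~LoadBalancedCall();
//   }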
class LoadBalancedCall {
 public:
  static RefCountedPtr<LoadBalancedCall> Create(
      ChannelData* chand, const grpc_call_element_args& args,
      grpc_polling_entity* pollent, size_t parent_data_size);

  LoadBalancedCall(ChannelData* chand, const grpc_call_element_args& args,
                   grpc_polling_entity* pollent);
  ~LoadBalancedCall();

  // Interface of RefCounted<>.
  RefCountedPtr<LoadBalancedCall> Ref() GRPC_MUST_USE_RESULT;
  RefCountedPtr<LoadBalancedCall> Ref(const DebugLocation& location,
                                      const char* reason) GRPC_MUST_USE_RESULT;
  // When refcount drops to 0, destroys itself and the associated call stack,
  // but does NOT free the memory because it's in the call arena.
  void Unref();
  void Unref(const DebugLocation& location, const char* reason);

  void* GetParentData();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  // Invoked by channel for queued LB picks when the picker is updated.
  static void PickSubchannel(void* arg, grpc_error* error);
  // Helper function for performing an LB pick while holding the data plane
  // mutex.  Returns true if the pick is complete, in which case the caller
  // must invoke PickDone() or AsyncPickDone() with the returned error.
  bool PickSubchannelLocked(grpc_error** error);
  // Schedules a callback to process the completed pick.  The callback
  // will not run until after this method returns.
  void AsyncPickDone(grpc_error* error);

  RefCountedPtr<SubchannelCall> subchannel_call() const {
    return subchannel_call_;
  }

 private:
  // Allow RefCountedPtr<> to access IncrementRefCount().
  template <typename T>
  friend class ::grpc_core::RefCountedPtr;

  class LbQueuedCallCanceller;
  class Metadata;
  class LbCallState;

  // Interface of RefCounted<>.
  void IncrementRefCount();
  void IncrementRefCount(const DebugLocation& location, const char* reason);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error* error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
  // Resumes all pending batches on subchannel_call_.
  void PendingBatchesResume();

  static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
      void* arg, grpc_error* error);
  void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      grpc_transport_stream_op_batch* batch);

  void CreateSubchannelCall();
  // Invoked when a pick is completed, on both success or failure.
  static void PickDone(void* arg, grpc_error* error);
  // Removes the call from the channel's list of queued picks if present.
  void MaybeRemoveCallFromLbQueuedCallsLocked();
  // Adds the call to the channel's list of queued picks if not already present.
  void MaybeAddCallToLbQueuedCallsLocked();

  RefCount refs_;

  ChannelData* chand_;

  // TODO(roth): Instead of duplicating these fields in every filter
  // that uses any one of them, we should store them in the call
  // context.  This will save per-call memory overhead.
  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  // Set when we get a cancel_stream op.
  grpc_error* cancel_error_ = GRPC_ERROR_NONE;

  grpc_polling_entity* pollent_ = nullptr;

  grpc_closure pick_closure_;

  // Accessed while holding ChannelData::data_plane_mu_.
  ChannelData::LbQueuedCall queued_call_;
  bool queued_pending_lb_pick_ = false;
  const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  std::function<void(grpc_error*, LoadBalancingPolicy::MetadataInterface*,
                     LoadBalancingPolicy::CallState*)>
      lb_recv_trailing_metadata_ready_;
  LbQueuedCallCanceller* lb_call_canceller_ = nullptr;

  RefCountedPtr<SubchannelCall> subchannel_call_;

  // For intercepting recv_trailing_metadata_ready for the LB policy.
  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;
  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
};

//
// dynamic termination filter
//

// Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL_DATA.
void* ChannelDataArgCopy(void* p) { return p; }
void ChannelDataArgDestroy(void* p) {}
int ChannelDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
const grpc_arg_pointer_vtable kChannelDataArgPointerVtable = {
    ChannelDataArgCopy, ChannelDataArgDestroy, ChannelDataArgCmp};

// Channel arg pointer vtable for GRPC_ARG_RETRY_THROTTLE_DATA.
void* RetryThrottleDataArgCopy(void* p) {
  auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
  retry_throttle_data->Ref().release();
  return p;
}
void RetryThrottleDataArgDestroy(void* p) {
  auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
  retry_throttle_data->Unref();
}
int RetryThrottleDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
const grpc_arg_pointer_vtable kRetryThrottleDataArgPointerVtable = {
    RetryThrottleDataArgCopy, RetryThrottleDataArgDestroy,
    RetryThrottleDataArgCmp};
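
// A sketch of how such an arg can be attached to a channel, assuming the
// grpc_channel_arg_pointer_create() and grpc_channel_args_copy_and_add()
// helpers from channel_args.h (illustrative only):
//
//   grpc_arg arg = grpc_channel_arg_pointer_create(
//       const_cast<char*>(GRPC_ARG_RETRY_THROTTLE_DATA),
//       retry_throttle_data.get(), &kRetryThrottleDataArgPointerVtable);
//   grpc_channel_args* new_args =
//       grpc_channel_args_copy_and_add(old_args, &arg, 1);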

class DynamicTerminationFilterChannelData {
 public:
  static grpc_error* Init(grpc_channel_element* elem,
                          grpc_channel_element_args* args);

  static void Destroy(grpc_channel_element* elem) {
    auto* chand =
        static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
    chand->~DynamicTerminationFilterChannelData();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* elem,
                               grpc_transport_op* op) {}
  static void GetChannelInfo(grpc_channel_element* elem,
                             const grpc_channel_info* info) {}

  ChannelData* chand() const { return chand_; }
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
    return retry_throttle_data_;
  }

 private:
  static RefCountedPtr<ServerRetryThrottleData> GetRetryThrottleDataFromArgs(
      const grpc_channel_args* args) {
    auto* retry_throttle_data =
        grpc_channel_args_find_pointer<ServerRetryThrottleData>(
            args, GRPC_ARG_RETRY_THROTTLE_DATA);
    if (retry_throttle_data == nullptr) return nullptr;
    return retry_throttle_data->Ref();
  }

  explicit DynamicTerminationFilterChannelData(const grpc_channel_args* args)
      : chand_(grpc_channel_args_find_pointer<ChannelData>(
            args, GRPC_ARG_CLIENT_CHANNEL_DATA)),
        retry_throttle_data_(GetRetryThrottleDataFromArgs(args)) {}

  ChannelData* chand_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
};

class DynamicTerminationFilterCallData {
 public:
  static grpc_error* Init(grpc_call_element* elem,
                          const grpc_call_element_args* args) {
    new (elem->call_data) DynamicTerminationFilterCallData(*args);
    return GRPC_ERROR_NONE;
  }

  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure) {
    auto* calld =
        static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
    auto* chand =
        static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
    RefCountedPtr<SubchannelCall> subchannel_call;
    if (chand->chand()->enable_retries()) {
      if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
        subchannel_call = calld->retrying_call_->subchannel_call();
        calld->retrying_call_->~RetryingCall();
      }
    } else {
      if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
        subchannel_call = calld->lb_call_->subchannel_call();
      }
    }
    calld->~DynamicTerminationFilterCallData();
    if (GPR_LIKELY(subchannel_call != nullptr)) {
      subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
    } else {
      // TODO(yashkt) : This can potentially be a Closure::Run
      ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
    }
  }

  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
    auto* calld =
        static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
    auto* chand =
        static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
    if (chand->chand()->enable_retries()) {
      calld->retrying_call_->StartTransportStreamOpBatch(batch);
    } else {
      calld->lb_call_->StartTransportStreamOpBatch(batch);
    }
  }

  static void SetPollent(grpc_call_element* elem,
                         grpc_polling_entity* pollent) {
    auto* calld =
        static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
    auto* chand =
        static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
    ChannelData* client_channel = chand->chand();
    grpc_call_element_args args = {
        calld->owning_call_,     nullptr,
        calld->call_context_,    calld->path_,
        calld->call_start_time_, calld->deadline_,
        calld->arena_,           calld->call_combiner_};
    if (client_channel->enable_retries()) {
      // Get retry settings from service config.
      auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
          calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
      GPR_ASSERT(svc_cfg_call_data != nullptr);
      auto* method_config = static_cast<const ClientChannelMethodParsedConfig*>(
          svc_cfg_call_data->GetMethodParsedConfig(
              ClientChannelServiceConfigParser::ParserIndex()));
      // Create retrying call.
      calld->retrying_call_ = calld->arena_->New<RetryingCall>(
          client_channel, args, pollent, chand->retry_throttle_data(),
          method_config == nullptr ? nullptr : method_config->retry_policy());
1183       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1184         gpr_log(
1185             GPR_INFO,
1186             "chand=%p dymamic_termination_calld=%p: create retrying_call=%p",
1187             client_channel, calld, calld->retrying_call_);
1188       }
1189     } else {
1190       calld->lb_call_ =
1191           LoadBalancedCall::Create(client_channel, args, pollent, 0);
1192       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1193         gpr_log(GPR_INFO,
1194                 "chand=%p dynamic_termination_calld=%p: create lb_call=%p",
1195                 chand, client_channel, calld->lb_call_.get());
1196       }
1197     }
1198   }
1199 
 private:
  explicit DynamicTerminationFilterCallData(const grpc_call_element_args& args)
      : path_(grpc_slice_ref_internal(args.path)),
        call_start_time_(args.start_time),
        deadline_(args.deadline),
        arena_(args.arena),
        owning_call_(args.call_stack),
        call_combiner_(args.call_combiner),
        call_context_(args.context) {}

  ~DynamicTerminationFilterCallData() { grpc_slice_unref_internal(path_); }

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  RetryingCall* retrying_call_ = nullptr;
  RefCountedPtr<LoadBalancedCall> lb_call_;
};

const grpc_channel_filter kDynamicTerminationFilterVtable = {
    DynamicTerminationFilterCallData::StartTransportStreamOpBatch,
    DynamicTerminationFilterChannelData::StartTransportOp,
    sizeof(DynamicTerminationFilterCallData),
    DynamicTerminationFilterCallData::Init,
    DynamicTerminationFilterCallData::SetPollent,
    DynamicTerminationFilterCallData::Destroy,
    sizeof(DynamicTerminationFilterChannelData),
    DynamicTerminationFilterChannelData::Init,
    DynamicTerminationFilterChannelData::Destroy,
    DynamicTerminationFilterChannelData::GetChannelInfo,
    "dynamic_filter_termination",
};

grpc_error* DynamicTerminationFilterChannelData::Init(
    grpc_channel_element* elem, grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &kDynamicTerminationFilterVtable);
  new (elem->channel_data)
      DynamicTerminationFilterChannelData(args->channel_args);
  return GRPC_ERROR_NONE;
}

//
// ChannelData::SubchannelWrapper
//

// This class is a wrapper for Subchannel that hides details of the
// channel's implementation (such as the health check service name and
// connected subchannel) from the LB policy API.
//
// Note that no synchronization is needed here, because even if the
// underlying subchannel is shared between channels, this wrapper will only
// be used within one channel, so it will always be synchronized by the
// control plane work_serializer.
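//
// For orientation, this is roughly how an LB policy sees the wrapper
// through the SubchannelInterface API (illustrative sketch only; the
// names `helper` and `my_watcher` are hypothetical, not part of this
// file):
//
//   RefCountedPtr<SubchannelInterface> sc =
//       helper->CreateSubchannel(address, args);  // returns a wrapper
//   sc->WatchConnectivityState(GRPC_CHANNEL_IDLE,
//                              std::move(my_watcher));  // wrapped below
//   sc->AttemptToConnect();
//
// The wrapper forwards each call to the underlying Subchannel while
// keeping channelz bookkeeping and connected-subchannel tracking local
// to this channel.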
class ChannelData::SubchannelWrapper : public SubchannelInterface {
 public:
  SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
                    absl::optional<std::string> health_check_service_name)
      : SubchannelInterface(
            GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
                ? "SubchannelWrapper"
                : nullptr),
        chand_(chand),
        subchannel_(subchannel),
        health_check_service_name_(std::move(health_check_service_name)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: creating subchannel wrapper %p for subchannel %p",
              chand, this, subchannel_);
    }
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      if (it == chand_->subchannel_refcount_map_.end()) {
        chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
        it = chand_->subchannel_refcount_map_.emplace(subchannel_, 0).first;
      }
      ++it->second;
    }
    chand_->subchannel_wrappers_.insert(this);
  }

  ~SubchannelWrapper() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: destroying subchannel wrapper %p for subchannel %p",
              chand_, this, subchannel_);
    }
    chand_->subchannel_wrappers_.erase(this);
    auto* subchannel_node = subchannel_->channelz_node();
    if (subchannel_node != nullptr) {
      auto it = chand_->subchannel_refcount_map_.find(subchannel_);
      GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
      --it->second;
      if (it->second == 0) {
        chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid());
        chand_->subchannel_refcount_map_.erase(it);
      }
    }
    GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
  }

  grpc_connectivity_state CheckConnectivityState() override {
    RefCountedPtr<ConnectedSubchannel> connected_subchannel;
    grpc_connectivity_state connectivity_state =
        subchannel_->CheckConnectivityState(health_check_service_name_,
                                            &connected_subchannel);
    MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
    return connectivity_state;
  }

  void WatchConnectivityState(
      grpc_connectivity_state initial_state,
      std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override {
    auto& watcher_wrapper = watcher_map_[watcher.get()];
    GPR_ASSERT(watcher_wrapper == nullptr);
    watcher_wrapper = new WatcherWrapper(std::move(watcher),
                                         Ref(DEBUG_LOCATION, "WatcherWrapper"),
                                         initial_state);
    subchannel_->WatchConnectivityState(
        initial_state, health_check_service_name_,
        RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
            watcher_wrapper));
  }

  void CancelConnectivityStateWatch(
      ConnectivityStateWatcherInterface* watcher) override {
    auto it = watcher_map_.find(watcher);
    GPR_ASSERT(it != watcher_map_.end());
    subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
                                              it->second);
    watcher_map_.erase(it);
  }

  void AttemptToConnect() override { subchannel_->AttemptToConnect(); }

  void ResetBackoff() override { subchannel_->ResetBackoff(); }

  const grpc_channel_args* channel_args() override {
    return subchannel_->channel_args();
  }

  void ThrottleKeepaliveTime(int new_keepalive_time) {
    subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
  }

  void UpdateHealthCheckServiceName(
      absl::optional<std::string> health_check_service_name) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: subchannel wrapper %p: updating health check service "
              "name from \"%s\" to \"%s\"",
              chand_, this, health_check_service_name_->c_str(),
              health_check_service_name->c_str());
    }
    for (auto& p : watcher_map_) {
      WatcherWrapper*& watcher_wrapper = p.second;
      // Cancel the current watcher and create a new one using the new
      // health check service name.
      // TODO(roth): If there is not already an existing health watch
      // call for the new name, then the watcher will initially report
      // state CONNECTING.  If the LB policy is currently reporting
      // state READY, this may cause it to switch to CONNECTING before
      // switching back to READY.  This could cause a small delay for
      // RPCs being started on the channel.  If/when this becomes a
      // problem, we may be able to handle it by waiting for the new
      // watcher to report READY before we use it to replace the old one.
      WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
      subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
                                                watcher_wrapper);
      watcher_wrapper = replacement;
      subchannel_->WatchConnectivityState(
          replacement->last_seen_state(), health_check_service_name,
          RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
              replacement));
    }
    // Save the new health check service name.
    health_check_service_name_ = std::move(health_check_service_name);
  }

  // Caller must be holding the control-plane work_serializer.
  ConnectedSubchannel* connected_subchannel() const {
    return connected_subchannel_.get();
  }

  // Caller must be holding the data-plane mutex.
  ConnectedSubchannel* connected_subchannel_in_data_plane() const {
    return connected_subchannel_in_data_plane_.get();
  }
  void set_connected_subchannel_in_data_plane(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
    connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
  }

 private:
  // Subchannel and SubchannelInterface have different interfaces for
  // their respective ConnectivityStateWatcherInterface classes.
  // The one in Subchannel updates the ConnectedSubchannel along with
  // the state, whereas the one in SubchannelInterface does not expose
  // the ConnectedSubchannel.
  //
  // This wrapper provides a bridge between the two.  It implements
  // Subchannel::ConnectivityStateWatcherInterface and wraps
  // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
  // that was passed in by the LB policy.  We pass an instance of this
  // class to the underlying Subchannel, and when we get updates from
  // the subchannel, we pass those on to the wrapped watcher to return
  // the update to the LB policy.  This allows us to set the connected
  // subchannel before passing the result back to the LB policy.
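  //
  // In adapter-pattern terms, the update path reduces to the following
  // (illustrative sketch only, using a hypothetical OnUpdate() callback
  // rather than the actual methods defined below):
  //
  //   void OnUpdate(grpc_connectivity_state state,
  //                 RefCountedPtr<ConnectedSubchannel> connected) {
  //     parent_->MaybeUpdateConnectedSubchannel(std::move(connected));
  //     watcher_->OnConnectivityStateChange(state);  // LB policy's watcher
  //   }
  //
  // i.e., the connected subchannel is recorded first, and only then is
  // the state change surfaced through the narrower SubchannelInterface
  // API.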
  class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
   public:
    WatcherWrapper(
        std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
            watcher,
        RefCountedPtr<SubchannelWrapper> parent,
        grpc_connectivity_state initial_state)
        : watcher_(std::move(watcher)),
          parent_(std::move(parent)),
          last_seen_state_(initial_state) {}

    ~WatcherWrapper() override {
      auto* parent = parent_.release();  // ref owned by lambda
      parent->chand_->work_serializer_->Run(
          [parent]() { parent->Unref(DEBUG_LOCATION, "WatcherWrapper"); },
          DEBUG_LOCATION);
    }

    void OnConnectivityStateChange() override {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: connectivity change for subchannel wrapper %p "
                "subchannel %p; hopping into work_serializer",
                parent_->chand_, parent_.get(), parent_->subchannel_);
      }
      Ref().release();  // ref owned by lambda
      parent_->chand_->work_serializer_->Run(
          [this]() {
            ApplyUpdateInControlPlaneWorkSerializer();
            Unref();
          },
          DEBUG_LOCATION);
    }

    grpc_pollset_set* interested_parties() override {
      SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
          watcher_.get();
      if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
      return watcher->interested_parties();
    }

    WatcherWrapper* MakeReplacement() {
      auto* replacement =
          new WatcherWrapper(std::move(watcher_), parent_, last_seen_state_);
      replacement_ = replacement;
      return replacement;
    }

    grpc_connectivity_state last_seen_state() const { return last_seen_state_; }

   private:
    void ApplyUpdateInControlPlaneWorkSerializer() {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: processing connectivity change in work serializer "
                "for subchannel wrapper %p subchannel %p "
                "watcher=%p",
                parent_->chand_, parent_.get(), parent_->subchannel_,
                watcher_.get());
      }
      ConnectivityStateChange state_change = PopConnectivityStateChange();
      absl::optional<absl::Cord> keepalive_throttling =
          state_change.status.GetPayload(kKeepaliveThrottlingKey);
      if (keepalive_throttling.has_value()) {
        int new_keepalive_time = -1;
        if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
                             &new_keepalive_time)) {
          if (new_keepalive_time > parent_->chand_->keepalive_time_) {
            parent_->chand_->keepalive_time_ = new_keepalive_time;
            if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
              gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
                      parent_->chand_, parent_->chand_->keepalive_time_);
            }
            // Propagate the new keepalive time to all subchannels, so that
            // new transports created by any subchannel (and not just the
            // subchannel that received the GOAWAY) use the new keepalive
            // time.
            for (auto* subchannel_wrapper :
                 parent_->chand_->subchannel_wrappers_) {
              subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
            }
          }
        } else {
          gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
                  parent_->chand_,
                  std::string(keepalive_throttling.value()).c_str());
        }
      }
      // Ignore the update if this WatcherWrapper has been replaced since
      // the callback was scheduled (in which case watcher_ has been moved
      // to the replacement).
      if (watcher_ != nullptr) {
        last_seen_state_ = state_change.state;
        parent_->MaybeUpdateConnectedSubchannel(
            std::move(state_change.connected_subchannel));
        watcher_->OnConnectivityStateChange(state_change.state);
      }
    }

    std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
        watcher_;
    RefCountedPtr<SubchannelWrapper> parent_;
    grpc_connectivity_state last_seen_state_;
    WatcherWrapper* replacement_ = nullptr;
  };

  void MaybeUpdateConnectedSubchannel(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
    // Update the connected subchannel only if the channel is not shutting
    // down.  This is because once the channel is shutting down, we
    // ignore picker updates from the LB policy, which means that
    // UpdateStateAndPickerLocked() will never process the entries
    // in chand_->pending_subchannel_updates_.  So we don't want to add
    // entries there that will never be processed, since that would
    // leave dangling refs to the channel and prevent its destruction.
    grpc_error* disconnect_error = chand_->disconnect_error();
    if (disconnect_error != GRPC_ERROR_NONE) return;
    // Not shutting down, so do the update.
    if (connected_subchannel_ != connected_subchannel) {
      connected_subchannel_ = std::move(connected_subchannel);
      // Record the new connected subchannel so that it can be propagated
      // to the data plane (under the data-plane mutex) the next time the
      // picker is updated.
      chand_->pending_subchannel_updates_[Ref(
          DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
    }
  }

  ChannelData* chand_;
  Subchannel* subchannel_;
  absl::optional<std::string> health_check_service_name_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WatcherWrapper that we passed to the underlying
  // subchannel.  This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WatcherWrapper to cancel on the underlying subchannel.
  std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
  // To be accessed only in the control plane work_serializer.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  // To be accessed only in the data plane mutex.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
};

//
// ChannelData::ExternalConnectivityWatcher
//

ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ChannelData* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object to AddWatcherLocked().
  chand_->work_serializer_->Run(
      [this]() {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}

ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}

void ChannelData::ExternalConnectivityWatcher::
    RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
                                         grpc_closure* on_complete,
                                         bool cancel) {
  RefCountedPtr<ExternalConnectivityWatcher> watcher;
  {
    MutexLock lock(&chand->external_watchers_mu_);
    auto it = chand->external_watchers_.find(on_complete);
    if (it != chand->external_watchers_.end()) {
      watcher = std::move(it->second);
      chand->external_watchers_.erase(it);
    }
  }
  // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
  // the mutex before calling it.
  if (watcher != nullptr && cancel) watcher->Cancel();
}

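// Notify() and Cancel() may race: the state tracker can report a state
// change at the same time the application drops the watch.  Both paths
// funnel through the done_ flag, and only the first compare-exchange to
// flip it from false to true proceeds; the loser returns immediately.
// A minimal sketch of the same idiom (hypothetical standalone code,
// using std::atomic rather than the Atomic wrapper used here):
//
//   std::atomic<bool> done{false};
//   bool expected = false;
//   if (!done.compare_exchange_strong(expected, true,
//                                     std::memory_order_relaxed)) {
//     return;  // some other thread already completed the watch
//   }
//   // ... single-shot completion logic runs exactly once ...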
void ChannelData::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  bool done = false;
  if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
                                   MemoryOrder::RELAXED)) {
    return;  // Already done.
  }
  // Remove external watcher.
  chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
                                  DEBUG_LOCATION);
  }
}

void ChannelData::ExternalConnectivityWatcher::Cancel() {
  bool done = false;
  if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
                                   MemoryOrder::RELAXED)) {
    return;  // Already done.
  }
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
  // Hop back into the work_serializer to clean up.
  chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
                                DEBUG_LOCATION);
}

void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() {
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
  // Add the new watcher, passing the ref taken at creation to the
  // OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}

void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}

//
// ChannelData::ConnectivityWatcherAdder
//

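// ConnectivityWatcherAdder (and ConnectivityWatcherRemover below) are
// self-deleting helpers: the constructor takes a ref on the channel
// stack and schedules a closure on the work_serializer, and the closure
// does the actual add/remove, drops the ref, and deletes the helper.
// The shape of the pattern, reduced to its essentials (illustrative
// sketch only):
//
//   struct Helper {
//     explicit Helper(WorkSerializer* ws) {
//       ws->Run([this]() { DoWorkLocked(); }, DEBUG_LOCATION);
//     }
//     void DoWorkLocked() {
//       // ... runs serialized with all other control-plane work ...
//       delete this;  // helper owns itself until the hop completes
//     }
//   };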
class ChannelData::ConnectivityWatcherAdder {
 public:
  ConnectivityWatcherAdder(
      ChannelData* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run([this]() { AddWatcherLocked(); },
                                  DEBUG_LOCATION);
  }

 private:
  void AddWatcherLocked() {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ChannelData* chand_;
  grpc_connectivity_state initial_state_;
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};

//
// ChannelData::ConnectivityWatcherRemover
//

class ChannelData::ConnectivityWatcherRemover {
 public:
  ConnectivityWatcherRemover(ChannelData* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
                                  DEBUG_LOCATION);
  }

 private:
  void RemoveWatcherLocked() {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ChannelData* chand_;
  AsyncConnectivityStateWatcherInterface* watcher_;
};

//
// ChannelData::ClientChannelControlHelper
//

class ChannelData::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

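  // CreateSubchannel() below rewrites the channel args before handing
  // them to the subchannel: args that must not affect subchannel
  // uniqueness are stripped, and the address, subchannel-pool, and
  // per-address args are appended.  Conceptually (illustrative sketch
  // only; `base_args`, `extra_args`, and `num_extra` are hypothetical
  // names):
  //
  //   const char* remove[] = {GRPC_ARG_INHIBIT_HEALTH_CHECKING,
  //                           GRPC_ARG_CHANNELZ_CHANNEL_NODE};
  //   grpc_channel_args* new_args =
  //       grpc_channel_args_copy_and_add_and_remove(
  //           base_args, remove, GPR_ARRAY_SIZE(remove),
  //           extra_args, num_extra);
  //   // ... use new_args ...
  //   grpc_channel_args_destroy(new_args);  // the copy must be freed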
  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      ServerAddress address, const grpc_channel_args& args) override {
    if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
    // Determine health check service name.
    bool inhibit_health_checking = grpc_channel_arg_get_bool(
        grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
    absl::optional<std::string> health_check_service_name;
    if (!inhibit_health_checking) {
      health_check_service_name = chand_->health_check_service_name_;
    }
    // Remove channel args that should not affect subchannel uniqueness.
    static const char* args_to_remove[] = {
        GRPC_ARG_INHIBIT_HEALTH_CHECKING,
        GRPC_ARG_CHANNELZ_CHANNEL_NODE,
    };
    // Add channel args needed for the subchannel.
    absl::InlinedVector<grpc_arg, 3> args_to_add = {
        Subchannel::CreateSubchannelAddressArg(&address.address()),
        SubchannelPoolInterface::CreateChannelArg(
            chand_->subchannel_pool_.get()),
    };
    if (address.args() != nullptr) {
      for (size_t j = 0; j < address.args()->num_args; ++j) {
        args_to_add.emplace_back(address.args()->args[j]);
      }
    }
    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
        &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove),
        args_to_add.data(), args_to_add.size());
    gpr_free(args_to_add[0].value.string);
    // Create subchannel.
    Subchannel* subchannel =
        chand_->client_channel_factory_->CreateSubchannel(new_args);
    grpc_channel_args_destroy(new_args);
    if (subchannel == nullptr) return nullptr;
    // Make sure the subchannel has updated keepalive time.
    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
    // Create and return wrapper for the subchannel.
    return MakeRefCounted<SubchannelWrapper>(
        chand_, subchannel, std::move(health_check_service_name));
  }

  void UpdateState(
      grpc_connectivity_state state, const absl::Status& status,
      std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    grpc_error* disconnect_error = chand_->disconnect_error();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      const char* extra = disconnect_error == GRPC_ERROR_NONE
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
              chand_, ConnectivityStateName(state), status.ToString().c_str(),
              picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (disconnect_error == GRPC_ERROR_NONE) {
      chand_->UpdateStateAndPickerLocked(state, status, "helper",
                                         std::move(picker));
    }
  }

  void RequestReresolution() override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
    }
    chand_->resolver_->RequestReresolutionLocked();
  }

  void AddTraceEvent(TraceSeverity severity,
                     absl::string_view message) override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ChannelData* chand_;
};

//
// ChannelData implementation
//

grpc_error* ChannelData::Init(grpc_channel_element* elem,
                              grpc_channel_element_args* args) {
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  grpc_error* error = GRPC_ERROR_NONE;
  new (elem->channel_data) ChannelData(args, &error);
  return error;
}

void ChannelData::Destroy(grpc_channel_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->~ChannelData();
}

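// Helpers for reading channel configuration out of grpc_channel_args.
// On the application side these knobs are plain channel args; a caller
// might set them like this (illustrative sketch only; error handling
// omitted):
//
//   grpc_arg args_array[] = {
//       grpc_channel_arg_integer_create(
//           const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 1),
//       grpc_channel_arg_integer_create(
//           const_cast<char*>(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), 1),
//   };
//   grpc_channel_args channel_args = {GPR_ARRAY_SIZE(args_array),
//                                     args_array};
//   // pass &channel_args when creating the channel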
bool GetEnableRetries(const grpc_channel_args* args) {
  return grpc_channel_arg_get_bool(
      grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
}

size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
  return static_cast<size_t>(grpc_channel_arg_get_integer(
      grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
      {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
}

RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
    const grpc_channel_args* args) {
  const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
      grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
  if (use_local_subchannel_pool) {
    return MakeRefCounted<LocalSubchannelPool>();
  }
  return GlobalSubchannelPool::instance();
}

channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
  const grpc_arg* arg =
      grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
  if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
    return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
  }
  return nullptr;
}

ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
    : deadline_checking_enabled_(
          grpc_deadline_checking_enabled(args->channel_args)),
      enable_retries_(GetEnableRetries(args->channel_args)),
      per_rpc_retry_buffer_size_(
          GetMaxPerRpcRetryBufferSize(args->channel_args)),
      owning_stack_(args->channel_stack),
      client_channel_factory_(
          ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
      channelz_node_(GetChannelzNode(args->channel_args)),
      work_serializer_(std::make_shared<WorkSerializer>()),
      interested_parties_(grpc_pollset_set_create()),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(args->channel_args)),
      disconnect_error_(GRPC_ERROR_NONE) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Initialize data members.
  gpr_mu_init(&info_mu_);
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get server name to resolve, using proxy mapper if needed.
  const char* server_uri = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
  if (server_uri == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  // Get default service config.  If none is specified via the client API,
  // we use an empty config.
  const char* service_config_json = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
  if (service_config_json == nullptr) service_config_json = "{}";
  *error = GRPC_ERROR_NONE;
  default_service_config_ =
      ServiceConfig::Create(args->channel_args, service_config_json, error);
  if (*error != GRPC_ERROR_NONE) {
    default_service_config_.reset();
    return;
  }
  absl::StatusOr<URI> uri = URI::Parse(server_uri);
  if (uri.ok() && !uri->path().empty()) {
    server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
  }
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  ProxyMapperRegistry::MapName(server_uri, args->channel_args, &proxy_name,
                               &new_args);
  target_uri_.reset(proxy_name != nullptr ? proxy_name
                                          : gpr_strdup(server_uri));
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  const char* arg_to_remove = GRPC_ARG_SERVICE_CONFIG;
  channel_args_ = grpc_channel_args_copy_and_remove(
      new_args != nullptr ? new_args : args->channel_args, &arg_to_remove, 1);
  grpc_channel_args_destroy(new_args);
  keepalive_time_ = grpc_channel_args_find_integer(
      channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS,
      {-1 /* default value, unset */, 1, INT_MAX});
  if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
    std::string error_message =
        absl::StrCat("the target uri is not valid: ", target_uri_.get());
    *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
    return;
  }
  *error = GRPC_ERROR_NONE;
}

ChannelData::~ChannelData() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  DestroyResolverAndLbPolicyLocked();
  grpc_channel_args_destroy(channel_args_);
  GRPC_ERROR_UNREF(resolver_transient_failure_error_);
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
  GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
  gpr_mu_destroy(&info_mu_);
}

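// ChooseLbPolicy() below resolves the LB policy config with the
// following precedence: (1) the loadBalancingConfig field in the service
// config, (2) the deprecated loadBalancingPolicy field, (3) the
// GRPC_ARG_LB_POLICY_NAME channel arg, (4) "pick_first".  For cases
// (2)-(4), the bare policy name is wrapped into the standard JSON config
// shape before parsing, e.g. for pick_first:
//
//   [ { "pick_first": {} } ]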
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
  // Prefer the LB policy config found in the service config.
  if (parsed_service_config->parsed_lb_config() != nullptr) {
    return parsed_service_config->parsed_lb_config();
  }
  // Try the deprecated LB policy name from the service config.
  // If not present, try the setting from channel args.
  const char* policy_name = nullptr;
  if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
    policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
  } else {
    const grpc_arg* channel_arg =
        grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
    policy_name = grpc_channel_arg_get_string(channel_arg);
  }
  // Use pick_first if nothing was specified.
  if (policy_name == nullptr) policy_name = "pick_first";
  // Now that we have the policy name, construct an empty config for it.
  Json config_json = Json::Array{Json::Object{
      {policy_name, Json::Object{}},
  }};
  grpc_error* parse_error = GRPC_ERROR_NONE;
  auto lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
      config_json, &parse_error);
  // The policy name came from one of three places:
  // - The deprecated loadBalancingPolicy field in the service config,
  //   in which case the code in ClientChannelServiceConfigParser
  //   already verified that the policy does not require a config.
  // - One of the hard-coded values here, all of which are known to not
  //   require a config.
  // - A channel arg, in which case the application did something that
  //   is a misuse of our API.
  // In the first two cases, these assertions will always be true.  In
  // the last case, this is probably fine for now.
  // TODO(roth): If the last case becomes a problem, add better error
  // handling here.
  GPR_ASSERT(lb_policy_config != nullptr);
  GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
  return lb_policy_config;
}

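// Handles a new result from the resolver.  The service config used by
// the channel is chosen as follows: if the resolver reported a service
// config error, fall back to the previously saved config (or fail the
// channel if there is none); if the resolver returned no config at all,
// use the channel's default service config; otherwise, use the config
// (and ConfigSelector) supplied by the resolver.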
void ChannelData::OnResolverResultChangedLocked(Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
  }
  // We only want to trace the address resolution in the following cases:
  // (a) Address resolution resulted in a service config change.
  // (b) Address resolution caused the number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution caused the number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution caused a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  absl::InlinedVector<const char*, 3> trace_strings;
  if (result.addresses.empty() && previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (!result.addresses.empty() &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = !result.addresses.empty();
  // The result of grpc_error_string() is owned by the error itself.
  // We're storing that string in trace_strings, so we need to make sure
  // that the error lives until we're done with the string.
  grpc_error* service_config_error =
      GRPC_ERROR_REF(result.service_config_error);
  if (service_config_error != GRPC_ERROR_NONE) {
    trace_strings.push_back(grpc_error_string(service_config_error));
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (service_config_error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
              this, grpc_error_string(service_config_error));
    }
    // If the service config was invalid, then fall back to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                this);
      }
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received an invalid service config and we don't have a
      // previous service config to fall back to.  Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(GRPC_ERROR_REF(service_config_error));
      trace_strings.push_back("no valid service config");
    }
  } else if (result.service_config == nullptr) {
    // Resolver did not return any service config.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned no service config. Using default "
              "service config for channel.",
              this);
    }
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = result.service_config;
    config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
  }
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                internal::ClientChannelServiceConfigParser::ParserIndex()));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          parsed_service_config, lb_policy_config->name());
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
    }
    // Create or update LB policy, as needed.
    CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
                                 std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using the new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked();
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
  GRPC_ERROR_UNREF(service_config_error);
}

void ChannelData::OnResolverErrorLocked(grpc_error* error) {
  if (resolver_ == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
            grpc_error_string(error));
  }
  // If we already have an LB policy from a previous resolution
  // result, then we continue to let it set the connectivity state.
  // Otherwise, we go into TRANSIENT_FAILURE.
  if (lb_policy_ == nullptr) {
    grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
        "Resolver transient failure", &error, 1);
    {
      MutexLock lock(&resolution_mu_);
      // Update resolver transient failure.
      GRPC_ERROR_UNREF(resolver_transient_failure_error_);
      resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error);
      // Process calls that were queued waiting for the resolver result.
      for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
           call = call->next) {
        grpc_call_element* elem = call->elem;
        CallData* calld = static_cast<CallData*>(elem->call_data);
        grpc_error* error = GRPC_ERROR_NONE;
        if (calld->CheckResolutionLocked(elem, &error)) {
          calld->AsyncResolutionDone(elem, error);
        }
      }
    }
    // Update connectivity state.
    UpdateStateAndPickerLocked(
        GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
        "resolver failure",
        absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
            state_error));
  }
  GRPC_ERROR_UNREF(error);
}

void ChannelData::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    Resolver::Result result) {
  // Construct update.
  LoadBalancingPolicy::UpdateArgs update_args;
  update_args.addresses = std::move(result.addresses);
  update_args.config = std::move(lb_policy_config);
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in the
  // WorkSerializer.
  const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
  update_args.args =
      grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
  // Create policy if needed.
  if (lb_policy_ == nullptr) {
    lb_policy_ = CreateLbPolicyLocked(*update_args.args);
  }
  // Update the policy.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
            lb_policy_.get());
  }
  lb_policy_->UpdateLocked(std::move(update_args));
}

// Creates a new LB policy.
OrphanablePtr<LoadBalancingPolicy> ChannelData::CreateLbPolicyLocked(
    const grpc_channel_args& args) {
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      absl::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = &args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &grpc_client_channel_routing_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
            lb_policy.get());
  }
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}

void ChannelData::AddResolverQueuedCall(ResolverQueuedCall* call,
                                        grpc_polling_entity* pollent) {
  // Add call to queued calls list.
  call->next = resolver_queued_calls_;
  resolver_queued_calls_ = call;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}

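// Removes a call from the singly-linked queue using the classic
// pointer-to-pointer traversal, which unlinks the node without a special
// case for the list head (illustrative restatement of the loop below):
//
//   for (Node** pp = &head; *pp != nullptr; pp = &(*pp)->next) {
//     if (*pp == target) { *pp = target->next; break; }
//   }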
void ChannelData::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
                                           grpc_polling_entity* pollent) {
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
  // Remove from queued calls list.
  for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
       call = &(*call)->next) {
    if (*call == to_remove) {
      *call = to_remove->next;
      return;
    }
  }
}

void ChannelData::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
    const char* lb_policy_name) {
  UniquePtr<char> service_config_json(
      gpr_strdup(service_config->json_string().c_str()));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p: resolver returned updated service config: \"%s\"", this,
            service_config_json.get());
  }
  // Save service config.
  saved_service_config_ = std::move(service_config);
  // Update health check service name if needed.
  if (health_check_service_name_ !=
      parsed_service_config->health_check_service_name()) {
    health_check_service_name_ =
        parsed_service_config->health_check_service_name();
    // Update health check service name used by existing subchannel wrappers.
    for (auto* subchannel_wrapper : subchannel_wrappers_) {
      subchannel_wrapper->UpdateHealthCheckServiceName(
          health_check_service_name_);
    }
  }
  // Swap out the data used by GetChannelInfo().
  UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
  {
    MutexLock lock(&info_mu_);
    info_lb_policy_name_ = std::move(lb_policy_name_owned);
    info_service_config_json_ = std::move(service_config_json);
  }
  // Save config selector.
  saved_config_selector_ = std::move(config_selector);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
            saved_config_selector_.get());
  }
}

void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
  // Grab ref to service config.
  RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
  // Grab ref to config selector.  Use default if resolver didn't supply one.
  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
            saved_config_selector_.get());
  }
  if (config_selector == nullptr) {
    config_selector =
        MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
  }
  // Get retry throttle data from service config.
  const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
      static_cast<const internal::ClientChannelGlobalParsedConfig*>(
          saved_service_config_->GetGlobalParsedConfig(
              internal::ClientChannelServiceConfigParser::ParserIndex()));
  absl::optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
      retry_throttle_config = parsed_service_config->retry_throttling();
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
  if (retry_throttle_config.has_value()) {
    retry_throttle_data = internal::ServerRetryThrottleMap::GetDataForServer(
        server_name_, retry_throttle_config.value().max_milli_tokens,
        retry_throttle_config.value().milli_token_ratio);
  }
  // Construct per-LB filter stack.
  std::vector<const grpc_channel_filter*> filters =
      config_selector->GetFilters();
  filters.push_back(&kDynamicTerminationFilterVtable);
  absl::InlinedVector<grpc_arg, 2> args_to_add;
  args_to_add.push_back(grpc_channel_arg_pointer_create(
      const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL_DATA), this,
      &kChannelDataArgPointerVtable));
  if (retry_throttle_data != nullptr) {
    args_to_add.push_back(grpc_channel_arg_pointer_create(
        const_cast<char*>(GRPC_ARG_RETRY_THROTTLE_DATA),
        retry_throttle_data.get(), &kRetryThrottleDataArgPointerVtable));
  }
  grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
      channel_args_, args_to_add.data(), args_to_add.size());
  RefCountedPtr<DynamicFilters> dynamic_filters =
      DynamicFilters::Create(new_args, std::move(filters));
  GPR_ASSERT(dynamic_filters != nullptr);
  grpc_channel_args_destroy(new_args);
  // Grab data plane lock to update service config.
  //
  // We defer unreffing the old values (and deallocating memory) until
  // after releasing the lock to keep the critical section small.
  {
    MutexLock lock(&resolution_mu_);
    GRPC_ERROR_UNREF(resolver_transient_failure_error_);
    resolver_transient_failure_error_ = GRPC_ERROR_NONE;
    // Update service config.
    received_service_config_data_ = true;
    // Old values will be unreffed after lock is released.
    service_config_.swap(service_config);
    config_selector_.swap(config_selector);
    dynamic_filters_.swap(dynamic_filters);
    // Process calls that were queued waiting for the resolver result.
    for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
         call = call->next) {
      grpc_call_element* elem = call->elem;
      CallData* calld = static_cast<CallData*>(elem->call_data);
      grpc_error* error = GRPC_ERROR_NONE;
      if (calld->CheckResolutionLocked(elem, &error)) {
        calld->AsyncResolutionDone(elem, error);
      }
    }
  }
  // Old values will be unreffed after lock is released when they go out
  // of scope.
}
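
// The swap-under-lock / unref-after-unlock idiom used above keeps
// destructors (which may do nontrivial work) out of the critical section.
// Minimal sketch of the pattern:
//
//   RefCountedPtr<T> old_value = new_value;  // local holds the new ref
//   {
//     MutexLock lock(&mu_);
//     member_.swap(old_value);  // publish new value; old value moves out
//   }
//   // old_value goes out of scope here, unreffing after the lock is gone.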

void ChannelData::CreateResolverLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
  }
  resolver_ = ResolverRegistry::CreateResolver(
      target_uri_.get(), channel_args_, interested_parties_, work_serializer_,
      absl::make_unique<ResolverResultHandler>(this));
  // Since the validity of the args was checked when the channel was created,
  // CreateResolver() must return a non-null result.
  GPR_ASSERT(resolver_ != nullptr);
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
  resolver_->StartLocked();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
  }
}

void ChannelData::DestroyResolverAndLbPolicyLocked() {
  if (resolver_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
              resolver_.get());
    }
    resolver_.reset();
    if (lb_policy_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
                lb_policy_.get());
      }
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
}

void ChannelData::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
  // Clean the control plane when entering IDLE or shutting down.
  if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
    saved_service_config_.reset();
    saved_config_selector_.reset();
  }
  // Update connectivity state.
  state_tracker_.SetState(state, status, reason);
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
  // Grab data plane lock to do subchannel updates and update the picker.
  //
  // Note that we want to minimize the work done while holding the data
  // plane lock, to keep the critical section small.  So, for all of the
  // objects that we might wind up unreffing here, we actually hold onto
  // the refs until after we release the lock, and then unref them at
  // that point.  This includes the following:
  // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
  // - ref stored in service_config_
  // - ref stored in config_selector_
  // - ref stored in dynamic_filters_
  // - ownership of the existing picker in picker_
  RefCountedPtr<ServiceConfig> service_config_to_unref;
  RefCountedPtr<ConfigSelector> config_selector_to_unref;
  RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
  {
    MutexLock lock(&data_plane_mu_);
    // Handle subchannel updates.
    for (auto& p : pending_subchannel_updates_) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: updating subchannel wrapper %p data plane "
                "connected_subchannel to %p",
                this, p.first.get(), p.second.get());
      }
      // Note: We do not remove the entry from pending_subchannel_updates_
      // here, since this would unref the subchannel wrapper; instead,
      // we wait until we've released the lock to clear the map.
      p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
    }
    // Swap out the picker.
    // Note: Original value will be destroyed after the lock is released.
    picker_.swap(picker);
    // Clean the data plane if the updated picker is nullptr or the
    // channel is shutting down.
    if (picker_ == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
      received_service_config_data_ = false;
      // Note: We save the objects to unref until after the lock is released.
      service_config_to_unref = std::move(service_config_);
      config_selector_to_unref = std::move(config_selector_);
      dynamic_filters_to_unref = std::move(dynamic_filters_);
    }
    // Re-process queued picks.
    for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
         call = call->next) {
      grpc_error* error = GRPC_ERROR_NONE;
      if (call->lb_call->PickSubchannelLocked(&error)) {
        call->lb_call->AsyncPickDone(error);
      }
    }
  }
  // Clear the pending update map after releasing the lock, to keep the
  // critical section small.
  pending_subchannel_updates_.clear();
}

grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
  if (state_tracker_.state() != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
  }
  LoadBalancingPolicy::PickResult result =
      picker_->Pick(LoadBalancingPolicy::PickArgs());
  ConnectedSubchannel* connected_subchannel = nullptr;
  if (result.subchannel != nullptr) {
    SubchannelWrapper* subchannel =
        static_cast<SubchannelWrapper*>(result.subchannel.get());
    connected_subchannel = subchannel->connected_subchannel();
  }
  if (connected_subchannel != nullptr) {
    connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
  } else {
    if (result.error == GRPC_ERROR_NONE) {
      result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "LB policy dropped call on ping");
    }
  }
  return result.error;
}

void ChannelData::StartTransportOpLocked(grpc_transport_op* op) {
  // Connectivity watch.
  if (op->start_connectivity_watch != nullptr) {
    state_tracker_.AddWatcher(op->start_connectivity_watch_state,
                              std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error* error = DoPingLocked(op);
    if (error != GRPC_ERROR_NONE) {
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
                   GRPC_ERROR_REF(error));
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
              grpc_error_string(op->disconnect_with_error));
    }
    DestroyResolverAndLbPolicyLocked();
    intptr_t value;
    if (grpc_error_get_int(op->disconnect_with_error,
                           GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (disconnect_error() == GRPC_ERROR_NONE) {
        // Enter IDLE state.
        UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
                                   "channel entering IDLE", nullptr);
      }
      GRPC_ERROR_UNREF(op->disconnect_with_error);
    } else {
      // Disconnect.
      GPR_ASSERT(disconnect_error_.Load(MemoryOrder::RELAXED) ==
                 GRPC_ERROR_NONE);
      disconnect_error_.Store(op->disconnect_with_error, MemoryOrder::RELEASE);
      UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
          absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
              GRPC_ERROR_REF(op->disconnect_with_error)));
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
  ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}

void ChannelData::StartTransportOp(grpc_channel_element* elem,
                                   grpc_transport_op* op) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Hop into the control-plane work_serializer for the remaining ops.
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  chand->work_serializer_->Run(
      [chand, op]() { chand->StartTransportOpLocked(op); }, DEBUG_LOCATION);
}

void ChannelData::GetChannelInfo(grpc_channel_element* elem,
                                 const grpc_channel_info* info) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  MutexLock lock(&chand->info_mu_);
  if (info->lb_policy_name != nullptr) {
    *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
  }
  if (info->service_config_json != nullptr) {
    *info->service_config_json =
        gpr_strdup(chand->info_service_config_json_.get());
  }
}

void ChannelData::AddLbQueuedCall(LbQueuedCall* call,
                                  grpc_polling_entity* pollent) {
  // Add call to queued picks list.
  call->next = lb_queued_calls_;
  lb_queued_calls_ = call;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}

void ChannelData::RemoveLbQueuedCall(LbQueuedCall* to_remove,
                                     grpc_polling_entity* pollent) {
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
  // Remove from queued picks list.
  for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
       call = &(*call)->next) {
    if (*call == to_remove) {
      *call = to_remove->next;
      return;
    }
  }
}

RefCountedPtr<ConnectedSubchannel>
ChannelData::GetConnectedSubchannelInDataPlane(
    SubchannelInterface* subchannel) const {
  SubchannelWrapper* subchannel_wrapper =
      static_cast<SubchannelWrapper*>(subchannel);
  ConnectedSubchannel* connected_subchannel =
      subchannel_wrapper->connected_subchannel_in_data_plane();
  if (connected_subchannel == nullptr) return nullptr;
  return connected_subchannel->Ref();
}

void ChannelData::TryToConnectLocked() {
  if (lb_policy_ != nullptr) {
    lb_policy_->ExitIdleLocked();
  } else if (resolver_ == nullptr) {
    CreateResolverLocked();
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}

grpc_connectivity_state ChannelData::CheckConnectivityState(
    bool try_to_connect) {
  grpc_connectivity_state out = state_tracker_.state();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    work_serializer_->Run([this]() { TryToConnectLocked(); }, DEBUG_LOCATION);
  }
  return out;
}
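
// Example (illustrative): this is the path exercised when an application
// polls connectivity with try_to_connect enabled, e.g.:
//
//   grpc_connectivity_state st =
//       grpc_channel_check_connectivity_state(channel, /*try_to_connect=*/1);
//
// If the channel is IDLE, the lambda above hops into the work_serializer,
// and TryToConnectLocked() either kicks the LB policy out of idle or, if
// there is no resolver yet, starts name resolution.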

void ChannelData::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}

void ChannelData::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  new ConnectivityWatcherRemover(this, watcher);
}

//
// CallData implementation
//

CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
                   const grpc_call_element_args& args)
    : deadline_state_(elem, args,
                      GPR_LIKELY(chand.deadline_checking_enabled())
                          ? args.deadline
                          : GRPC_MILLIS_INF_FUTURE),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context) {}

CallData::~CallData() {
  grpc_slice_unref_internal(path_);
  GRPC_ERROR_UNREF(cancel_error_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i] == nullptr);
  }
}

grpc_error* CallData::Init(grpc_call_element* elem,
                           const grpc_call_element_args* args) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  new (elem->call_data) CallData(elem, *chand, *args);
  return GRPC_ERROR_NONE;
}

void CallData::Destroy(grpc_call_element* elem,
                       const grpc_call_final_info* /*final_info*/,
                       grpc_closure* then_schedule_closure) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~CallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
  }
}

void CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled())) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // Intercept recv_initial_metadata for config selector on-committed callback.
  if (batch->recv_initial_metadata) {
    calld->InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error_);
    calld->cancel_error_ =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error_));
    }
    // If we do not have a dynamic call (i.e., name resolution has not
    // yet completed), fail all pending batches.  Otherwise, send the
    // cancellation down to the dynamic call.
    if (calld->dynamic_call_ == nullptr) {
      calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
                                NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    } else {
      // Note: This will release the call combiner.
      calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(elem, batch);
  // Check if we've already created a dynamic call.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
              chand, calld, calld->dynamic_call_.get());
    }
    calld->PendingBatchesResume(elem);
    return;
  }
  // We do not yet have a dynamic call.
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing resolution mutex to apply service "
              "config",
              chand, calld);
    }
    CheckResolution(elem, GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}

void CallData::SetPollent(grpc_call_element* elem,
                          grpc_polling_entity* pollent) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  calld->pollent_ = pollent;
}

//
// pending_batches management
//

size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}
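
// Example (illustrative): a batch carrying send_initial_metadata (possibly
// along with other ops) is filed under index 0, while a batch carrying only
// recv_trailing_metadata is filed under index 5.  PendingBatchesAdd() below
// asserts that a slot is empty before filling it, so at most one pending
// batch per leading op can be outstanding at a time.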

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesAdd(grpc_call_element* elem,
                                 grpc_transport_stream_op_batch* batch) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
            this, idx);
  }
  grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
  GPR_ASSERT(pending == nullptr);
  pending = batch;
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), calld->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesFail(
    grpc_call_element* elem, grpc_error* error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, this, num_batches, grpc_error_string(error));
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::ResumePendingBatchInCallCombiner(void* arg,
                                                grpc_error* /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* elem =
      static_cast<grpc_call_element*>(batch->handler_private.extra_arg);
  auto* calld = static_cast<CallData*>(elem->call_data);
  // Note: This will release the call combiner.
  calld->dynamic_call_->StartTransportStreamOpBatch(batch);
}

// This is called via the call combiner, so access to calld is synchronized.
void CallData::PendingBatchesResume(grpc_call_element* elem) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on dynamic_call=%p",
            chand, this, num_batches, dynamic_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = elem;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch, nullptr);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "PendingBatchesResume");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}

//
// name resolution
//

// A class to handle the call combiner cancellation callback for a
// queued pick.
class CallData::ResolverQueuedCallCanceller {
 public:
  explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) {
    auto* calld = static_cast<CallData*>(elem->call_data);
    GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
                      grpc_schedule_on_exec_ctx);
    calld->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
    auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
    auto* calld = static_cast<CallData*>(self->elem_->call_data);
    {
      MutexLock lock(chand->resolution_mu());
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: cancelling resolver queued pick: "
                "error=%s self=%p calld->resolver_pick_canceller=%p",
                chand, calld, grpc_error_string(error), self,
                calld->resolver_call_canceller_);
      }
      if (calld->resolver_call_canceller_ == self && error != GRPC_ERROR_NONE) {
        // Remove pick from list of queued picks.
        calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
        // Fail pending batches on the call.
        calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
                                  YieldCallCombinerIfPendingBatchesFound);
      }
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolverQueuedCallCanceller");
    delete self;
  }

  grpc_call_element* elem_;
  grpc_closure closure_;
};
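
// Lifecycle note (illustrative summary of the code above): a canceller is
// allocated each time the call is added to the resolver queue, and
// calld->resolver_call_canceller_ points at the most recent one.  When the
// call is removed from the queue, that pointer is nulled ("lamed"), so a
// late-firing CancelLocked() finds resolver_call_canceller_ != self and
// simply drops its call-stack ref without failing any batches.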

void CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
    grpc_call_element* elem) {
  if (!queued_pending_resolver_result_) return;
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: removing from resolver queued picks list",
            chand, this);
  }
  chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
  queued_pending_resolver_result_ = false;
  // Lame the call combiner canceller.
  resolver_call_canceller_ = nullptr;
}

void CallData::MaybeAddCallToResolverQueuedCallsLocked(
    grpc_call_element* elem) {
  if (queued_pending_resolver_result_) return;
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
            chand, this);
  }
  queued_pending_resolver_result_ = true;
  resolver_queued_call_.elem = elem;
  chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
  // Register call combiner cancellation callback.
  resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
}

grpc_error* CallData::ApplyServiceConfigToCallLocked(
    grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
            chand, this);
  }
  ConfigSelector* config_selector = chand->config_selector();
  if (config_selector != nullptr) {
    // Use the ConfigSelector to determine the config for the call.
    ConfigSelector::CallConfig call_config =
        config_selector->GetCallConfig({&path_, initial_metadata, arena_});
    if (call_config.error != GRPC_ERROR_NONE) return call_config.error;
    on_call_committed_ = std::move(call_config.on_call_committed);
    // Create a ServiceConfigCallData for the call.  This stores a ref to the
    // ServiceConfig and caches the right set of parsed configs to use for
    // the call.  The MethodConfig will store itself in the call context,
    // so that it can be accessed by filters in the subchannel, and it
    // will be cleaned up when the call ends.
    auto* service_config_call_data = arena_->New<ServiceConfigCallData>(
        std::move(call_config.service_config), call_config.method_configs,
        std::move(call_config.call_attributes), call_context_);
    // Apply our own method params to the call.
    auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
        service_config_call_data->GetMethodParsedConfig(
            internal::ClientChannelServiceConfigParser::ParserIndex()));
    if (method_params != nullptr) {
      // If the deadline from the service config is shorter than the one
      // from the client API, reset the deadline timer.
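      // Worked example (illustrative): if the call started at T and the
      // method config specifies a 2000 ms timeout while the application set
      // a 5000 ms deadline, then per_method_deadline = T + 2000 ms, which
      // is earlier than T + 5000 ms, so the timer is reset to the tighter
      // deadline.  A service-config timeout longer than the application's
      // deadline is ignored.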
      if (chand->deadline_checking_enabled() && method_params->timeout() != 0) {
        const grpc_millis per_method_deadline =
            grpc_cycle_counter_to_millis_round_up(call_start_time_) +
            method_params->timeout();
        if (per_method_deadline < deadline_) {
          deadline_ = per_method_deadline;
          grpc_deadline_state_reset(elem, deadline_);
        }
      }
      // If the service config set wait_for_ready and the application
      // did not explicitly set it, use the value from the service config.
      uint32_t* send_initial_metadata_flags =
          &pending_batches_[0]
               ->payload->send_initial_metadata.send_initial_metadata_flags;
      if (method_params->wait_for_ready().has_value() &&
          !(*send_initial_metadata_flags &
            GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
        if (method_params->wait_for_ready().value()) {
          *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        } else {
          *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
        }
      }
    }
    // Set the dynamic filter stack.
    dynamic_filters_ = chand->dynamic_filters();
  }
  return GRPC_ERROR_NONE;
}

void CallData::RecvInitialMetadataReadyForConfigSelectorCommitCallback(
    void* arg, grpc_error* error) {
  auto* self = static_cast<CallData*>(arg);
  if (self->on_call_committed_ != nullptr) {
    self->on_call_committed_();
    self->on_call_committed_ = nullptr;
  }
  // Chain to original callback.
  Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
               GRPC_ERROR_REF(error));
}

// TODO(roth): Consider not intercepting this callback unless we
// actually need to, if this causes a performance problem.
void CallData::InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
    grpc_transport_stream_op_batch* batch) {
  original_recv_initial_metadata_ready_ =
      batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_,
                    RecvInitialMetadataReadyForConfigSelectorCommitCallback,
                    this, nullptr);
  batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      &recv_initial_metadata_ready_;
}

void CallData::AsyncResolutionDone(grpc_call_element* elem, grpc_error* error) {
  GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr);
  ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
}

void CallData::ResolutionDone(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: error applying config to call: error=%s",
              chand, calld, grpc_error_string(error));
    }
    calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
    return;
  }
  calld->CreateDynamicCall(elem);
}

void CallData::CheckResolution(void* arg, grpc_error* error) {
  grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  bool resolution_complete;
  {
    MutexLock lock(chand->resolution_mu());
    resolution_complete = calld->CheckResolutionLocked(elem, &error);
  }
  if (resolution_complete) {
    ResolutionDone(elem, error);
    GRPC_ERROR_UNREF(error);
  }
}

bool CallData::CheckResolutionLocked(grpc_call_element* elem,
                                     grpc_error** error) {
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  // If we're still in IDLE, we need to start resolving.
  if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
    // Bounce into the control plane work serializer to start resolving,
    // in case we are still in IDLE state.  Since we are holding the
    // resolution mutex here, we offload this to the ExecCtx so that we
    // don't deadlock with ourselves.
    GRPC_CHANNEL_STACK_REF(chand->owning_stack(), "CheckResolutionLocked");
    ExecCtx::Run(
        DEBUG_LOCATION,
        GRPC_CLOSURE_CREATE(
            [](void* arg, grpc_error* /*error*/) {
              auto* chand = static_cast<ChannelData*>(arg);
              chand->work_serializer()->Run(
                  [chand]() {
                    chand->CheckConnectivityState(/*try_to_connect=*/true);
                    GRPC_CHANNEL_STACK_UNREF(chand->owning_stack(),
                                             "CheckResolutionLocked");
                  },
                  DEBUG_LOCATION);
            },
            chand, nullptr),
        GRPC_ERROR_NONE);
  }
  // Get send_initial_metadata batch and flags.
  auto& send_initial_metadata =
      pending_batches_[0]->payload->send_initial_metadata;
  grpc_metadata_batch* initial_metadata_batch =
      send_initial_metadata.send_initial_metadata;
  const uint32_t send_initial_metadata_flags =
      send_initial_metadata.send_initial_metadata_flags;
  // If we don't yet have a resolver result, we need to queue the call
  // until we get one.
  if (GPR_UNLIKELY(!chand->received_service_config_data())) {
    // If the resolver returned transient failure before returning the
    // first service config, fail any non-wait_for_ready calls.
    grpc_error* resolver_error = chand->resolver_transient_failure_error();
    if (resolver_error != GRPC_ERROR_NONE &&
        (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
            0) {
      MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
      *error = GRPC_ERROR_REF(resolver_error);
      return true;
    }
    // Either the resolver has not yet returned a result, or it has
    // returned transient failure but the call is wait_for_ready.  In
    // either case, queue the call.
    MaybeAddCallToResolverQueuedCallsLocked(elem);
    return false;
  }
  // Apply service config to call if not yet applied.
  if (GPR_LIKELY(!service_config_applied_)) {
    service_config_applied_ = true;
    *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
  }
  MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
  return true;
}
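
// Decision summary for CheckResolutionLocked() (illustrative recap of the
// logic above):
//
//   config received?  resolver in TF?  wait_for_ready?  outcome
//   ----------------  ---------------  ---------------  --------------------
//   yes               n/a              n/a              apply config, true
//   no                yes              no               fail call, true
//   no                yes              yes              queue call, false
//   no                no               n/a              queue call, false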

void CallData::CreateDynamicCall(grpc_call_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
                                     pollent_,
                                     path_,
                                     call_start_time_,
                                     deadline_,
                                     arena_,
                                     call_context_,
                                     call_combiner_};
  grpc_error* error = GRPC_ERROR_NONE;
  DynamicFilters* channel_stack = args.channel_stack.get();
  dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: failed to create dynamic call: error=%s",
              chand, this, grpc_error_string(error));
    }
    PendingBatchesFail(elem, error, YieldCallCombiner);
    return;
  }
  PendingBatchesResume(elem);
}

//
// RetryingCall implementation
//

// Retry support:
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on the subchannel call.  When the child batches
// return, we then decide which pending batches have been completed and
// schedule their callbacks accordingly.  If a subchannel call fails and
// we want to retry it, we do a new pick and start again, constructing
// new "child" batches for the new subchannel call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different subchannel call even after we have completed the original
// batches.
//
// There are two sets of data to maintain:
// - In call_data (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - In the subchannel call, we maintain state to indicate what ops have
//   already been sent down to that call.
//
// When constructing the "child" batches, we compare those two sets of
// data to see which batches need to be sent to the subchannel call.

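// Example timeline (illustrative): suppose the surface sends
// send_initial_metadata and one send_message, and attempt 1 completes both
// before failing with a retryable status.  The cached copies made in
// MaybeCacheSendOpsForBatch() still exist, so attempt 2's child batches are
// rebuilt from the cache and replayed on the new subchannel call, even
// though the surface already saw those send ops complete.  Once the server
// sends a response (other than Trailers-Only), RetryCommit() runs and the
// cached send data is freed.
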
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

RetryingCall::RetryingCall(
    ChannelData* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent,
    RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
    const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy)
    : chand_(chand),
      pollent_(pollent),
      retry_throttle_data_(std::move(retry_throttle_data)),
      retry_policy_(retry_policy),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(
                  retry_policy_ == nullptr ? 0 : retry_policy_->initial_backoff)
              .set_multiplier(retry_policy_ == nullptr
                                  ? 0
                                  : retry_policy_->backoff_multiplier)
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(
                  retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff)),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      enable_retries_(true),
      retry_committed_(false),
      last_attempt_got_server_pushback_(false) {}

RetryingCall::~RetryingCall() {
  grpc_slice_unref_internal(path_);
  GRPC_ERROR_UNREF(cancel_error_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i].batch == nullptr);
  }
}

void RetryingCall::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p retrying_call=%p: failing batch with error: %s", chand_,
              this, grpc_error_string(cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(cancel_error_);
    cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p retrying_call=%p: recording cancel_error=%s",
              chand_, this, grpc_error_string(cancel_error_));
    }
    // If we do not have an LB call (i.e., a pick has not yet been started),
    // fail all pending batches.  Otherwise, send the cancellation down to the
    // LB call.
    if (lb_call_ == nullptr) {
      // TODO(roth): If there is a pending retry callback, do we need to
      // cancel it here?
      PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
    } else {
      // Note: This will release the call combiner.
      lb_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  PendingBatchesAdd(batch);
  // Create LB call if needed.
  // TODO(roth): If we get a new batch from the surface after the
  // initial retry attempt has failed, while the retry timer is pending,
  // we should queue the batch and not try to send it immediately.
  if (lb_call_ == nullptr) {
    // We do not yet have an LB call, so create one.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p retrying_call=%p: creating LB call", chand_,
              this);
    }
    CreateLbCall(this, GRPC_ERROR_NONE);
    return;
  }
  // Send batches to LB call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p retrying_call=%p: starting batch on lb_call=%p",
            chand_, this, lb_call_.get());
  }
  PendingBatchesResume();
}

RefCountedPtr<SubchannelCall> RetryingCall::subchannel_call() const {
  if (lb_call_ == nullptr) return nullptr;
  return lb_call_->subchannel_call();
}

//
// send op data caching
//

void RetryingCall::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    send_initial_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
                             send_initial_metadata_storage_);
    send_initial_metadata_flags_ =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    peer_string_ = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
        std::move(batch->payload->send_message.send_message));
    send_messages_.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
                             send_trailing_metadata_storage_);
  }
}
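
// Design note (illustrative summary): the cached metadata copies and message
// caches above are allocated from the call's arena, so their lifetime is
// tied to the call itself; they are released via the FreeCachedSend*()
// helpers below once a committed attempt has finished with them.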

void RetryingCall::FreeCachedSendInitialMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: destroying send_initial_metadata",
            chand_, this);
  }
  grpc_metadata_batch_destroy(&send_initial_metadata_);
}

void RetryingCall::FreeCachedSendMessage(size_t idx) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: destroying send_messages[%" PRIuPTR "]",
            chand_, this, idx);
  }
  send_messages_[idx]->Destroy();
}

void RetryingCall::FreeCachedSendTrailingMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: destroying send_trailing_metadata",
            chand_, this);
  }
  grpc_metadata_batch_destroy(&send_trailing_metadata_);
}

void RetryingCall::FreeCachedSendOpDataAfterCommit(
    SubchannelCallRetryState* retry_state) {
  if (retry_state->completed_send_initial_metadata) {
    FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
    FreeCachedSendMessage(i);
  }
  if (retry_state->completed_send_trailing_metadata) {
    FreeCachedSendTrailingMetadata();
  }
}

void RetryingCall::FreeCachedSendOpDataForCompletedBatch(
    SubchannelCallBatchData* batch_data,
    SubchannelCallRetryState* retry_state) {
  if (batch_data->batch.send_initial_metadata) {
    FreeCachedSendInitialMetadata();
  }
  if (batch_data->batch.send_message) {
    FreeCachedSendMessage(retry_state->completed_send_message_count - 1);
  }
  if (batch_data->batch.send_trailing_metadata) {
    FreeCachedSendTrailingMetadata();
  }
}

//
// pending_batches management
//

size_t RetryingCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in pick_subchannel_locked() assumes it will be.
3438   if (batch->send_initial_metadata) return 0;
3439   if (batch->send_message) return 1;
3440   if (batch->send_trailing_metadata) return 2;
3441   if (batch->recv_initial_metadata) return 3;
3442   if (batch->recv_message) return 4;
3443   if (batch->recv_trailing_metadata) return 5;
3444   GPR_UNREACHABLE_CODE(return (size_t)-1);
3445 }
3446 
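// A minimal sketch (using only names from this file) of how the six slots in
// pending_batches_ line up with GetBatchIndex(): each op kind owns exactly
// one slot, so adding or finding a batch is an O(1) array access rather than
// a scan:
//
//   PendingBatch* slot = &pending_batches_[GetBatchIndex(batch)];
//   GPR_ASSERT(slot->batch == nullptr);  // at most one pending batch per kind
//   slot->batch = batch;
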
3447 // This is called via the call combiner, so access to calld is synchronized.
3448 void RetryingCall::PendingBatchesAdd(grpc_transport_stream_op_batch* batch) {
3449   const size_t idx = GetBatchIndex(batch);
3450   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3451     gpr_log(
3452         GPR_INFO,
3453         "chand_=%p retrying_call=%p: adding pending batch at index %" PRIuPTR,
3454         chand_, this, idx);
3455   }
3456   PendingBatch* pending = &pending_batches_[idx];
3457   GPR_ASSERT(pending->batch == nullptr);
3458   pending->batch = batch;
3459   pending->send_ops_cached = false;
3460   if (enable_retries_) {
3461     // Update state in calld about pending batches.
3462     // Also check if the batch takes us over the retry buffer limit.
3463     // Note: We don't check the size of trailing metadata here, because
3464     // gRPC clients do not send trailing metadata.
3465     if (batch->send_initial_metadata) {
3466       pending_send_initial_metadata_ = true;
3467       bytes_buffered_for_retry_ += grpc_metadata_batch_size(
3468           batch->payload->send_initial_metadata.send_initial_metadata);
3469     }
3470     if (batch->send_message) {
3471       pending_send_message_ = true;
3472       bytes_buffered_for_retry_ +=
3473           batch->payload->send_message.send_message->length();
3474     }
3475     if (batch->send_trailing_metadata) {
3476       pending_send_trailing_metadata_ = true;
3477     }
3478     if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
3479                      chand_->per_rpc_retry_buffer_size())) {
3480       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3481         gpr_log(GPR_INFO,
3482                 "chand=%p retrying_call=%p: exceeded retry buffer size, "
3483                 "committing",
3484                 chand_, this);
3485       }
3486       SubchannelCallRetryState* retry_state =
3487           lb_call_ == nullptr ? nullptr
3488                               : static_cast<SubchannelCallRetryState*>(
3489                                     lb_call_->GetParentData());
3490       RetryCommit(retry_state);
3491       // If we are not going to retry and have not yet started, pretend
3492       // retries are disabled so that we don't bother with retry overhead.
3493       if (num_attempts_completed_ == 0) {
3494         if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3495           gpr_log(GPR_INFO,
3496                   "chand=%p retrying_call=%p: disabling retries before first "
3497                   "attempt",
3498                   chand_, this);
3499         }
3500         // TODO(roth): Treat this as a commit?
3501         enable_retries_ = false;
3502       }
3503     }
3504   }
3505 }
3506 
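// Sketch of the buffer accounting enforced above: only send_initial_metadata
// and send_message sizes count toward bytes_buffered_for_retry_ (trailing
// metadata is ignored because clients do not send it), so the commit check
// reduces to:
//
//   bytes_buffered_for_retry_ +=
//       grpc_metadata_batch_size(initial_md) + message->length();
//   if (bytes_buffered_for_retry_ > chand_->per_rpc_retry_buffer_size()) {
//     RetryCommit(retry_state);  // stop retrying; cached data may be freed
//   }
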
3507 void RetryingCall::PendingBatchClear(PendingBatch* pending) {
3508   if (enable_retries_) {
3509     if (pending->batch->send_initial_metadata) {
3510       pending_send_initial_metadata_ = false;
3511     }
3512     if (pending->batch->send_message) {
3513       pending_send_message_ = false;
3514     }
3515     if (pending->batch->send_trailing_metadata) {
3516       pending_send_trailing_metadata_ = false;
3517     }
3518   }
3519   pending->batch = nullptr;
3520 }
3521 
3522 void RetryingCall::MaybeClearPendingBatch(PendingBatch* pending) {
3523   grpc_transport_stream_op_batch* batch = pending->batch;
3524   // We clear the pending batch once all of its callbacks have been
3525   // scheduled and their pointers reset to nullptr.
3526   if (batch->on_complete == nullptr &&
3527       (!batch->recv_initial_metadata ||
3528        batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
3529            nullptr) &&
3530       (!batch->recv_message ||
3531        batch->payload->recv_message.recv_message_ready == nullptr) &&
3532       (!batch->recv_trailing_metadata ||
3533        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
3534            nullptr)) {
3535     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3536       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: clearing pending batch",
3537               chand_, this);
3538     }
3539     PendingBatchClear(pending);
3540   }
3541 }
3542 
3543 // This is called via the call combiner, so access to calld is synchronized.
3544 void RetryingCall::FailPendingBatchInCallCombiner(void* arg,
3545                                                   grpc_error* error) {
3546   grpc_transport_stream_op_batch* batch =
3547       static_cast<grpc_transport_stream_op_batch*>(arg);
3548   RetryingCall* call =
3549       static_cast<RetryingCall*>(batch->handler_private.extra_arg);
3550   // Note: This will release the call combiner.
3551   grpc_transport_stream_op_batch_finish_with_failure(
3552       batch, GRPC_ERROR_REF(error), call->call_combiner_);
3553 }
3554 
3555 // This is called via the call combiner, so access to calld is synchronized.
3556 void RetryingCall::PendingBatchesFail(
3557     grpc_error* error,
3558     YieldCallCombinerPredicate yield_call_combiner_predicate) {
3559   GPR_ASSERT(error != GRPC_ERROR_NONE);
3560   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3561     size_t num_batches = 0;
3562     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3563       if (pending_batches_[i].batch != nullptr) ++num_batches;
3564     }
3565     gpr_log(GPR_INFO,
3566             "chand=%p retrying_call=%p: failing %" PRIuPTR
3567             " pending batches: %s",
3568             chand_, this, num_batches, grpc_error_string(error));
3569   }
3570   CallCombinerClosureList closures;
3571   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3572     PendingBatch* pending = &pending_batches_[i];
3573     grpc_transport_stream_op_batch* batch = pending->batch;
3574     if (batch != nullptr) {
3575       batch->handler_private.extra_arg = this;
3576       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
3577                         FailPendingBatchInCallCombiner, batch,
3578                         grpc_schedule_on_exec_ctx);
3579       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
3580                    "PendingBatchesFail");
3581       PendingBatchClear(pending);
3582     }
3583   }
3584   if (yield_call_combiner_predicate(closures)) {
3585     closures.RunClosures(call_combiner_);
3586   } else {
3587     closures.RunClosuresWithoutYielding(call_combiner_);
3588   }
3589   GRPC_ERROR_UNREF(error);
3590 }
3591 
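// The yield_call_combiner_predicate argument above decides whether failing
// the batches also releases the call combiner. A plausible call-site sketch;
// the predicate helpers shown are assumptions, not defined in this excerpt:
//
//   PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);    // yield
//   PendingBatchesFail(GRPC_ERROR_REF(error), NoYieldCallCombiner);  // hold
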
3592 // This is called via the call combiner, so access to calld is synchronized.
3593 void RetryingCall::ResumePendingBatchInCallCombiner(void* arg,
3594                                                     grpc_error* /*ignored*/) {
3595   grpc_transport_stream_op_batch* batch =
3596       static_cast<grpc_transport_stream_op_batch*>(arg);
3597   auto* lb_call =
3598       static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
3599   // Note: This will release the call combiner.
3600   lb_call->StartTransportStreamOpBatch(batch);
3601 }
3602 
3603 // This is called via the call combiner, so access to calld is synchronized.
3604 void RetryingCall::PendingBatchesResume() {
3605   if (enable_retries_) {
3606     StartRetriableSubchannelBatches(this, GRPC_ERROR_NONE);
3607     return;
3608   }
3609   // Retries not enabled; send down batches as-is.
3610   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3611     size_t num_batches = 0;
3612     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3613       if (pending_batches_[i].batch != nullptr) ++num_batches;
3614     }
3615     gpr_log(GPR_INFO,
3616             "chand=%p retrying_call=%p: starting %" PRIuPTR
3617             " pending batches on lb_call=%p",
3618             chand_, this, num_batches, lb_call_.get());
3619   }
3620   CallCombinerClosureList closures;
3621   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3622     PendingBatch* pending = &pending_batches_[i];
3623     grpc_transport_stream_op_batch* batch = pending->batch;
3624     if (batch != nullptr) {
3625       batch->handler_private.extra_arg = lb_call_.get();
3626       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
3627                         ResumePendingBatchInCallCombiner, batch, nullptr);
3628       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
3629                    "PendingBatchesResume");
3630       PendingBatchClear(pending);
3631     }
3632   }
3633   // Note: This will release the call combiner.
3634   closures.RunClosures(call_combiner_);
3635 }
3636 
3637 template <typename Predicate>
3638 RetryingCall::PendingBatch* RetryingCall::PendingBatchFind(
3639     const char* log_message, Predicate predicate) {
3640   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3641     PendingBatch* pending = &pending_batches_[i];
3642     grpc_transport_stream_op_batch* batch = pending->batch;
3643     if (batch != nullptr && predicate(batch)) {
3644       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3645         gpr_log(
3646             GPR_INFO,
3647             "chand=%p retrying_call=%p: %s pending batch at index %" PRIuPTR,
3648             chand_, this, log_message, i);
3649       }
3650       return pending;
3651     }
3652   }
3653   return nullptr;
3654 }
3655 
3656 //
3657 // retry code
3658 //
3659 
3660 void RetryingCall::RetryCommit(SubchannelCallRetryState* retry_state) {
3661   if (retry_committed_) return;
3662   retry_committed_ = true;
3663   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3664     gpr_log(GPR_INFO, "chand=%p retrying_call=%p: committing retries", chand_,
3665             this);
3666   }
3667   if (retry_state != nullptr) {
3668     FreeCachedSendOpDataAfterCommit(retry_state);
3669   }
3670 }
3671 
3672 void RetryingCall::DoRetry(SubchannelCallRetryState* retry_state,
3673                            grpc_millis server_pushback_ms) {
3674   GPR_ASSERT(retry_policy_ != nullptr);
3675   // Reset LB call.
3676   lb_call_.reset();
3677   // Compute backoff delay.
3678   grpc_millis next_attempt_time;
3679   if (server_pushback_ms >= 0) {
3680     next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
3681     last_attempt_got_server_pushback_ = true;
3682   } else {
3683     if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
3684       last_attempt_got_server_pushback_ = false;
3685     }
3686     next_attempt_time = retry_backoff_.NextAttemptTime();
3687   }
3688   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3689     gpr_log(GPR_INFO,
3690             "chand=%p retrying_call=%p: retrying failed call in %" PRId64 " ms",
3691             chand_, this, next_attempt_time - ExecCtx::Get()->Now());
3692   }
3693   // Schedule retry after computed delay.
3694   GRPC_CLOSURE_INIT(&retry_closure_, CreateLbCall, this, nullptr);
3695   grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
3696   // Update bookkeeping.
3697   if (retry_state != nullptr) retry_state->retry_dispatched = true;
3698 }
3699 
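// The retry delay chosen above comes from one of two sources, which can be
// read as a single expression over this file's names:
//
//   next_attempt_time = (server_pushback_ms >= 0)
//       ? ExecCtx::Get()->Now() + server_pushback_ms  // server-directed delay
//       : retry_backoff_.NextAttemptTime();           // exponential backoff
//
// last_attempt_got_server_pushback_ records which branch was taken so that
// the next failure knows whether the backoff schedule was interrupted.
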
MaybeRetry(SubchannelCallBatchData * batch_data,grpc_status_code status,grpc_mdelem * server_pushback_md)3700 bool RetryingCall::MaybeRetry(SubchannelCallBatchData* batch_data,
3701                               grpc_status_code status,
3702                               grpc_mdelem* server_pushback_md) {
3703   // Get retry policy.
3704   if (retry_policy_ == nullptr) return false;
3705   // If we've already dispatched a retry from this call, return true.
3706   // This catches the case where the batch has multiple callbacks
3707   // (i.e., it includes either recv_message or recv_initial_metadata).
3708   SubchannelCallRetryState* retry_state = nullptr;
3709   if (batch_data != nullptr) {
3710     retry_state = static_cast<SubchannelCallRetryState*>(
3711         batch_data->lb_call->GetParentData());
3712     if (retry_state->retry_dispatched) {
3713       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3714         gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retry already dispatched",
3715                 chand_, this);
3716       }
3717       return true;
3718     }
3719   }
3720   // Check status.
3721   if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
3722     if (retry_throttle_data_ != nullptr) {
3723       retry_throttle_data_->RecordSuccess();
3724     }
3725     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3726       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: call succeeded", chand_,
3727               this);
3728     }
3729     return false;
3730   }
3731   // Status is not OK.  Check whether the status is retryable.
3732   if (!retry_policy_->retryable_status_codes.Contains(status)) {
3733     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3734       gpr_log(
3735           GPR_INFO,
3736           "chand=%p retrying_call=%p: status %s not configured as retryable",
3737           chand_, this, grpc_status_code_to_string(status));
3738     }
3739     return false;
3740   }
3741   // Record the failure and check whether retries are throttled.
3742   // Note that it's important for this check to come after the status
3743   // code check above, since we should only record failures whose statuses
3744   // match the configured retryable status codes, so that we don't count
3745   // things like failures due to malformed requests (INVALID_ARGUMENT).
3746   // Conversely, it's important for this to come before the remaining
3747   // checks, so that we don't fail to record failures due to other factors.
3748   if (retry_throttle_data_ != nullptr &&
3749       !retry_throttle_data_->RecordFailure()) {
3750     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3751       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retries throttled", chand_,
3752               this);
3753     }
3754     return false;
3755   }
3756   // Check whether the call is committed.
3757   if (retry_committed_) {
3758     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3759       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retries already committed",
3760               chand_, this);
3761     }
3762     return false;
3763   }
3764   // Check whether we have retries remaining.
3765   ++num_attempts_completed_;
3766   if (num_attempts_completed_ >= retry_policy_->max_attempts) {
3767     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3768       gpr_log(GPR_INFO, "chand=%p retrying_call=%p: exceeded %d retry attempts",
3769               chand_, this, retry_policy_->max_attempts);
3770     }
3771     return false;
3772   }
3773   // If the call was cancelled from the surface, don't retry.
3774   if (cancel_error_ != GRPC_ERROR_NONE) {
3775     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3776       gpr_log(GPR_INFO,
3777               "chand=%p retrying_call=%p: call cancelled from surface, not "
3778               "retrying",
3779               chand_, this);
3780     }
3781     return false;
3782   }
3783   // Check server push-back.
3784   grpc_millis server_pushback_ms = -1;
3785   if (server_pushback_md != nullptr) {
3786     // If the value is "-1" or any other unparseable string, we do not retry.
3787     uint32_t ms;
3788     if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
3789       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3790         gpr_log(
3791             GPR_INFO,
3792             "chand=%p retrying_call=%p: not retrying due to server push-back",
3793             chand_, this);
3794       }
3795       return false;
3796     } else {
3797       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3798         gpr_log(GPR_INFO,
3799                 "chand=%p retrying_call=%p: server push-back: retry in %u ms",
3800                 chand_, this, ms);
3801       }
3802       server_pushback_ms = static_cast<grpc_millis>(ms);
3803     }
3804   }
3805   DoRetry(retry_state, server_pushback_ms);
3806   return true;
3807 }
3808 
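// Condensed decision ladder for MaybeRetry() above (a true return means
// "a retry was dispatched, do not surface this attempt's result"):
//
//   no retry policy          -> false
//   retry already dispatched -> true
//   status == OK             -> false (throttle success recorded)
//   status not retryable     -> false
//   retries throttled        -> false (failure recorded first)
//   call already committed   -> false
//   attempts exhausted       -> false
//   cancelled from surface   -> false
//   pushback unparseable     -> false
//   otherwise                -> DoRetry(retry_state, pushback_ms); true
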
3809 //
3810 // RetryingCall::SubchannelCallBatchData
3811 //
3812 
3813 RetryingCall::SubchannelCallBatchData*
3814 RetryingCall::SubchannelCallBatchData::Create(RetryingCall* call, int refcount,
3815                                               bool set_on_complete) {
3816   return call->arena_->New<SubchannelCallBatchData>(call, refcount,
3817                                                     set_on_complete);
3818 }
3819 
3820 RetryingCall::SubchannelCallBatchData::SubchannelCallBatchData(
3821     RetryingCall* call, int refcount, bool set_on_complete)
3822     : call(call), lb_call(call->lb_call_) {
3823   SubchannelCallRetryState* retry_state =
3824       static_cast<SubchannelCallRetryState*>(lb_call->GetParentData());
3825   batch.payload = &retry_state->batch_payload;
3826   gpr_ref_init(&refs, refcount);
3827   if (set_on_complete) {
3828     GRPC_CLOSURE_INIT(&on_complete, RetryingCall::OnComplete, this,
3829                       grpc_schedule_on_exec_ctx);
3830     batch.on_complete = &on_complete;
3831   }
3832   GRPC_CALL_STACK_REF(call->owning_call_, "batch_data");
3833 }
3834 
3835 void RetryingCall::SubchannelCallBatchData::Destroy() {
3836   SubchannelCallRetryState* retry_state =
3837       static_cast<SubchannelCallRetryState*>(lb_call->GetParentData());
3838   if (batch.send_initial_metadata) {
3839     grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
3840   }
3841   if (batch.send_trailing_metadata) {
3842     grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
3843   }
3844   if (batch.recv_initial_metadata) {
3845     grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
3846   }
3847   if (batch.recv_trailing_metadata) {
3848     grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
3849   }
3850   lb_call.reset();
3851   GRPC_CALL_STACK_UNREF(call->owning_call_, "batch_data");
3852 }
3853 
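// Lifecycle sketch for SubchannelCallBatchData: it is arena-allocated with an
// explicit refcount, so there is no delete; the final Unref() runs Destroy()
// and the storage is reclaimed with the call's arena. Assuming the
// conventional Unref() shape for gpr_refcount-based types:
//
//   SubchannelCallBatchData* bd =
//       SubchannelCallBatchData::Create(call, /*refcount=*/2,
//                                       /*set_on_complete=*/false);
//   bd->Unref();  // first owner done (e.g., transport callback)
//   bd->Unref();  // last owner: Destroy() frees metadata, drops lb_call ref
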
3854 //
3855 // recv_initial_metadata callback handling
3856 //
3857 
3858 void RetryingCall::InvokeRecvInitialMetadataCallback(void* arg,
3859                                                      grpc_error* error) {
3860   SubchannelCallBatchData* batch_data =
3861       static_cast<SubchannelCallBatchData*>(arg);
3862   // Find pending batch.
3863   PendingBatch* pending = batch_data->call->PendingBatchFind(
3864       "invoking recv_initial_metadata_ready for",
3865       [](grpc_transport_stream_op_batch* batch) {
3866         return batch->recv_initial_metadata &&
3867                batch->payload->recv_initial_metadata
3868                        .recv_initial_metadata_ready != nullptr;
3869       });
3870   GPR_ASSERT(pending != nullptr);
3871   // Return metadata.
3872   SubchannelCallRetryState* retry_state =
3873       static_cast<SubchannelCallRetryState*>(
3874           batch_data->lb_call->GetParentData());
3875   grpc_metadata_batch_move(
3876       &retry_state->recv_initial_metadata,
3877       pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
3878   // Update bookkeeping.
3879   // Note: Need to do this before invoking the callback, since invoking
3880   // the callback will result in yielding the call combiner.
3881   grpc_closure* recv_initial_metadata_ready =
3882       pending->batch->payload->recv_initial_metadata
3883           .recv_initial_metadata_ready;
3884   pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
3885       nullptr;
3886   batch_data->call->MaybeClearPendingBatch(pending);
3887   batch_data->Unref();
3888   // Invoke callback.
3889   Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready,
3890                GRPC_ERROR_REF(error));
3891 }
3892 
3893 void RetryingCall::RecvInitialMetadataReady(void* arg, grpc_error* error) {
3894   SubchannelCallBatchData* batch_data =
3895       static_cast<SubchannelCallBatchData*>(arg);
3896   RetryingCall* call = batch_data->call;
3897   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3898     gpr_log(
3899         GPR_INFO,
3900         "chand=%p retrying_call=%p: got recv_initial_metadata_ready, error=%s",
3901         call->chand_, call, grpc_error_string(error));
3902   }
3903   SubchannelCallRetryState* retry_state =
3904       static_cast<SubchannelCallRetryState*>(
3905           batch_data->lb_call->GetParentData());
3906   retry_state->completed_recv_initial_metadata = true;
3907   // If a retry was already dispatched, then we're not going to use the
3908   // result of this recv_initial_metadata op, so do nothing.
3909   if (retry_state->retry_dispatched) {
3910     GRPC_CALL_COMBINER_STOP(
3911         call->call_combiner_,
3912         "recv_initial_metadata_ready after retry dispatched");
3913     return;
3914   }
3915   // If we got an error or a Trailers-Only response and have not yet gotten
3916   // the recv_trailing_metadata_ready callback, then defer propagating this
3917   // callback back to the surface.  We can evaluate whether to retry when
3918   // recv_trailing_metadata comes back.
3919   if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
3920                     error != GRPC_ERROR_NONE) &&
3921                    !retry_state->completed_recv_trailing_metadata)) {
3922     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3923       gpr_log(
3924           GPR_INFO,
3925           "chand=%p retrying_call=%p: deferring recv_initial_metadata_ready "
3926           "(Trailers-Only)",
3927           call->chand_, call);
3928     }
3929     retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
3930     retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
3931     if (!retry_state->started_recv_trailing_metadata) {
3932       // recv_trailing_metadata not yet started by application; start it
3933       // ourselves to get status.
3934       call->StartInternalRecvTrailingMetadata();
3935     } else {
3936       GRPC_CALL_COMBINER_STOP(
3937           call->call_combiner_,
3938           "recv_initial_metadata_ready trailers-only or error");
3939     }
3940     return;
3941   }
3942   // Received valid initial metadata, so commit the call.
3943   call->RetryCommit(retry_state);
3944   // Invoke the callback to return the result to the surface.
3945   // Manually invoking a callback function; it does not take ownership of error.
3946   call->InvokeRecvInitialMetadataCallback(batch_data, error);
3947 }
3948 
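// The deferral rule implemented above can be stated compactly: hold the
// recv_initial_metadata_ready callback whenever this attempt might still be
// retried, i.e.
//
//   bool defer = (retry_state->trailing_metadata_available ||
//                 error != GRPC_ERROR_NONE) &&
//                !retry_state->completed_recv_trailing_metadata;
//
// Only after recv_trailing_metadata arrives is the final status known, at
// which point MaybeRetry() decides whether these results reach the surface.
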
3949 //
3950 // recv_message callback handling
3951 //
3952 
3953 void RetryingCall::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
3954   SubchannelCallBatchData* batch_data =
3955       static_cast<SubchannelCallBatchData*>(arg);
3956   RetryingCall* call = batch_data->call;
3957   // Find pending op.
3958   PendingBatch* pending = call->PendingBatchFind(
3959       "invoking recv_message_ready for",
3960       [](grpc_transport_stream_op_batch* batch) {
3961         return batch->recv_message &&
3962                batch->payload->recv_message.recv_message_ready != nullptr;
3963       });
3964   GPR_ASSERT(pending != nullptr);
3965   // Return payload.
3966   SubchannelCallRetryState* retry_state =
3967       static_cast<SubchannelCallRetryState*>(
3968           batch_data->lb_call->GetParentData());
3969   *pending->batch->payload->recv_message.recv_message =
3970       std::move(retry_state->recv_message);
3971   // Update bookkeeping.
3972   // Note: Need to do this before invoking the callback, since invoking
3973   // the callback will result in yielding the call combiner.
3974   grpc_closure* recv_message_ready =
3975       pending->batch->payload->recv_message.recv_message_ready;
3976   pending->batch->payload->recv_message.recv_message_ready = nullptr;
3977   call->MaybeClearPendingBatch(pending);
3978   batch_data->Unref();
3979   // Invoke callback.
3980   Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error));
3981 }
3982 
3983 void RetryingCall::RecvMessageReady(void* arg, grpc_error* error) {
3984   SubchannelCallBatchData* batch_data =
3985       static_cast<SubchannelCallBatchData*>(arg);
3986   RetryingCall* call = batch_data->call;
3987   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3988     gpr_log(GPR_INFO,
3989             "chand=%p retrying_call=%p: got recv_message_ready, error=%s",
3990             call->chand_, call, grpc_error_string(error));
3991   }
3992   SubchannelCallRetryState* retry_state =
3993       static_cast<SubchannelCallRetryState*>(
3994           batch_data->lb_call->GetParentData());
3995   ++retry_state->completed_recv_message_count;
3996   // If a retry was already dispatched, then we're not going to use the
3997   // result of this recv_message op, so do nothing.
3998   if (retry_state->retry_dispatched) {
3999     GRPC_CALL_COMBINER_STOP(call->call_combiner_,
4000                             "recv_message_ready after retry dispatched");
4001     return;
4002   }
4003   // If we got an error or the payload was nullptr and we have not yet gotten
4004   // the recv_trailing_metadata_ready callback, then defer propagating this
4005   // callback back to the surface.  We can evaluate whether to retry when
4006   // recv_trailing_metadata comes back.
4007   if (GPR_UNLIKELY(
4008           (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
4009           !retry_state->completed_recv_trailing_metadata)) {
4010     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4011       gpr_log(
4012           GPR_INFO,
4013           "chand=%p retrying_call=%p: deferring recv_message_ready (nullptr "
4014           "message and recv_trailing_metadata pending)",
4015           call->chand_, call);
4016     }
4017     retry_state->recv_message_ready_deferred_batch = batch_data;
4018     retry_state->recv_message_error = GRPC_ERROR_REF(error);
4019     if (!retry_state->started_recv_trailing_metadata) {
4020       // recv_trailing_metadata not yet started by application; start it
4021       // ourselves to get status.
4022       call->StartInternalRecvTrailingMetadata();
4023     } else {
4024       GRPC_CALL_COMBINER_STOP(call->call_combiner_, "recv_message_ready null");
4025     }
4026     return;
4027   }
4028   // Received a valid message, so commit the call.
4029   call->RetryCommit(retry_state);
4030   // Invoke the callback to return the result to the surface.
4031   // Manually invoking a callback function; it does not take ownership of error.
4032   call->InvokeRecvMessageCallback(batch_data, error);
4033 }
4034 
4035 //
4036 // recv_trailing_metadata handling
4037 //
4038 
4039 void RetryingCall::GetCallStatus(grpc_metadata_batch* md_batch,
4040                                  grpc_error* error, grpc_status_code* status,
4041                                  grpc_mdelem** server_pushback_md) {
4042   if (error != GRPC_ERROR_NONE) {
4043     grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
4044   } else {
4045     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
4046     *status =
4047         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
4048     if (server_pushback_md != nullptr &&
4049         md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
4050       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
4051     }
4052   }
4053   GRPC_ERROR_UNREF(error);
4054 }
4055 
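// Worked example of GetCallStatus() on a Trailers-Only failure carrying a
// push-back hint (the metadata values shown are illustrative):
//
//   grpc-status: 14               => *status = GRPC_STATUS_UNAVAILABLE
//   grpc-retry-pushback-ms: 250   => *server_pushback_md set; MaybeRetry()
//                                    later parses "250" into 250 ms
//
// When error != GRPC_ERROR_NONE, the status comes from the error itself via
// grpc_error_get_status() and any push-back metadata is ignored.
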
4056 void RetryingCall::AddClosureForRecvTrailingMetadataReady(
4057     SubchannelCallBatchData* batch_data, grpc_error* error,
4058     CallCombinerClosureList* closures) {
4059   // Find pending batch.
4060   PendingBatch* pending = PendingBatchFind(
4061       "invoking recv_trailing_metadata for",
4062       [](grpc_transport_stream_op_batch* batch) {
4063         return batch->recv_trailing_metadata &&
4064                batch->payload->recv_trailing_metadata
4065                        .recv_trailing_metadata_ready != nullptr;
4066       });
4067   // If we generated the recv_trailing_metadata op internally via
4068   // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
4069   if (pending == nullptr) {
4070     GRPC_ERROR_UNREF(error);
4071     return;
4072   }
4073   // Return metadata.
4074   SubchannelCallRetryState* retry_state =
4075       static_cast<SubchannelCallRetryState*>(
4076           batch_data->lb_call->GetParentData());
4077   grpc_metadata_batch_move(
4078       &retry_state->recv_trailing_metadata,
4079       pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
4080   // Add closure.
4081   closures->Add(pending->batch->payload->recv_trailing_metadata
4082                     .recv_trailing_metadata_ready,
4083                 error, "recv_trailing_metadata_ready for pending batch");
4084   // Update bookkeeping.
4085   pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
4086       nullptr;
4087   MaybeClearPendingBatch(pending);
4088 }
4089 
4090 void RetryingCall::AddClosuresForDeferredRecvCallbacks(
4091     SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
4092     CallCombinerClosureList* closures) {
4093   if (batch_data->batch.recv_trailing_metadata) {
4094     // Add closure for deferred recv_initial_metadata_ready.
4095     if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
4096                      nullptr)) {
4097       GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
4098                         InvokeRecvInitialMetadataCallback,
4099                         retry_state->recv_initial_metadata_ready_deferred_batch,
4100                         grpc_schedule_on_exec_ctx);
4101       closures->Add(&retry_state->recv_initial_metadata_ready,
4102                     retry_state->recv_initial_metadata_error,
4103                     "resuming recv_initial_metadata_ready");
4104       retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
4105     }
4106     // Add closure for deferred recv_message_ready.
4107     if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
4108                      nullptr)) {
4109       GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
4110                         InvokeRecvMessageCallback,
4111                         retry_state->recv_message_ready_deferred_batch,
4112                         grpc_schedule_on_exec_ctx);
4113       closures->Add(&retry_state->recv_message_ready,
4114                     retry_state->recv_message_error,
4115                     "resuming recv_message_ready");
4116       retry_state->recv_message_ready_deferred_batch = nullptr;
4117     }
4118   }
4119 }
4120 
4121 bool RetryingCall::PendingBatchIsUnstarted(
4122     PendingBatch* pending, SubchannelCallRetryState* retry_state) {
4123   if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
4124     return false;
4125   }
4126   if (pending->batch->send_initial_metadata &&
4127       !retry_state->started_send_initial_metadata) {
4128     return true;
4129   }
4130   if (pending->batch->send_message &&
4131       retry_state->started_send_message_count < send_messages_.size()) {
4132     return true;
4133   }
4134   if (pending->batch->send_trailing_metadata &&
4135       !retry_state->started_send_trailing_metadata) {
4136     return true;
4137   }
4138   return false;
4139 }
4140 
4141 void RetryingCall::AddClosuresToFailUnstartedPendingBatches(
4142     SubchannelCallRetryState* retry_state, grpc_error* error,
4143     CallCombinerClosureList* closures) {
4144   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4145     PendingBatch* pending = &pending_batches_[i];
4146     if (PendingBatchIsUnstarted(pending, retry_state)) {
4147       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4148         gpr_log(GPR_INFO,
4149                 "chand=%p retrying_call=%p: failing unstarted pending batch at "
4150                 "index "
4151                 "%" PRIuPTR,
4152                 chand_, this, i);
4153       }
4154       closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
4155                     "failing on_complete for pending batch");
4156       pending->batch->on_complete = nullptr;
4157       MaybeClearPendingBatch(pending);
4158     }
4159   }
4160   GRPC_ERROR_UNREF(error);
4161 }
4162 
4163 void RetryingCall::RunClosuresForCompletedCall(
4164     SubchannelCallBatchData* batch_data, grpc_error* error) {
4165   SubchannelCallRetryState* retry_state =
4166       static_cast<SubchannelCallRetryState*>(
4167           batch_data->lb_call->GetParentData());
4168   // Construct list of closures to execute.
4169   CallCombinerClosureList closures;
4170   // First, add closure for recv_trailing_metadata_ready.
4171   AddClosureForRecvTrailingMetadataReady(batch_data, GRPC_ERROR_REF(error),
4172                                          &closures);
4173   // If there are deferred recv_initial_metadata_ready or recv_message_ready
4174   // callbacks, add them to closures.
4175   AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
4176   // Add closures to fail any pending batches that have not yet been started.
4177   AddClosuresToFailUnstartedPendingBatches(retry_state, GRPC_ERROR_REF(error),
4178                                            &closures);
4179   // Don't need batch_data anymore.
4180   batch_data->Unref();
4181   // Schedule all of the closures identified above.
4182   // Note: This will release the call combiner.
4183   closures.RunClosures(call_combiner_);
4184   GRPC_ERROR_UNREF(error);
4185 }
4186 
4187 void RetryingCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
4188   SubchannelCallBatchData* batch_data =
4189       static_cast<SubchannelCallBatchData*>(arg);
4190   RetryingCall* call = batch_data->call;
4191   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4192     gpr_log(
4193         GPR_INFO,
4194         "chand=%p retrying_call=%p: got recv_trailing_metadata_ready, error=%s",
4195         call->chand_, call, grpc_error_string(error));
4196   }
4197   SubchannelCallRetryState* retry_state =
4198       static_cast<SubchannelCallRetryState*>(
4199           batch_data->lb_call->GetParentData());
4200   retry_state->completed_recv_trailing_metadata = true;
4201   // Get the call's status and check for server pushback metadata.
4202   grpc_status_code status = GRPC_STATUS_OK;
4203   grpc_mdelem* server_pushback_md = nullptr;
4204   grpc_metadata_batch* md_batch =
4205       batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
4206   call->GetCallStatus(md_batch, GRPC_ERROR_REF(error), &status,
4207                       &server_pushback_md);
4208   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4209     gpr_log(GPR_INFO, "chand=%p retrying_call=%p: call finished, status=%s",
4210             call->chand_, call, grpc_status_code_to_string(status));
4211   }
4212   // Check if we should retry.
4213   if (call->MaybeRetry(batch_data, status, server_pushback_md)) {
4214     // Unref batch_data for deferred recv_initial_metadata_ready or
4215     // recv_message_ready callbacks, if any.
4216     if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
4217       batch_data->Unref();
4218       GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
4219     }
4220     if (retry_state->recv_message_ready_deferred_batch != nullptr) {
4221       batch_data->Unref();
4222       GRPC_ERROR_UNREF(retry_state->recv_message_error);
4223     }
4224     batch_data->Unref();
4225     return;
4226   }
4227   // Not retrying, so commit the call.
4228   call->RetryCommit(retry_state);
4229   // Run any necessary closures.
4230   call->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
4231 }
4232 
4233 //
4234 // on_complete callback handling
4235 //
4236 
4237 void RetryingCall::AddClosuresForCompletedPendingBatch(
4238     SubchannelCallBatchData* batch_data, grpc_error* error,
4239     CallCombinerClosureList* closures) {
4240   PendingBatch* pending = PendingBatchFind(
4241       "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
4242         // Match the pending batch with the same set of send ops as the
4243         // subchannel batch we've just completed.
4244         return batch->on_complete != nullptr &&
4245                batch_data->batch.send_initial_metadata ==
4246                    batch->send_initial_metadata &&
4247                batch_data->batch.send_message == batch->send_message &&
4248                batch_data->batch.send_trailing_metadata ==
4249                    batch->send_trailing_metadata;
4250       });
4251   // If batch_data is a replay batch, then there will be no pending
4252   // batch to complete.
4253   if (pending == nullptr) {
4254     GRPC_ERROR_UNREF(error);
4255     return;
4256   }
4257   // Add closure.
4258   closures->Add(pending->batch->on_complete, error,
4259                 "on_complete for pending batch");
4260   pending->batch->on_complete = nullptr;
4261   MaybeClearPendingBatch(pending);
4262 }
4263 
4264 void RetryingCall::AddClosuresForReplayOrPendingSendOps(
4265     SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
4266     CallCombinerClosureList* closures) {
4267   bool have_pending_send_message_ops =
4268       retry_state->started_send_message_count < send_messages_.size();
4269   bool have_pending_send_trailing_metadata_op =
4270       seen_send_trailing_metadata_ &&
4271       !retry_state->started_send_trailing_metadata;
4272   if (!have_pending_send_message_ops &&
4273       !have_pending_send_trailing_metadata_op) {
4274     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4275       PendingBatch* pending = &pending_batches_[i];
4276       grpc_transport_stream_op_batch* batch = pending->batch;
4277       if (batch == nullptr || pending->send_ops_cached) continue;
4278       if (batch->send_message) have_pending_send_message_ops = true;
4279       if (batch->send_trailing_metadata) {
4280         have_pending_send_trailing_metadata_op = true;
4281       }
4282     }
4283   }
4284   if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
4285     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4286       gpr_log(GPR_INFO,
4287               "chand=%p retrying_call=%p: starting next batch for pending send "
4288               "op(s)",
4289               chand_, this);
4290     }
4291     GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
4292                       StartRetriableSubchannelBatches, this,
4293                       grpc_schedule_on_exec_ctx);
4294     closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
4295                   "starting next batch for send_* op(s)");
4296   }
4297 }
4298 
4299 void RetryingCall::OnComplete(void* arg, grpc_error* error) {
4300   SubchannelCallBatchData* batch_data =
4301       static_cast<SubchannelCallBatchData*>(arg);
4302   RetryingCall* call = batch_data->call;
4303   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4304     gpr_log(GPR_INFO,
4305             "chand=%p retrying_call=%p: got on_complete, error=%s, batch=%s",
4306             call->chand_, call, grpc_error_string(error),
4307             grpc_transport_stream_op_batch_string(&batch_data->batch).c_str());
4308   }
4309   SubchannelCallRetryState* retry_state =
4310       static_cast<SubchannelCallRetryState*>(
4311           batch_data->lb_call->GetParentData());
4312   // Update bookkeeping in retry_state.
4313   if (batch_data->batch.send_initial_metadata) {
4314     retry_state->completed_send_initial_metadata = true;
4315   }
4316   if (batch_data->batch.send_message) {
4317     ++retry_state->completed_send_message_count;
4318   }
4319   if (batch_data->batch.send_trailing_metadata) {
4320     retry_state->completed_send_trailing_metadata = true;
4321   }
4322   // If the call is committed, free cached data for send ops that we've just
4323   // completed.
4324   if (call->retry_committed_) {
4325     call->FreeCachedSendOpDataForCompletedBatch(batch_data, retry_state);
4326   }
4327   // Construct list of closures to execute.
4328   CallCombinerClosureList closures;
4329   // If a retry was already dispatched, that means we saw
4330   // recv_trailing_metadata before this, so we do nothing here.
4331   // Otherwise, invoke the callback to return the result to the surface.
4332   if (!retry_state->retry_dispatched) {
4333     // Add closure for the completed pending batch, if any.
4334     call->AddClosuresForCompletedPendingBatch(batch_data, GRPC_ERROR_REF(error),
4335                                               &closures);
4336     // If needed, add a callback to start any replay or pending send ops on
4337     // the subchannel call.
4338     if (!retry_state->completed_recv_trailing_metadata) {
4339       call->AddClosuresForReplayOrPendingSendOps(batch_data, retry_state,
4340                                                  &closures);
4341     }
4342   }
4343   // Track number of pending subchannel send batches and determine if this
4344   // was the last one.
4345   --call->num_pending_retriable_subchannel_send_batches_;
4346   const bool last_send_batch_complete =
4347       call->num_pending_retriable_subchannel_send_batches_ == 0;
4348   // Don't need batch_data anymore.
4349   batch_data->Unref();
4350   // Schedule all of the closures identified above.
4351   // Note: This yields the call combiner.
4352   closures.RunClosures(call->call_combiner_);
4353   // If this was the last subchannel send batch, unref the call stack.
4354   if (last_send_batch_complete) {
4355     GRPC_CALL_STACK_UNREF(call->owning_call_, "subchannel_send_batches");
4356   }
4357 }
4358 
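// Counter bookkeeping sketch for the path above: for an attempt that sends
// initial metadata plus two messages, retry_state advances as
//
//   completed_send_initial_metadata: false -> true  (first on_complete)
//   completed_send_message_count:    0 -> 1 -> 2    (one per on_complete)
//
// A later attempt starts from a fresh retry_state (the parent data of the
// new lb_call), replaying from the cached send_messages_ as needed.
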
4359 //
4360 // subchannel batch construction
4361 //
4362 
4363 void RetryingCall::StartBatchInCallCombiner(void* arg,
4364                                             grpc_error* /*ignored*/) {
4365   grpc_transport_stream_op_batch* batch =
4366       static_cast<grpc_transport_stream_op_batch*>(arg);
4367   auto* lb_call =
4368       static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
4369   // Note: This will release the call combiner.
4370   lb_call->StartTransportStreamOpBatch(batch);
4371 }
4372 
4373 void RetryingCall::AddClosureForSubchannelBatch(
4374     grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) {
4375   batch->handler_private.extra_arg = lb_call_.get();
4376   GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
4377                     batch, grpc_schedule_on_exec_ctx);
4378   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4379     gpr_log(GPR_INFO,
4380             "chand=%p retrying_call=%p: starting subchannel batch: %s", chand_,
4381             this, grpc_transport_stream_op_batch_string(batch).c_str());
4382   }
4383   closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
4384                 "start_subchannel_batch");
4385 }
4386 
4387 void RetryingCall::AddRetriableSendInitialMetadataOp(
4388     SubchannelCallRetryState* retry_state,
4389     SubchannelCallBatchData* batch_data) {
4390   // Maps the number of retries to the corresponding metadata value slice.
4391   const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
4392                                              &GRPC_MDSTR_3, &GRPC_MDSTR_4};
4393   // We need to make a copy of the metadata batch for each attempt, since
4394   // the filters in the subchannel stack may modify this batch, and we don't
4395   // want those modifications to be passed forward to subsequent attempts.
4396   //
4397   // If we've already completed one or more attempts, add the
4398   // grpc-previous-rpc-attempts header.
4399   retry_state->send_initial_metadata_storage =
4400       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
4401           sizeof(grpc_linked_mdelem) *
4402           (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
4403   grpc_metadata_batch_copy(&send_initial_metadata_,
4404                            &retry_state->send_initial_metadata,
4405                            retry_state->send_initial_metadata_storage);
4406   if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
4407                        .grpc_previous_rpc_attempts != nullptr)) {
4408     grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
4409                                GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
4410   }
4411   if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
4412     grpc_mdelem retry_md = grpc_mdelem_create(
4413         GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
4414         *retry_count_strings[num_attempts_completed_ - 1], nullptr);
4415     grpc_error* error = grpc_metadata_batch_add_tail(
4416         &retry_state->send_initial_metadata,
4417         &retry_state
4418              ->send_initial_metadata_storage[send_initial_metadata_.list.count],
4419         retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
4420     if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
4421       gpr_log(GPR_ERROR, "error adding retry metadata: %s",
4422               grpc_error_string(error));
4423       GPR_ASSERT(false);
4424     }
4425   }
4426   retry_state->started_send_initial_metadata = true;
4427   batch_data->batch.send_initial_metadata = true;
4428   batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
4429       &retry_state->send_initial_metadata;
4430   batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
4431       send_initial_metadata_flags_;
4432   batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
4433 }
4434 
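// Wire-level effect of the code above on the first retry (one attempt
// completed): the cached initial metadata is copied and a single element is
// appended, e.g.
//
//   :path: /pkg.Service/Method          (cached from the original attempt)
//   grpc-previous-rpc-attempts: 1       (num_attempts_completed_ == 1)
//
// The four static count slices ("1".."4") are consistent with a maximum of
// five total attempts, so the array cannot be indexed out of range.
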
4435 void RetryingCall::AddRetriableSendMessageOp(
4436     SubchannelCallRetryState* retry_state,
4437     SubchannelCallBatchData* batch_data) {
4438   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4439     gpr_log(GPR_INFO,
4440             "chand=%p retrying_call=%p: starting calld->send_messages[%" PRIuPTR
4441             "]",
4442             chand_, this, retry_state->started_send_message_count);
4443   }
4444   ByteStreamCache* cache =
4445       send_messages_[retry_state->started_send_message_count];
4446   ++retry_state->started_send_message_count;
4447   retry_state->send_message.Init(cache);
4448   batch_data->batch.send_message = true;
4449   batch_data->batch.payload->send_message.send_message.reset(
4450       retry_state->send_message.get());
4451 }
4452 
4453 void RetryingCall::AddRetriableSendTrailingMetadataOp(
4454     SubchannelCallRetryState* retry_state,
4455     SubchannelCallBatchData* batch_data) {
4456   // We need to make a copy of the metadata batch for each attempt, since
4457   // the filters in the subchannel stack may modify this batch, and we don't
4458   // want those modifications to be passed forward to subsequent attempts.
4459   retry_state->send_trailing_metadata_storage =
4460       static_cast<grpc_linked_mdelem*>(arena_->Alloc(
4461           sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
4462   grpc_metadata_batch_copy(&send_trailing_metadata_,
4463                            &retry_state->send_trailing_metadata,
4464                            retry_state->send_trailing_metadata_storage);
4465   retry_state->started_send_trailing_metadata = true;
4466   batch_data->batch.send_trailing_metadata = true;
4467   batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
4468       &retry_state->send_trailing_metadata;
4469 }
4470 
4471 void RetryingCall::AddRetriableRecvInitialMetadataOp(
4472     SubchannelCallRetryState* retry_state,
4473     SubchannelCallBatchData* batch_data) {
4474   retry_state->started_recv_initial_metadata = true;
4475   batch_data->batch.recv_initial_metadata = true;
4476   grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
4477   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
4478       &retry_state->recv_initial_metadata;
4479   batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
4480       &retry_state->trailing_metadata_available;
4481   GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
4482                     RecvInitialMetadataReady, batch_data,
4483                     grpc_schedule_on_exec_ctx);
4484   batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
4485       &retry_state->recv_initial_metadata_ready;
4486 }
4487 
4488 void RetryingCall::AddRetriableRecvMessageOp(
4489     SubchannelCallRetryState* retry_state,
4490     SubchannelCallBatchData* batch_data) {
4491   ++retry_state->started_recv_message_count;
4492   batch_data->batch.recv_message = true;
4493   batch_data->batch.payload->recv_message.recv_message =
4494       &retry_state->recv_message;
4495   GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
4496                     batch_data, grpc_schedule_on_exec_ctx);
4497   batch_data->batch.payload->recv_message.recv_message_ready =
4498       &retry_state->recv_message_ready;
4499 }
4500 
4501 void RetryingCall::AddRetriableRecvTrailingMetadataOp(
4502     SubchannelCallRetryState* retry_state,
4503     SubchannelCallBatchData* batch_data) {
4504   retry_state->started_recv_trailing_metadata = true;
4505   batch_data->batch.recv_trailing_metadata = true;
4506   grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
4507   batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
4508       &retry_state->recv_trailing_metadata;
4509   batch_data->batch.payload->recv_trailing_metadata.collect_stats =
4510       &retry_state->collect_stats;
4511   GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
4512                     RecvTrailingMetadataReady, batch_data,
4513                     grpc_schedule_on_exec_ctx);
4514   batch_data->batch.payload->recv_trailing_metadata
4515       .recv_trailing_metadata_ready =
4516       &retry_state->recv_trailing_metadata_ready;
4517 }
4518 
4519 void RetryingCall::StartInternalRecvTrailingMetadata() {
4520   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4521     gpr_log(
4522         GPR_INFO,
4523         "chand=%p retrying_call=%p: call failed but recv_trailing_metadata not "
4524         "started; starting it internally",
4525         chand_, this);
4526   }
4527   SubchannelCallRetryState* retry_state =
4528       static_cast<SubchannelCallRetryState*>(lb_call_->GetParentData());
4529   // Create batch_data with 2 refs, since this batch will be unreffed twice:
4530   // once for the recv_trailing_metadata_ready callback when the subchannel
4531   // batch returns, and again when we actually get a recv_trailing_metadata
4532   // op from the surface.
4533   SubchannelCallBatchData* batch_data =
4534       SubchannelCallBatchData::Create(this, 2, false /* set_on_complete */);
4535   AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
4536   retry_state->recv_trailing_metadata_internal_batch = batch_data;
4537   // Note: This will release the call combiner.
4538   lb_call_->StartTransportStreamOpBatch(&batch_data->batch);
4539 }
4540 
4541 // If there are any cached send ops that need to be replayed on the
4542 // current subchannel call, creates and returns a new subchannel batch
4543 // to replay those ops.  Otherwise, returns nullptr.
4544 RetryingCall::SubchannelCallBatchData*
4545 RetryingCall::MaybeCreateSubchannelBatchForReplay(
4546     SubchannelCallRetryState* retry_state) {
4547   SubchannelCallBatchData* replay_batch_data = nullptr;
4548   // send_initial_metadata.
4549   if (seen_send_initial_metadata_ &&
4550       !retry_state->started_send_initial_metadata &&
4551       !pending_send_initial_metadata_) {
4552     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4553       gpr_log(GPR_INFO,
4554               "chand=%p retrying_call=%p: replaying previously completed "
4555               "send_initial_metadata op",
4556               chand_, this);
4557     }
4558     replay_batch_data =
4559         SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4560     AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
4561   }
4562   // send_message.
4563   // Note that we can only have one send_message op in flight at a time.
4564   if (retry_state->started_send_message_count < send_messages_.size() &&
4565       retry_state->started_send_message_count ==
4566           retry_state->completed_send_message_count &&
4567       !pending_send_message_) {
4568     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4569       gpr_log(GPR_INFO,
4570               "chand=%p retrying_call=%p: replaying previously completed "
4571               "send_message op",
4572               chand_, this);
4573     }
4574     if (replay_batch_data == nullptr) {
4575       replay_batch_data =
4576           SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4577     }
4578     AddRetriableSendMessageOp(retry_state, replay_batch_data);
4579   }
4580   // send_trailing_metadata.
4581   // Note that we only add this op if we have no more send_message ops
4582   // to start, since we can't send down any more send_message ops after
4583   // send_trailing_metadata.
4584   if (seen_send_trailing_metadata_ &&
4585       retry_state->started_send_message_count == send_messages_.size() &&
4586       !retry_state->started_send_trailing_metadata &&
4587       !pending_send_trailing_metadata_) {
4588     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4589       gpr_log(GPR_INFO,
4590               "chand=%p retrying_call=%p: replaying previously completed "
4591               "send_trailing_metadata op",
4592               chand_, this);
4593     }
4594     if (replay_batch_data == nullptr) {
4595       replay_batch_data =
4596           SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4597     }
4598     AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
4599   }
4600   return replay_batch_data;
4601 }
4602 
AddSubchannelBatchesForPendingBatches(SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)4603 void RetryingCall::AddSubchannelBatchesForPendingBatches(
4604     SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
4605   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4606     PendingBatch* pending = &pending_batches_[i];
4607     grpc_transport_stream_op_batch* batch = pending->batch;
4608     if (batch == nullptr) continue;
4609     // Skip any batch that either (a) has already been started on this
4610     // subchannel call or (b) we can't start yet because we're still
4611     // replaying send ops that need to be completed first.
4612     // TODO(roth): Note that if any one op in the batch can't be sent
4613     // yet due to ops that we're replaying, we don't start any of the ops
4614     // in the batch.  This is probably okay, but it could conceivably
4615     // lead to increased latency in some cases -- e.g., we could delay
4616     // starting a recv op due to it being in the same batch with a send
4617     // op.  If/when we revamp the callback protocol in
4618     // transport_stream_op_batch, we may be able to fix this.
4619     if (batch->send_initial_metadata &&
4620         retry_state->started_send_initial_metadata) {
4621       continue;
4622     }
4623     if (batch->send_message && retry_state->completed_send_message_count <
4624                                    retry_state->started_send_message_count) {
4625       continue;
4626     }
4627     // Note that we only start send_trailing_metadata if we have no more
4628     // send_message ops to start, since we can't send down any more
4629     // send_message ops after send_trailing_metadata.
4630     if (batch->send_trailing_metadata &&
4631         (retry_state->started_send_message_count + batch->send_message <
4632              send_messages_.size() ||
4633          retry_state->started_send_trailing_metadata)) {
4634       continue;
4635     }
4636     if (batch->recv_initial_metadata &&
4637         retry_state->started_recv_initial_metadata) {
4638       continue;
4639     }
4640     if (batch->recv_message && retry_state->completed_recv_message_count <
4641                                    retry_state->started_recv_message_count) {
4642       continue;
4643     }
4644     if (batch->recv_trailing_metadata &&
4645         retry_state->started_recv_trailing_metadata) {
4646       // If we previously completed a recv_trailing_metadata op
4647       // initiated by StartInternalRecvTrailingMetadata(), use the
4648       // result of that instead of trying to re-start this op.
4649       if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
4650                         nullptr))) {
4651         // If the batch completed, then trigger the completion callback
4652         // directly, so that we return the previously returned results to
4653         // the application.  Otherwise, just unref the internally
4654         // started subchannel batch, since we'll propagate the
4655         // completion when it completes.
4656         if (retry_state->completed_recv_trailing_metadata) {
4657           // Batches containing recv_trailing_metadata always succeed.
4658           closures->Add(
4659               &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
4660               "re-executing recv_trailing_metadata_ready to propagate "
4661               "internally triggered result");
4662         } else {
4663           retry_state->recv_trailing_metadata_internal_batch->Unref();
4664         }
4665         retry_state->recv_trailing_metadata_internal_batch = nullptr;
4666       }
4667       continue;
4668     }
4669     // If we're not retrying, just send the batch as-is.
4670     // TODO(roth): This condition doesn't seem exactly right -- maybe need a
4671     // notion of "draining" once we've committed and are done replaying?
4672     if (retry_policy_ == nullptr || retry_committed_) {
4673       AddClosureForSubchannelBatch(batch, closures);
4674       PendingBatchClear(pending);
4675       continue;
4676     }
4677     // Create batch with the right number of callbacks.
4678     const bool has_send_ops = batch->send_initial_metadata ||
4679                               batch->send_message ||
4680                               batch->send_trailing_metadata;
4681     const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
4682                               batch->recv_message +
4683                               batch->recv_trailing_metadata;
4684     SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
4685         this, num_callbacks, has_send_ops /* set_on_complete */);
4686     // Cache send ops if needed.
4687     MaybeCacheSendOpsForBatch(pending);
4688     // send_initial_metadata.
4689     if (batch->send_initial_metadata) {
4690       AddRetriableSendInitialMetadataOp(retry_state, batch_data);
4691     }
4692     // send_message.
4693     if (batch->send_message) {
4694       AddRetriableSendMessageOp(retry_state, batch_data);
4695     }
4696     // send_trailing_metadata.
4697     if (batch->send_trailing_metadata) {
4698       AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
4699     }
4700     // recv_initial_metadata.
4701     if (batch->recv_initial_metadata) {
4702       // recv_flags is only used on the server side.
4703       GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
4704       AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
4705     }
4706     // recv_message.
4707     if (batch->recv_message) {
4708       AddRetriableRecvMessageOp(retry_state, batch_data);
4709     }
4710     // recv_trailing_metadata.
4711     if (batch->recv_trailing_metadata) {
4712       AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
4713     }
4714     AddClosureForSubchannelBatch(&batch_data->batch, closures);
4715     // Track number of pending subchannel send batches.
4716     // If this is the first one, take a ref to the call stack.
4717     if (batch->send_initial_metadata || batch->send_message ||
4718         batch->send_trailing_metadata) {
4719       if (num_pending_retriable_subchannel_send_batches_ == 0) {
4720         GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
4721       }
4722       ++num_pending_retriable_subchannel_send_batches_;
4723     }
4724   }
4725 }
4726 
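// Closure callback, run under the call combiner, that constructs and
// starts all retriable batches on the current LB call: first any cached
// send ops that need to be replayed, then any batches still pending
// from the surface.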
void RetryingCall::StartRetriableSubchannelBatches(void* arg,
                                                   grpc_error* /*ignored*/) {
  RetryingCall* call = static_cast<RetryingCall*>(arg);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: constructing retriable batches",
            call->chand_, call);
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(call->lb_call_->GetParentData());
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  // Replay previously-returned send_* ops if needed.
  SubchannelCallBatchData* replay_batch_data =
      call->MaybeCreateSubchannelBatchForReplay(retry_state);
  if (replay_batch_data != nullptr) {
    call->AddClosureForSubchannelBatch(&replay_batch_data->batch, &closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (call->num_pending_retriable_subchannel_send_batches_ == 0) {
      GRPC_CALL_STACK_REF(call->owning_call_, "subchannel_send_batches");
    }
    ++call->num_pending_retriable_subchannel_send_batches_;
  }
  // Now add pending batches.
  call->AddSubchannelBatchesForPendingBatches(retry_state, &closures);
  // Start batches on subchannel call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: starting %" PRIuPTR
            " retriable batches on lb_call=%p",
            call->chand_, call, closures.size(), call->lb_call_.get());
  }
  // Note: This will yield the call combiner.
  closures.RunClosures(call->call_combiner_);
}

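// Creates the LB call, allocating extra parent data for retry state
// when retries are enabled, and then resumes any pending batches.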
void RetryingCall::CreateLbCall(void* arg, grpc_error* /*error*/) {
  auto* call = static_cast<RetryingCall*>(arg);
  const size_t parent_data_size =
      call->enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
  grpc_call_element_args args = {call->owning_call_,     nullptr,
                                 call->call_context_,    call->path_,
                                 call->call_start_time_, call->deadline_,
                                 call->arena_,           call->call_combiner_};
  call->lb_call_ = LoadBalancedCall::Create(call->chand_, args, call->pollent_,
                                            parent_data_size);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p retrying_call=%p: create lb_call=%p",
            call->chand_, call, call->lb_call_.get());
  }
  if (parent_data_size > 0) {
    new (call->lb_call_->GetParentData())
        SubchannelCallRetryState(call->call_context_);
  }
  call->PendingBatchesResume();
}

//
// LoadBalancedCall::Metadata
//

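// A view of a grpc_metadata_batch that implements the LB policy's
// MetadataInterface, allowing the policy to add, iterate over, and
// remove metadata entries for a call.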
class LoadBalancedCall::Metadata
    : public LoadBalancingPolicy::MetadataInterface {
 public:
  Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch)
      : lb_call_(lb_call), batch_(batch) {}

  void Add(absl::string_view key, absl::string_view value) override {
    grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
        lb_call_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
    linked_mdelem->md = grpc_mdelem_from_slices(
        ExternallyManagedSlice(key.data(), key.size()),
        ExternallyManagedSlice(value.data(), value.size()));
    GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
               GRPC_ERROR_NONE);
  }

  iterator begin() const override {
    static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
                  "iterator size too large");
    return iterator(
        this, reinterpret_cast<intptr_t>(MaybeSkipEntry(batch_->list.head)));
  }
  iterator end() const override {
    static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
                  "iterator size too large");
    return iterator(this, 0);
  }

  iterator erase(iterator it) override {
    grpc_linked_mdelem* linked_mdelem =
        reinterpret_cast<grpc_linked_mdelem*>(GetIteratorHandle(it));
    intptr_t handle = reinterpret_cast<intptr_t>(linked_mdelem->next);
    grpc_metadata_batch_remove(batch_, linked_mdelem);
    return iterator(this, handle);
  }

 private:
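  // Returns the next entry if the given entry is the batch's :path
  // element; otherwise, returns the entry unchanged.  Used to hide the
  // :path entry from iteration.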
  grpc_linked_mdelem* MaybeSkipEntry(grpc_linked_mdelem* entry) const {
    if (entry != nullptr && batch_->idx.named.path == entry) {
      return entry->next;
    }
    return entry;
  }

  intptr_t IteratorHandleNext(intptr_t handle) const override {
    grpc_linked_mdelem* linked_mdelem =
        reinterpret_cast<grpc_linked_mdelem*>(handle);
    return reinterpret_cast<intptr_t>(MaybeSkipEntry(linked_mdelem->next));
  }

  std::pair<absl::string_view, absl::string_view> IteratorHandleGet(
      intptr_t handle) const override {
    grpc_linked_mdelem* linked_mdelem =
        reinterpret_cast<grpc_linked_mdelem*>(handle);
    return std::make_pair(StringViewFromSlice(GRPC_MDKEY(linked_mdelem->md)),
                          StringViewFromSlice(GRPC_MDVALUE(linked_mdelem->md)));
  }

  LoadBalancedCall* lb_call_;
  grpc_metadata_batch* batch_;
};

//
// LoadBalancedCall::LbCallState
//

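// Implements the LB policy's CallState interface, giving the policy
// access to the call's arena, parsed backend metric data, and call
// attributes from the service config.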
class LoadBalancedCall::LbCallState : public LoadBalancingPolicy::CallState {
 public:
  explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}

  void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }

  const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
      override {
    if (lb_call_->backend_metric_data_ == nullptr) {
      grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->idx.named
                                   .x_endpoint_load_metrics_bin;
      if (md != nullptr) {
        lb_call_->backend_metric_data_ =
            ParseBackendMetricData(GRPC_MDVALUE(md->md), lb_call_->arena_);
      }
    }
    return lb_call_->backend_metric_data_;
  }

  absl::string_view ExperimentalGetCallAttribute(const char* key) override {
    auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
        lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
    auto& call_attributes = service_config_call_data->call_attributes();
    auto it = call_attributes.find(key);
    if (it == call_attributes.end()) return absl::string_view();
    return it->second;
  }

 private:
  LoadBalancedCall* lb_call_;
};

//
// LoadBalancedCall
//

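// Allocates the LoadBalancedCall on the call's arena.  When
// parent_data_size is non-zero, that many extra bytes are reserved
// immediately after the (alignment-padded) object for the caller's use
// via GetParentData().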
RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Create(
    ChannelData* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent, size_t parent_data_size) {
  const size_t alloc_size =
      parent_data_size > 0
          ? (GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall)) +
             parent_data_size)
          : sizeof(LoadBalancedCall);
  auto* lb_call = static_cast<LoadBalancedCall*>(args.arena->Alloc(alloc_size));
  new (lb_call) LoadBalancedCall(chand, args, pollent);
  return lb_call;
}

LoadBalancedCall::LoadBalancedCall(ChannelData* chand,
                                   const grpc_call_element_args& args,
                                   grpc_polling_entity* pollent)
    : refs_(1, GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
                   ? "LoadBalancedCall"
                   : nullptr),
      chand_(chand),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      pollent_(pollent) {}

LoadBalancedCall::~LoadBalancedCall() {
  grpc_slice_unref_internal(path_);
  GRPC_ERROR_UNREF(cancel_error_);
  if (backend_metric_data_ != nullptr) {
    backend_metric_data_
        ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
  }
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i] == nullptr);
  }
}

RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref() {
  IncrementRefCount();
  return RefCountedPtr<LoadBalancedCall>(this);
}

RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref(
    const DebugLocation& location, const char* reason) {
  IncrementRefCount(location, reason);
  return RefCountedPtr<LoadBalancedCall>(this);
}

void LoadBalancedCall::Unref() {
  if (GPR_UNLIKELY(refs_.Unref())) {
    this->~LoadBalancedCall();
  }
}

void LoadBalancedCall::Unref(const DebugLocation& location,
                             const char* reason) {
  if (GPR_UNLIKELY(refs_.Unref(location, reason))) {
    this->~LoadBalancedCall();
  }
}

void LoadBalancedCall::IncrementRefCount() { refs_.Ref(); }

void LoadBalancedCall::IncrementRefCount(const DebugLocation& location,
                                         const char* reason) {
  refs_.Ref(location, reason);
}

void* LoadBalancedCall::GetParentData() {
  return reinterpret_cast<char*>(this) +
         GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall));
}

size_t LoadBalancedCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  // Note: It is important that send_initial_metadata be the first entry
  // here, since the code in PickSubchannelLocked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

// This is called via the call combiner, so access to calld is synchronized.
void LoadBalancedCall::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
            chand_, this, idx);
  }
  GPR_ASSERT(pending_batches_[idx] == nullptr);
  pending_batches_[idx] = batch;
}

// This is called via the call combiner, so access to calld is synchronized.
void LoadBalancedCall::FailPendingBatchInCallCombiner(void* arg,
                                                      grpc_error* error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), self->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
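// Fails all pending batches with the given error, then runs the
// resulting closures, yielding the call combiner only if the given
// predicate says to.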
void LoadBalancedCall::PendingBatchesFail(
    grpc_error* error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
            chand_, this, num_batches, grpc_error_string(error));
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
  GRPC_ERROR_UNREF(error);
}

// This is called via the call combiner, so access to calld is synchronized.
void LoadBalancedCall::ResumePendingBatchInCallCombiner(
    void* arg, grpc_error* /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  SubchannelCall* subchannel_call =
      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  subchannel_call->StartTransportStreamOpBatch(batch);
}

// This is called via the call combiner, so access to calld is synchronized.
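// Starts all pending batches on the newly created subchannel call.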
void LoadBalancedCall::PendingBatchesResume() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: starting %" PRIuPTR
            " pending batches on subchannel_call=%p",
            chand_, this, num_batches, subchannel_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = subchannel_call_.get();
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "PendingBatchesResume");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}

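// The entry point for all batches on the LB call.  Intercepts
// recv_trailing_metadata for the LB policy, handles cancellation, and
// either starts the batch on the subchannel call or queues it until an
// LB pick completes.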
void LoadBalancedCall::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  // Intercept recv_trailing_metadata_ready for LB callback.
  if (batch->recv_trailing_metadata) {
    InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
              chand_, this, grpc_error_string(cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(cancel_error_);
    cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
              chand_, this, grpc_error_string(cancel_error_));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches.  Otherwise, send the
    // cancellation down to the subchannel call.
    if (subchannel_call_ == nullptr) {
      PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
    } else {
      // Note: This will release the call combiner.
      subchannel_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  PendingBatchesAdd(batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have picked a subchannel, we do not need to acquire
  // the channel's data plane mutex, which is more efficient (especially for
  // streaming calls).
  if (subchannel_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
              chand_, this, subchannel_call_.get());
    }
    PendingBatchesResume();
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, acquire the
  // channel's data plane mutex to pick a subchannel.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
              chand_, this);
    }
    PickSubchannel(this, GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: saved batch, yielding call combiner",
              chand_, this);
    }
    GRPC_CALL_COMBINER_STOP(call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}

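// Intercepted recv_trailing_metadata_ready callback.  If the LB policy
// asked to be notified, synthesizes an error reflecting the call's
// final status, invokes the policy's callback, and then chains to the
// original recv_trailing_metadata_ready closure.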
void LoadBalancedCall::RecvTrailingMetadataReadyForLoadBalancingPolicy(
    void* arg, grpc_error* error) {
  auto* self = static_cast<LoadBalancedCall*>(arg);
  if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
    // Set error if call did not succeed.
    grpc_error* error_for_lb = GRPC_ERROR_NONE;
    if (error != GRPC_ERROR_NONE) {
      error_for_lb = error;
    } else {
      const auto& fields = self->recv_trailing_metadata_->idx.named;
      GPR_ASSERT(fields.grpc_status != nullptr);
      grpc_status_code status =
          grpc_get_status_code_from_metadata(fields.grpc_status->md);
      std::string msg;
      if (status != GRPC_STATUS_OK) {
        error_for_lb = grpc_error_set_int(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
            GRPC_ERROR_INT_GRPC_STATUS, status);
        if (fields.grpc_message != nullptr) {
          error_for_lb = grpc_error_set_str(
              error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
              grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
        }
      }
    }
    // Invoke callback to LB policy.
    Metadata trailing_metadata(self, self->recv_trailing_metadata_);
    LbCallState lb_call_state(self);
    self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
                                           &lb_call_state);
    if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
  }
  // Chain to original callback.
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               GRPC_ERROR_REF(error));
}

// TODO(roth): Consider not intercepting this callback unless we
// actually need to, if this causes a performance problem.
void LoadBalancedCall::InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
    grpc_transport_stream_op_batch* batch) {
  recv_trailing_metadata_ =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata;
  original_recv_trailing_metadata_ready_ =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                    RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
                    grpc_schedule_on_exec_ctx);
  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &recv_trailing_metadata_ready_;
}

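// Creates a subchannel call on the connected subchannel chosen by the
// LB pick, then resumes pending batches on success or fails them if
// call creation returned an error.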
void LoadBalancedCall::CreateSubchannelCall() {
  SubchannelCall::Args call_args = {
      std::move(connected_subchannel_), pollent_, path_, call_start_time_,
      deadline_, arena_,
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call context for each subchannel call.
      call_context_, call_combiner_};
  grpc_error* error = GRPC_ERROR_NONE;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
            this, subchannel_call_.get(), grpc_error_string(error));
  }
  if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
    PendingBatchesFail(error, YieldCallCombiner);
  } else {
    PendingBatchesResume();
  }
}

// A class to handle the call combiner cancellation callback for a
// queued pick.
// TODO(roth): When we implement hedging support, we won't be able to
// register a call combiner cancellation closure for each LB pick,
// because there may be multiple LB picks happening in parallel.
// Instead, we will probably need to maintain a list in the CallData
// object of pending LB picks to be cancelled when the closure runs.
class LoadBalancedCall::LbQueuedCallCanceller {
 public:
  explicit LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand_;
    {
      MutexLock lock(chand->data_plane_mu());
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: cancelling queued pick: "
                "error=%s self=%p calld->pick_canceller=%p",
                chand, lb_call, grpc_error_string(error), self,
                lb_call->lb_call_canceller_);
      }
      if (lb_call->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
        // Remove pick from list of queued picks.
        lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(GRPC_ERROR_REF(error),
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    GRPC_CALL_STACK_UNREF(lb_call->owning_call_, "LbQueuedCallCanceller");
    delete self;
  }

  RefCountedPtr<LoadBalancedCall> lb_call_;
  grpc_closure closure_;
};

void LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
  if (!queued_pending_lb_pick_) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
            chand_, this);
  }
  chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
  queued_pending_lb_pick_ = false;
  // Lame the call combiner canceller.
  lb_call_canceller_ = nullptr;
}

void LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
  if (queued_pending_lb_pick_) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
            chand_, this);
  }
  queued_pending_lb_pick_ = true;
  queued_call_.lb_call = this;
  chand_->AddLbQueuedCall(&queued_call_, pollent_);
  // Register call combiner cancellation callback.
  lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
}

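// Schedules PickDone() to run asynchronously via the ExecCtx with the
// given error.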
void LoadBalancedCall::AsyncPickDone(grpc_error* error) {
  GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
  ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
}

void LoadBalancedCall::PickDone(void* arg, grpc_error* error) {
  auto* self = static_cast<LoadBalancedCall*>(arg);
  if (error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
              self->chand_, self, grpc_error_string(error));
    }
    self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
    return;
  }
  self->CreateSubchannelCall();
}

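// Returns a human-readable name for a pick result type, for use in
// trace logging.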
const char* PickResultTypeName(
    LoadBalancingPolicy::PickResult::ResultType type) {
  switch (type) {
    case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
      return "COMPLETE";
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      return "QUEUE";
    case LoadBalancingPolicy::PickResult::PICK_FAILED:
      return "FAILED";
  }
  GPR_UNREACHABLE_CODE(return "UNKNOWN");
}

void LoadBalancedCall::PickSubchannel(void* arg, grpc_error* error) {
  auto* self = static_cast<LoadBalancedCall*>(arg);
  bool pick_complete;
  {
    MutexLock lock(self->chand_->data_plane_mu());
    pick_complete = self->PickSubchannelLocked(&error);
  }
  if (pick_complete) {
    PickDone(self, error);
    GRPC_ERROR_UNREF(error);
  }
}

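// Performs an LB pick while holding the data plane mutex.  Returns
// true if the pick is complete, in which case *error holds the pick's
// result; returns false if the pick was queued to be retried when a
// new picker is available.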
bool LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
  GPR_ASSERT(connected_subchannel_ == nullptr);
  GPR_ASSERT(subchannel_call_ == nullptr);
  // Grab initial metadata.
  auto& send_initial_metadata =
      pending_batches_[0]->payload->send_initial_metadata;
  grpc_metadata_batch* initial_metadata_batch =
      send_initial_metadata.send_initial_metadata;
  const uint32_t send_initial_metadata_flags =
      send_initial_metadata.send_initial_metadata_flags;
  // Perform LB pick.
  LoadBalancingPolicy::PickArgs pick_args;
  pick_args.path = StringViewFromSlice(path_);
  LbCallState lb_call_state(this);
  pick_args.call_state = &lb_call_state;
  Metadata initial_metadata(this, initial_metadata_batch);
  pick_args.initial_metadata = &initial_metadata;
  auto result = chand_->picker()->Pick(pick_args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p lb_call=%p: LB pick returned %s (subchannel=%p, error=%s)",
        chand_, this, PickResultTypeName(result.type), result.subchannel.get(),
        grpc_error_string(result.error));
  }
  switch (result.type) {
    case LoadBalancingPolicy::PickResult::PICK_FAILED: {
      // If we're shutting down, fail all RPCs.
      grpc_error* disconnect_error = chand_->disconnect_error();
      if (disconnect_error != GRPC_ERROR_NONE) {
        GRPC_ERROR_UNREF(result.error);
        MaybeRemoveCallFromLbQueuedCallsLocked();
        *error = GRPC_ERROR_REF(disconnect_error);
        return true;
      }
      // If wait_for_ready is false, then the error indicates the RPC
      // attempt's final status.
      if ((send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
        grpc_error* new_error =
            GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                "Failed to pick subchannel", &result.error, 1);
        GRPC_ERROR_UNREF(result.error);
        *error = new_error;
        MaybeRemoveCallFromLbQueuedCallsLocked();
        return true;
      }
      // If wait_for_ready is true, then queue to retry when we get a new
      // picker.
      GRPC_ERROR_UNREF(result.error);
    }
    // Fallthrough
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      MaybeAddCallToLbQueuedCallsLocked();
      return false;
    default:  // PICK_COMPLETE
      MaybeRemoveCallFromLbQueuedCallsLocked();
      // Handle drops.
      if (GPR_UNLIKELY(result.subchannel == nullptr)) {
        result.error = grpc_error_set_int(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                "Call dropped by load balancing policy"),
            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
      } else {
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        connected_subchannel_ =
            chand_->GetConnectedSubchannelInDataPlane(result.subchannel.get());
        GPR_ASSERT(connected_subchannel_ != nullptr);
      }
      lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
      *error = result.error;
      return true;
  }
}

}  // namespace
}  // namespace grpc_core

/*************************************************************************
 * EXPORTED SYMBOLS
 */

using grpc_core::CallData;
using grpc_core::ChannelData;

const grpc_channel_filter grpc_client_channel_filter = {
    CallData::StartTransportStreamOpBatch,
    ChannelData::StartTransportOp,
    sizeof(CallData),
    CallData::Init,
    CallData::SetPollent,
    CallData::Destroy,
    sizeof(ChannelData),
    ChannelData::Init,
    ChannelData::Destroy,
    ChannelData::GetChannelInfo,
    "client-channel",
};

grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_channel_element* elem, int try_to_connect) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->CheckConnectivityState(try_to_connect);
}

int grpc_client_channel_num_external_connectivity_watchers(
    grpc_channel_element* elem) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  return chand->NumExternalConnectivityWatchers();
}

void grpc_client_channel_watch_connectivity_state(
    grpc_channel_element* elem, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  if (state == nullptr) {
    // Handle cancellation.
    GPR_ASSERT(watcher_timer_init == nullptr);
    chand->RemoveExternalConnectivityWatcher(on_complete, /*cancel=*/true);
    return;
  }
  // Handle addition.
  return chand->AddExternalConnectivityWatcher(pollent, state, on_complete,
                                               watcher_timer_init);
}

void grpc_client_channel_start_connectivity_watch(
    grpc_channel_element* elem, grpc_connectivity_state initial_state,
    grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
        watcher) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->AddConnectivityWatcher(initial_state, std::move(watcher));
}

void grpc_client_channel_stop_connectivity_watch(
    grpc_channel_element* elem,
    grpc_core::AsyncConnectivityStateWatcherInterface* watcher) {
  auto* chand = static_cast<ChannelData*>(elem->channel_data);
  chand->RemoveConnectivityWatcher(watcher);
}