• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_FILTER_H
18 #define GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_FILTER_H
19 
20 #include <grpc/grpc.h>
21 #include <grpc/impl/connectivity_state.h>
22 #include <grpc/support/port_platform.h>
23 #include <stddef.h>
24 
25 #include <atomic>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <utility>
30 
31 #include "absl/base/thread_annotations.h"
32 #include "absl/container/flat_hash_set.h"
33 #include "absl/functional/any_invocable.h"
34 #include "absl/status/status.h"
35 #include "absl/strings/string_view.h"
36 #include "absl/types/optional.h"
37 #include "src/core/channelz/channelz.h"
38 #include "src/core/client_channel/client_channel_args.h"
39 #include "src/core/client_channel/client_channel_factory.h"
40 #include "src/core/client_channel/config_selector.h"
41 #include "src/core/client_channel/dynamic_filters.h"
42 #include "src/core/client_channel/subchannel.h"
43 #include "src/core/client_channel/subchannel_pool_interface.h"
44 #include "src/core/filter/blackboard.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/channel_fwd.h"
47 #include "src/core/lib/channel/channel_stack.h"
48 #include "src/core/lib/iomgr/call_combiner.h"
49 #include "src/core/lib/iomgr/closure.h"
50 #include "src/core/lib/iomgr/error.h"
51 #include "src/core/lib/iomgr/iomgr_fwd.h"
52 #include "src/core/lib/iomgr/polling_entity.h"
53 #include "src/core/lib/resource_quota/arena.h"
54 #include "src/core/lib/slice/slice.h"
55 #include "src/core/lib/transport/connectivity_state.h"
56 #include "src/core/lib/transport/metadata_batch.h"
57 #include "src/core/lib/transport/transport.h"
58 #include "src/core/load_balancing/backend_metric_data.h"
59 #include "src/core/load_balancing/lb_policy.h"
60 #include "src/core/resolver/resolver.h"
61 #include "src/core/service_config/service_config.h"
62 #include "src/core/telemetry/call_tracer.h"
63 #include "src/core/util/orphanable.h"
64 #include "src/core/util/ref_counted.h"
65 #include "src/core/util/ref_counted_ptr.h"
66 #include "src/core/util/sync.h"
67 #include "src/core/util/time.h"
68 #include "src/core/util/time_precise.h"
69 #include "src/core/util/work_serializer.h"
70 
71 //
72 // Client channel filter
73 //
74 
75 // A client channel is a channel that begins disconnected, and can connect
76 // to some endpoint on demand. If that endpoint disconnects, it will be
77 // connected to again later.
78 //
79 // Calls on a disconnected client channel are queued until a connection is
80 // established.
81 
82 // Max number of batches that can be pending on a call at any given
83 // time.  This includes one batch for each of the following ops:
84 //   recv_initial_metadata
85 //   send_initial_metadata
86 //   recv_message
87 //   send_message
88 //   recv_trailing_metadata
89 //   send_trailing_metadata
90 #define MAX_PENDING_BATCHES 6
91 
92 namespace grpc_core {
93 
94 class ClientChannelFilter final {
95  public:
96   static const grpc_channel_filter kFilter;
97 
98   class LoadBalancedCall;
99   class FilterBasedLoadBalancedCall;
100 
101   // Flag that this object gets stored in channel args as a raw pointer.
102   struct RawPointerChannelArgTag {};
ChannelArgName()103   static absl::string_view ChannelArgName() {
104     return "grpc.internal.client_channel_filter";
105   }
106 
107   grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
108 
109   // Starts a one-time connectivity state watch.  When the channel's state
110   // becomes different from *state, sets *state to the new state and
111   // schedules on_complete.  The watcher_timer_init callback is invoked as
112   // soon as the watch is actually started (i.e., after hopping into the
113   // client channel combiner).  I/O will be serviced via pollent.
114   //
115   // This is intended to be used when starting a watch from outside of C-core
116   // via grpc_channel_watch_connectivity_state().  It should not be used
117   // by other callers.
AddExternalConnectivityWatcher(grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)118   void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
119                                       grpc_connectivity_state* state,
120                                       grpc_closure* on_complete,
121                                       grpc_closure* watcher_timer_init) {
122     new ExternalConnectivityWatcher(this, pollent, state, on_complete,
123                                     watcher_timer_init);
124   }
125 
126   // Cancels a pending external watcher previously added by
127   // AddExternalConnectivityWatcher().
CancelExternalConnectivityWatcher(grpc_closure * on_complete)128   void CancelExternalConnectivityWatcher(grpc_closure* on_complete) {
129     ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
130         this, on_complete, /*cancel=*/true);
131   }
132 
133   // Starts and stops a connectivity watch.  The watcher will be initially
134   // notified as soon as the state changes from initial_state and then on
135   // every subsequent state change until either the watch is stopped or
136   // it is notified that the state has changed to SHUTDOWN.
137   //
138   // This is intended to be used when starting watches from code inside of
139   // C-core (e.g., for a nested control plane channel for things like xds).
140   void AddConnectivityWatcher(
141       grpc_connectivity_state initial_state,
142       OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
143   void RemoveConnectivityWatcher(
144       AsyncConnectivityStateWatcherInterface* watcher);
145 
146   OrphanablePtr<FilterBasedLoadBalancedCall> CreateLoadBalancedCall(
147       const grpc_call_element_args& args, grpc_polling_entity* pollent,
148       grpc_closure* on_call_destruction_complete,
149       absl::AnyInvocable<void()> on_commit, bool is_transparent_retry);
150 
151  private:
152   class CallData;
153   class FilterBasedCallData;
154   class ResolverResultHandler;
155   class SubchannelWrapper;
156   class ClientChannelControlHelper;
157   class ConnectivityWatcherAdder;
158   class ConnectivityWatcherRemover;
159 
160   // Represents a pending connectivity callback from an external caller
161   // via grpc_client_channel_watch_connectivity_state().
162   class ExternalConnectivityWatcher final
163       : public ConnectivityStateWatcherInterface {
164    public:
165     ExternalConnectivityWatcher(ClientChannelFilter* chand,
166                                 grpc_polling_entity pollent,
167                                 grpc_connectivity_state* state,
168                                 grpc_closure* on_complete,
169                                 grpc_closure* watcher_timer_init);
170 
171     ~ExternalConnectivityWatcher() override;
172 
173     // Removes the watcher from the external_watchers_ map.
174     static void RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand,
175                                                      grpc_closure* on_complete,
176                                                      bool cancel);
177 
178     void Notify(grpc_connectivity_state state,
179                 const absl::Status& /* status */) override;
180 
181     void Cancel();
182 
183    private:
184     // Adds the watcher to state_tracker_. Consumes the ref that is passed to it
185     // from Start().
186     void AddWatcherLocked()
187         ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_);
188     void RemoveWatcherLocked()
189         ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_);
190 
191     ClientChannelFilter* chand_;
192     grpc_polling_entity pollent_;
193     grpc_connectivity_state initial_state_;
194     grpc_connectivity_state* state_;
195     grpc_closure* on_complete_;
196     grpc_closure* watcher_timer_init_;
197     std::atomic<bool> done_{false};
198   };
199 
200   ClientChannelFilter(grpc_channel_element_args* args,
201                       grpc_error_handle* error);
202   ~ClientChannelFilter();
203 
204   // Filter vtable functions.
205   static grpc_error_handle Init(grpc_channel_element* elem,
206                                 grpc_channel_element_args* args);
207   static void Destroy(grpc_channel_element* elem);
208   static void StartTransportOp(grpc_channel_element* elem,
209                                grpc_transport_op* op);
210   static void GetChannelInfo(grpc_channel_element* elem,
211                              const grpc_channel_info* info);
212 
213   // Note: All methods with "Locked" suffix must be invoked from within
214   // work_serializer_.
215 
216   void ReprocessQueuedResolverCalls()
217       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&resolution_mu_);
218 
219   void OnResolverResultChangedLocked(Resolver::Result result)
220       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
221   void OnResolverErrorLocked(absl::Status status)
222       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
223 
224   absl::Status CreateOrUpdateLbPolicyLocked(
225       RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
226       const absl::optional<std::string>& health_check_service_name,
227       Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
228   OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
229       const ChannelArgs& args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
230 
231   void UpdateStateLocked(grpc_connectivity_state state,
232                          const absl::Status& status, const char* reason)
233       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
234 
235   void UpdateStateAndPickerLocked(
236       grpc_connectivity_state state, const absl::Status& status,
237       const char* reason,
238       RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
239       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
240 
241   void UpdateServiceConfigInControlPlaneLocked(
242       RefCountedPtr<ServiceConfig> service_config,
243       RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name)
244       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
245 
246   void UpdateServiceConfigInDataPlaneLocked(const ChannelArgs& args)
247       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
248 
249   void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
250   void DestroyResolverAndLbPolicyLocked()
251       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
252 
253   grpc_error_handle DoPingLocked(grpc_transport_op* op)
254       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
255 
256   void StartTransportOpLocked(grpc_transport_op* op)
257       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
258 
259   void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_);
260 
261   //
262   // Fields set at construction and never modified.
263   //
264   ChannelArgs channel_args_;
265   grpc_channel_stack* owning_stack_;
266   ClientChannelFactory* client_channel_factory_;
267   RefCountedPtr<ServiceConfig> default_service_config_;
268   std::string target_uri_;
269   std::string uri_to_resolve_;
270   std::string default_authority_;
271   channelz::ChannelNode* channelz_node_;
272   grpc_pollset_set* interested_parties_;
273   const size_t service_config_parser_index_;
274 
275   //
276   // Fields related to name resolution.  Guarded by resolution_mu_.
277   //
278   mutable Mutex resolution_mu_;
279   // List of calls queued waiting for resolver result.
280   absl::flat_hash_set<CallData*> resolver_queued_calls_
281       ABSL_GUARDED_BY(resolution_mu_);
282   // Data from service config.
283   absl::Status resolver_transient_failure_error_
284       ABSL_GUARDED_BY(resolution_mu_);
285   bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false;
286   RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_);
287   RefCountedPtr<ConfigSelector> config_selector_
288       ABSL_GUARDED_BY(resolution_mu_);
289   RefCountedPtr<DynamicFilters> dynamic_filters_
290       ABSL_GUARDED_BY(resolution_mu_);
291 
292   //
293   // Fields related to LB picks.  Guarded by lb_mu_.
294   //
295   mutable Mutex lb_mu_;
296   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_
297       ABSL_GUARDED_BY(lb_mu_);
298   absl::flat_hash_set<RefCountedPtr<LoadBalancedCall>,
299                       RefCountedPtrHash<LoadBalancedCall>,
300                       RefCountedPtrEq<LoadBalancedCall>>
301       lb_queued_calls_ ABSL_GUARDED_BY(lb_mu_);
302 
303   //
304   // Fields used in the control plane.  Guarded by work_serializer.
305   //
306   std::shared_ptr<WorkSerializer> work_serializer_;
307   ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_);
308   OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(*work_serializer_);
309   bool previous_resolution_contained_addresses_
310       ABSL_GUARDED_BY(*work_serializer_) = false;
311   RefCountedPtr<ServiceConfig> saved_service_config_
312       ABSL_GUARDED_BY(*work_serializer_);
313   RefCountedPtr<ConfigSelector> saved_config_selector_
314       ABSL_GUARDED_BY(*work_serializer_);
315   RefCountedPtr<const Blackboard> blackboard_
316       ABSL_GUARDED_BY(*work_serializer_);
317   OrphanablePtr<LoadBalancingPolicy> lb_policy_
318       ABSL_GUARDED_BY(*work_serializer_);
319   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_
320       ABSL_GUARDED_BY(*work_serializer_);
321   // The number of SubchannelWrapper instances referencing a given Subchannel.
322   std::map<Subchannel*, int> subchannel_refcount_map_
323       ABSL_GUARDED_BY(*work_serializer_);
324   // The set of SubchannelWrappers that currently exist.
325   // No need to hold a ref, since the map is updated in the control-plane
326   // work_serializer when the SubchannelWrappers are created and destroyed.
327   absl::flat_hash_set<SubchannelWrapper*> subchannel_wrappers_
328       ABSL_GUARDED_BY(*work_serializer_);
329   int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1;
330   grpc_error_handle disconnect_error_ ABSL_GUARDED_BY(*work_serializer_);
331 
332   //
333   // Fields guarded by a mutex, since they need to be accessed
334   // synchronously via get_channel_info().
335   //
336   Mutex info_mu_;
337   std::string info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_);
338   std::string info_service_config_json_ ABSL_GUARDED_BY(info_mu_);
339 
340   //
341   // Fields guarded by a mutex, since they need to be accessed
342   // synchronously via grpc_channel_num_external_connectivity_watchers().
343   //
344   mutable Mutex external_watchers_mu_;
345   std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
346       external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_);
347 };
348 
349 //
350 // ClientChannelFilter::LoadBalancedCall
351 //
352 
353 // TODO(roth): As part of simplifying cancellation in the filter stack,
354 // this should no longer need to be ref-counted.
355 class ClientChannelFilter::LoadBalancedCall
356     : public InternallyRefCounted<LoadBalancedCall, UnrefCallDtor> {
357  public:
358   LoadBalancedCall(ClientChannelFilter* chand, Arena* arena,
359                    absl::AnyInvocable<void()> on_commit,
360                    bool is_transparent_retry);
361   ~LoadBalancedCall() override;
362 
Orphan()363   void Orphan() override { Unref(); }
364 
365   // Called by channel when removing a call from the list of queued calls.
366   void RemoveCallFromLbQueuedCallsLocked()
367       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
368 
369   // Called by the channel for each queued call when a new picker
370   // becomes available.
371   virtual void RetryPickLocked()
372       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0;
373 
374  protected:
chand()375   ClientChannelFilter* chand() const { return chand_; }
call_attempt_tracer()376   ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const {
377     return DownCast<ClientCallTracer::CallAttemptTracer*>(
378         arena_->GetContext<CallTracerInterface>());
379   }
connected_subchannel()380   ConnectedSubchannel* connected_subchannel() const {
381     return connected_subchannel_.get();
382   }
383   LoadBalancingPolicy::SubchannelCallTrackerInterface*
lb_subchannel_call_tracker()384   lb_subchannel_call_tracker() const {
385     return lb_subchannel_call_tracker_.get();
386   }
arena()387   Arena* arena() const { return arena_; }
388 
Commit()389   void Commit() {
390     auto on_commit = std::move(on_commit_);
391     on_commit();
392   }
393 
394   // Attempts an LB pick.  The following outcomes are possible:
395   // - No pick result is available yet.  The call will be queued and
396   //   nullopt will be returned.  The channel will later call
397   //   RetryPickLocked() when a new picker is available and the pick
398   //   should be retried.
399   // - The pick failed.  If the call is not wait_for_ready, a non-OK
400   //   status will be returned.  (If the call *is* wait_for_ready,
401   //   it will be queued instead.)
402   // - The pick completed successfully.  A connected subchannel is
403   //   stored and an OK status will be returned.
404   absl::optional<absl::Status> PickSubchannel(bool was_queued);
405 
406   void RecordCallCompletion(absl::Status status,
407                             grpc_metadata_batch* recv_trailing_metadata,
408                             grpc_transport_stream_stats* transport_stream_stats,
409                             absl::string_view peer_address);
410 
411   void RecordLatency();
412 
413  private:
414   class LbCallState;
415   class Metadata;
416   class BackendMetricAccessor;
417 
418   virtual grpc_polling_entity* pollent() = 0;
419   virtual grpc_metadata_batch* send_initial_metadata() const = 0;
420 
421   // Helper function for performing an LB pick with a specified picker.
422   // Returns true if the pick is complete.
423   bool PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker* picker,
424                           grpc_error_handle* error);
425   // Adds the call to the channel's list of queued picks if not already present.
426   void AddCallToLbQueuedCallsLocked()
427       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
428 
429   // Called when adding the call to the LB queue.
430   virtual void OnAddToQueueLocked()
431       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0;
432 
433   ClientChannelFilter* chand_;
434 
435   absl::AnyInvocable<void()> on_commit_;
436 
437   gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter();
438 
439   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
440   const BackendMetricData* backend_metric_data_ = nullptr;
441   std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface>
442       lb_subchannel_call_tracker_;
443   Arena* const arena_;
444 };
445 
446 class ClientChannelFilter::FilterBasedLoadBalancedCall final
447     : public ClientChannelFilter::LoadBalancedCall {
448  public:
449   // If on_call_destruction_complete is non-null, then it will be
450   // invoked once the LoadBalancedCall is completely destroyed.
451   // If it is null, then the caller is responsible for checking whether
452   // the LB call has a subchannel call and ensuring that the
453   // on_call_destruction_complete closure passed down from the surface
454   // is not invoked until after the subchannel call stack is destroyed.
455   FilterBasedLoadBalancedCall(ClientChannelFilter* chand,
456                               const grpc_call_element_args& args,
457                               grpc_polling_entity* pollent,
458                               grpc_closure* on_call_destruction_complete,
459                               absl::AnyInvocable<void()> on_commit,
460                               bool is_transparent_retry);
461   ~FilterBasedLoadBalancedCall() override;
462 
463   void Orphan() override;
464 
465   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
466 
subchannel_call()467   RefCountedPtr<SubchannelCall> subchannel_call() const {
468     return subchannel_call_;
469   }
470 
471  private:
472   class LbQueuedCallCanceller;
473 
474   // Work-around for Windows compilers that don't allow nested classes
475   // to access protected members of the enclosing class's parent class.
476   using LoadBalancedCall::chand;
477   using LoadBalancedCall::Commit;
478 
pollent()479   grpc_polling_entity* pollent() override { return pollent_; }
send_initial_metadata()480   grpc_metadata_batch* send_initial_metadata() const override {
481     return pending_batches_[0]
482         ->payload->send_initial_metadata.send_initial_metadata;
483   }
484 
485   // Returns the index into pending_batches_ to be used for batch.
486   static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
487   void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
488   static void FailPendingBatchInCallCombiner(void* arg,
489                                              grpc_error_handle error);
490   // A predicate type and some useful implementations for PendingBatchesFail().
491   typedef bool (*YieldCallCombinerPredicate)(
492       const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList &)493   static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
494     return true;
495   }
NoYieldCallCombiner(const CallCombinerClosureList &)496   static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
497     return false;
498   }
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)499   static bool YieldCallCombinerIfPendingBatchesFound(
500       const CallCombinerClosureList& closures) {
501     return closures.size() > 0;
502   }
503   // Fails all pending batches.
504   // If yield_call_combiner_predicate returns true, assumes responsibility for
505   // yielding the call combiner.
506   void PendingBatchesFail(
507       grpc_error_handle error,
508       YieldCallCombinerPredicate yield_call_combiner_predicate);
509   static void ResumePendingBatchInCallCombiner(void* arg,
510                                                grpc_error_handle ignored);
511   // Resumes all pending batches on subchannel_call_.
512   void PendingBatchesResume();
513 
514   static void SendInitialMetadataOnComplete(void* arg, grpc_error_handle error);
515   static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
516   static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
517 
518   // Called to perform a pick, both when the call is initially started
519   // and when it is queued and the channel gets a new picker.
520   void TryPick(bool was_queued);
521 
522   void OnAddToQueueLocked() override
523       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
524 
525   void RetryPickLocked() override
526       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_);
527 
528   void CreateSubchannelCall();
529 
530   // TODO(roth): Instead of duplicating these fields in every filter
531   // that uses any one of them, we should store them in the call
532   // context.  This will save per-call memory overhead.
533   grpc_call_stack* owning_call_;
534   CallCombiner* call_combiner_;
535   grpc_polling_entity* pollent_;
536   grpc_closure* on_call_destruction_complete_;
537   absl::optional<Slice> peer_string_;
538 
539   // Set when we get a cancel_stream op.
540   grpc_error_handle cancel_error_;
541 
542   // Set when we fail inside the LB call.
543   grpc_error_handle failure_error_;
544 
545   LbQueuedCallCanceller* lb_call_canceller_
546       ABSL_GUARDED_BY(&ClientChannelFilter::lb_mu_) = nullptr;
547 
548   RefCountedPtr<SubchannelCall> subchannel_call_;
549 
550   // For intercepting recv_initial_metadata_ready.
551   grpc_metadata_batch* recv_initial_metadata_ = nullptr;
552   grpc_closure recv_initial_metadata_ready_;
553   grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
554 
555   // For intercepting recv_trailing_metadata_ready.
556   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
557   grpc_transport_stream_stats* transport_stream_stats_ = nullptr;
558   grpc_closure recv_trailing_metadata_ready_;
559   grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
560 
561   // Batches are added to this list when received from above.
562   // They are removed when we are done handling the batch (i.e., when
563   // either we have invoked all of the batch's callbacks or we have
564   // passed the batch down to the subchannel call and are not
565   // intercepting any of its callbacks).
566   grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
567 };
568 
569 }  // namespace grpc_core
570 
571 #endif  // GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_FILTER_H
572