1 // 2 // Copyright 2015 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_FILTER_H 18 #define GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_FILTER_H 19 20 #include <grpc/grpc.h> 21 #include <grpc/impl/connectivity_state.h> 22 #include <grpc/support/port_platform.h> 23 #include <stddef.h> 24 25 #include <atomic> 26 #include <map> 27 #include <memory> 28 #include <string> 29 #include <utility> 30 31 #include "absl/base/thread_annotations.h" 32 #include "absl/container/flat_hash_set.h" 33 #include "absl/functional/any_invocable.h" 34 #include "absl/status/status.h" 35 #include "absl/strings/string_view.h" 36 #include "absl/types/optional.h" 37 #include "src/core/channelz/channelz.h" 38 #include "src/core/client_channel/client_channel_args.h" 39 #include "src/core/client_channel/client_channel_factory.h" 40 #include "src/core/client_channel/config_selector.h" 41 #include "src/core/client_channel/dynamic_filters.h" 42 #include "src/core/client_channel/subchannel.h" 43 #include "src/core/client_channel/subchannel_pool_interface.h" 44 #include "src/core/filter/blackboard.h" 45 #include "src/core/lib/channel/channel_args.h" 46 #include "src/core/lib/channel/channel_fwd.h" 47 #include "src/core/lib/channel/channel_stack.h" 48 #include "src/core/lib/iomgr/call_combiner.h" 49 #include "src/core/lib/iomgr/closure.h" 50 #include "src/core/lib/iomgr/error.h" 51 #include "src/core/lib/iomgr/iomgr_fwd.h" 52 #include "src/core/lib/iomgr/polling_entity.h" 53 #include "src/core/lib/resource_quota/arena.h" 54 #include "src/core/lib/slice/slice.h" 55 #include "src/core/lib/transport/connectivity_state.h" 56 #include "src/core/lib/transport/metadata_batch.h" 57 #include "src/core/lib/transport/transport.h" 58 #include "src/core/load_balancing/backend_metric_data.h" 59 #include "src/core/load_balancing/lb_policy.h" 60 #include "src/core/resolver/resolver.h" 61 #include "src/core/service_config/service_config.h" 62 #include "src/core/telemetry/call_tracer.h" 63 #include "src/core/util/orphanable.h" 64 #include "src/core/util/ref_counted.h" 65 #include "src/core/util/ref_counted_ptr.h" 66 #include "src/core/util/sync.h" 67 #include "src/core/util/time.h" 68 #include "src/core/util/time_precise.h" 69 #include "src/core/util/work_serializer.h" 70 71 // 72 // Client channel filter 73 // 74 75 // A client channel is a channel that begins disconnected, and can connect 76 // to some endpoint on demand. If that endpoint disconnects, it will be 77 // connected to again later. 78 // 79 // Calls on a disconnected client channel are queued until a connection is 80 // established. 81 82 // Max number of batches that can be pending on a call at any given 83 // time. This includes one batch for each of the following ops: 84 // recv_initial_metadata 85 // send_initial_metadata 86 // recv_message 87 // send_message 88 // recv_trailing_metadata 89 // send_trailing_metadata 90 #define MAX_PENDING_BATCHES 6 91 92 namespace grpc_core { 93 94 class ClientChannelFilter final { 95 public: 96 static const grpc_channel_filter kFilter; 97 98 class LoadBalancedCall; 99 class FilterBasedLoadBalancedCall; 100 101 // Flag that this object gets stored in channel args as a raw pointer. 102 struct RawPointerChannelArgTag {}; ChannelArgName()103 static absl::string_view ChannelArgName() { 104 return "grpc.internal.client_channel_filter"; 105 } 106 107 grpc_connectivity_state CheckConnectivityState(bool try_to_connect); 108 109 // Starts a one-time connectivity state watch. When the channel's state 110 // becomes different from *state, sets *state to the new state and 111 // schedules on_complete. The watcher_timer_init callback is invoked as 112 // soon as the watch is actually started (i.e., after hopping into the 113 // client channel combiner). I/O will be serviced via pollent. 114 // 115 // This is intended to be used when starting a watch from outside of C-core 116 // via grpc_channel_watch_connectivity_state(). It should not be used 117 // by other callers. AddExternalConnectivityWatcher(grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)118 void AddExternalConnectivityWatcher(grpc_polling_entity pollent, 119 grpc_connectivity_state* state, 120 grpc_closure* on_complete, 121 grpc_closure* watcher_timer_init) { 122 new ExternalConnectivityWatcher(this, pollent, state, on_complete, 123 watcher_timer_init); 124 } 125 126 // Cancels a pending external watcher previously added by 127 // AddExternalConnectivityWatcher(). CancelExternalConnectivityWatcher(grpc_closure * on_complete)128 void CancelExternalConnectivityWatcher(grpc_closure* on_complete) { 129 ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap( 130 this, on_complete, /*cancel=*/true); 131 } 132 133 // Starts and stops a connectivity watch. The watcher will be initially 134 // notified as soon as the state changes from initial_state and then on 135 // every subsequent state change until either the watch is stopped or 136 // it is notified that the state has changed to SHUTDOWN. 137 // 138 // This is intended to be used when starting watches from code inside of 139 // C-core (e.g., for a nested control plane channel for things like xds). 140 void AddConnectivityWatcher( 141 grpc_connectivity_state initial_state, 142 OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher); 143 void RemoveConnectivityWatcher( 144 AsyncConnectivityStateWatcherInterface* watcher); 145 146 OrphanablePtr<FilterBasedLoadBalancedCall> CreateLoadBalancedCall( 147 const grpc_call_element_args& args, grpc_polling_entity* pollent, 148 grpc_closure* on_call_destruction_complete, 149 absl::AnyInvocable<void()> on_commit, bool is_transparent_retry); 150 151 private: 152 class CallData; 153 class FilterBasedCallData; 154 class ResolverResultHandler; 155 class SubchannelWrapper; 156 class ClientChannelControlHelper; 157 class ConnectivityWatcherAdder; 158 class ConnectivityWatcherRemover; 159 160 // Represents a pending connectivity callback from an external caller 161 // via grpc_client_channel_watch_connectivity_state(). 162 class ExternalConnectivityWatcher final 163 : public ConnectivityStateWatcherInterface { 164 public: 165 ExternalConnectivityWatcher(ClientChannelFilter* chand, 166 grpc_polling_entity pollent, 167 grpc_connectivity_state* state, 168 grpc_closure* on_complete, 169 grpc_closure* watcher_timer_init); 170 171 ~ExternalConnectivityWatcher() override; 172 173 // Removes the watcher from the external_watchers_ map. 174 static void RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand, 175 grpc_closure* on_complete, 176 bool cancel); 177 178 void Notify(grpc_connectivity_state state, 179 const absl::Status& /* status */) override; 180 181 void Cancel(); 182 183 private: 184 // Adds the watcher to state_tracker_. Consumes the ref that is passed to it 185 // from Start(). 186 void AddWatcherLocked() 187 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_); 188 void RemoveWatcherLocked() 189 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_); 190 191 ClientChannelFilter* chand_; 192 grpc_polling_entity pollent_; 193 grpc_connectivity_state initial_state_; 194 grpc_connectivity_state* state_; 195 grpc_closure* on_complete_; 196 grpc_closure* watcher_timer_init_; 197 std::atomic<bool> done_{false}; 198 }; 199 200 ClientChannelFilter(grpc_channel_element_args* args, 201 grpc_error_handle* error); 202 ~ClientChannelFilter(); 203 204 // Filter vtable functions. 205 static grpc_error_handle Init(grpc_channel_element* elem, 206 grpc_channel_element_args* args); 207 static void Destroy(grpc_channel_element* elem); 208 static void StartTransportOp(grpc_channel_element* elem, 209 grpc_transport_op* op); 210 static void GetChannelInfo(grpc_channel_element* elem, 211 const grpc_channel_info* info); 212 213 // Note: All methods with "Locked" suffix must be invoked from within 214 // work_serializer_. 215 216 void ReprocessQueuedResolverCalls() 217 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&resolution_mu_); 218 219 void OnResolverResultChangedLocked(Resolver::Result result) 220 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 221 void OnResolverErrorLocked(absl::Status status) 222 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 223 224 absl::Status CreateOrUpdateLbPolicyLocked( 225 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, 226 const absl::optional<std::string>& health_check_service_name, 227 Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 228 OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked( 229 const ChannelArgs& args) ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 230 231 void UpdateStateLocked(grpc_connectivity_state state, 232 const absl::Status& status, const char* reason) 233 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 234 235 void UpdateStateAndPickerLocked( 236 grpc_connectivity_state state, const absl::Status& status, 237 const char* reason, 238 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) 239 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 240 241 void UpdateServiceConfigInControlPlaneLocked( 242 RefCountedPtr<ServiceConfig> service_config, 243 RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) 244 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 245 246 void UpdateServiceConfigInDataPlaneLocked(const ChannelArgs& args) 247 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 248 249 void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 250 void DestroyResolverAndLbPolicyLocked() 251 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 252 253 grpc_error_handle DoPingLocked(grpc_transport_op* op) 254 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 255 256 void StartTransportOpLocked(grpc_transport_op* op) 257 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 258 259 void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_); 260 261 // 262 // Fields set at construction and never modified. 263 // 264 ChannelArgs channel_args_; 265 grpc_channel_stack* owning_stack_; 266 ClientChannelFactory* client_channel_factory_; 267 RefCountedPtr<ServiceConfig> default_service_config_; 268 std::string target_uri_; 269 std::string uri_to_resolve_; 270 std::string default_authority_; 271 channelz::ChannelNode* channelz_node_; 272 grpc_pollset_set* interested_parties_; 273 const size_t service_config_parser_index_; 274 275 // 276 // Fields related to name resolution. Guarded by resolution_mu_. 277 // 278 mutable Mutex resolution_mu_; 279 // List of calls queued waiting for resolver result. 280 absl::flat_hash_set<CallData*> resolver_queued_calls_ 281 ABSL_GUARDED_BY(resolution_mu_); 282 // Data from service config. 283 absl::Status resolver_transient_failure_error_ 284 ABSL_GUARDED_BY(resolution_mu_); 285 bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false; 286 RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_); 287 RefCountedPtr<ConfigSelector> config_selector_ 288 ABSL_GUARDED_BY(resolution_mu_); 289 RefCountedPtr<DynamicFilters> dynamic_filters_ 290 ABSL_GUARDED_BY(resolution_mu_); 291 292 // 293 // Fields related to LB picks. Guarded by lb_mu_. 294 // 295 mutable Mutex lb_mu_; 296 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker_ 297 ABSL_GUARDED_BY(lb_mu_); 298 absl::flat_hash_set<RefCountedPtr<LoadBalancedCall>, 299 RefCountedPtrHash<LoadBalancedCall>, 300 RefCountedPtrEq<LoadBalancedCall>> 301 lb_queued_calls_ ABSL_GUARDED_BY(lb_mu_); 302 303 // 304 // Fields used in the control plane. Guarded by work_serializer. 305 // 306 std::shared_ptr<WorkSerializer> work_serializer_; 307 ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(*work_serializer_); 308 OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(*work_serializer_); 309 bool previous_resolution_contained_addresses_ 310 ABSL_GUARDED_BY(*work_serializer_) = false; 311 RefCountedPtr<ServiceConfig> saved_service_config_ 312 ABSL_GUARDED_BY(*work_serializer_); 313 RefCountedPtr<ConfigSelector> saved_config_selector_ 314 ABSL_GUARDED_BY(*work_serializer_); 315 RefCountedPtr<const Blackboard> blackboard_ 316 ABSL_GUARDED_BY(*work_serializer_); 317 OrphanablePtr<LoadBalancingPolicy> lb_policy_ 318 ABSL_GUARDED_BY(*work_serializer_); 319 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_ 320 ABSL_GUARDED_BY(*work_serializer_); 321 // The number of SubchannelWrapper instances referencing a given Subchannel. 322 std::map<Subchannel*, int> subchannel_refcount_map_ 323 ABSL_GUARDED_BY(*work_serializer_); 324 // The set of SubchannelWrappers that currently exist. 325 // No need to hold a ref, since the map is updated in the control-plane 326 // work_serializer when the SubchannelWrappers are created and destroyed. 327 absl::flat_hash_set<SubchannelWrapper*> subchannel_wrappers_ 328 ABSL_GUARDED_BY(*work_serializer_); 329 int keepalive_time_ ABSL_GUARDED_BY(*work_serializer_) = -1; 330 grpc_error_handle disconnect_error_ ABSL_GUARDED_BY(*work_serializer_); 331 332 // 333 // Fields guarded by a mutex, since they need to be accessed 334 // synchronously via get_channel_info(). 335 // 336 Mutex info_mu_; 337 std::string info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_); 338 std::string info_service_config_json_ ABSL_GUARDED_BY(info_mu_); 339 340 // 341 // Fields guarded by a mutex, since they need to be accessed 342 // synchronously via grpc_channel_num_external_connectivity_watchers(). 343 // 344 mutable Mutex external_watchers_mu_; 345 std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>> 346 external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_); 347 }; 348 349 // 350 // ClientChannelFilter::LoadBalancedCall 351 // 352 353 // TODO(roth): As part of simplifying cancellation in the filter stack, 354 // this should no longer need to be ref-counted. 355 class ClientChannelFilter::LoadBalancedCall 356 : public InternallyRefCounted<LoadBalancedCall, UnrefCallDtor> { 357 public: 358 LoadBalancedCall(ClientChannelFilter* chand, Arena* arena, 359 absl::AnyInvocable<void()> on_commit, 360 bool is_transparent_retry); 361 ~LoadBalancedCall() override; 362 Orphan()363 void Orphan() override { Unref(); } 364 365 // Called by channel when removing a call from the list of queued calls. 366 void RemoveCallFromLbQueuedCallsLocked() 367 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); 368 369 // Called by the channel for each queued call when a new picker 370 // becomes available. 371 virtual void RetryPickLocked() 372 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0; 373 374 protected: chand()375 ClientChannelFilter* chand() const { return chand_; } call_attempt_tracer()376 ClientCallTracer::CallAttemptTracer* call_attempt_tracer() const { 377 return DownCast<ClientCallTracer::CallAttemptTracer*>( 378 arena_->GetContext<CallTracerInterface>()); 379 } connected_subchannel()380 ConnectedSubchannel* connected_subchannel() const { 381 return connected_subchannel_.get(); 382 } 383 LoadBalancingPolicy::SubchannelCallTrackerInterface* lb_subchannel_call_tracker()384 lb_subchannel_call_tracker() const { 385 return lb_subchannel_call_tracker_.get(); 386 } arena()387 Arena* arena() const { return arena_; } 388 Commit()389 void Commit() { 390 auto on_commit = std::move(on_commit_); 391 on_commit(); 392 } 393 394 // Attempts an LB pick. The following outcomes are possible: 395 // - No pick result is available yet. The call will be queued and 396 // nullopt will be returned. The channel will later call 397 // RetryPickLocked() when a new picker is available and the pick 398 // should be retried. 399 // - The pick failed. If the call is not wait_for_ready, a non-OK 400 // status will be returned. (If the call *is* wait_for_ready, 401 // it will be queued instead.) 402 // - The pick completed successfully. A connected subchannel is 403 // stored and an OK status will be returned. 404 absl::optional<absl::Status> PickSubchannel(bool was_queued); 405 406 void RecordCallCompletion(absl::Status status, 407 grpc_metadata_batch* recv_trailing_metadata, 408 grpc_transport_stream_stats* transport_stream_stats, 409 absl::string_view peer_address); 410 411 void RecordLatency(); 412 413 private: 414 class LbCallState; 415 class Metadata; 416 class BackendMetricAccessor; 417 418 virtual grpc_polling_entity* pollent() = 0; 419 virtual grpc_metadata_batch* send_initial_metadata() const = 0; 420 421 // Helper function for performing an LB pick with a specified picker. 422 // Returns true if the pick is complete. 423 bool PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker* picker, 424 grpc_error_handle* error); 425 // Adds the call to the channel's list of queued picks if not already present. 426 void AddCallToLbQueuedCallsLocked() 427 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); 428 429 // Called when adding the call to the LB queue. 430 virtual void OnAddToQueueLocked() 431 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_) = 0; 432 433 ClientChannelFilter* chand_; 434 435 absl::AnyInvocable<void()> on_commit_; 436 437 gpr_cycle_counter lb_call_start_time_ = gpr_get_cycle_counter(); 438 439 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 440 const BackendMetricData* backend_metric_data_ = nullptr; 441 std::unique_ptr<LoadBalancingPolicy::SubchannelCallTrackerInterface> 442 lb_subchannel_call_tracker_; 443 Arena* const arena_; 444 }; 445 446 class ClientChannelFilter::FilterBasedLoadBalancedCall final 447 : public ClientChannelFilter::LoadBalancedCall { 448 public: 449 // If on_call_destruction_complete is non-null, then it will be 450 // invoked once the LoadBalancedCall is completely destroyed. 451 // If it is null, then the caller is responsible for checking whether 452 // the LB call has a subchannel call and ensuring that the 453 // on_call_destruction_complete closure passed down from the surface 454 // is not invoked until after the subchannel call stack is destroyed. 455 FilterBasedLoadBalancedCall(ClientChannelFilter* chand, 456 const grpc_call_element_args& args, 457 grpc_polling_entity* pollent, 458 grpc_closure* on_call_destruction_complete, 459 absl::AnyInvocable<void()> on_commit, 460 bool is_transparent_retry); 461 ~FilterBasedLoadBalancedCall() override; 462 463 void Orphan() override; 464 465 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); 466 subchannel_call()467 RefCountedPtr<SubchannelCall> subchannel_call() const { 468 return subchannel_call_; 469 } 470 471 private: 472 class LbQueuedCallCanceller; 473 474 // Work-around for Windows compilers that don't allow nested classes 475 // to access protected members of the enclosing class's parent class. 476 using LoadBalancedCall::chand; 477 using LoadBalancedCall::Commit; 478 pollent()479 grpc_polling_entity* pollent() override { return pollent_; } send_initial_metadata()480 grpc_metadata_batch* send_initial_metadata() const override { 481 return pending_batches_[0] 482 ->payload->send_initial_metadata.send_initial_metadata; 483 } 484 485 // Returns the index into pending_batches_ to be used for batch. 486 static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch); 487 void PendingBatchesAdd(grpc_transport_stream_op_batch* batch); 488 static void FailPendingBatchInCallCombiner(void* arg, 489 grpc_error_handle error); 490 // A predicate type and some useful implementations for PendingBatchesFail(). 491 typedef bool (*YieldCallCombinerPredicate)( 492 const CallCombinerClosureList& closures); YieldCallCombiner(const CallCombinerClosureList &)493 static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) { 494 return true; 495 } NoYieldCallCombiner(const CallCombinerClosureList &)496 static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) { 497 return false; 498 } YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)499 static bool YieldCallCombinerIfPendingBatchesFound( 500 const CallCombinerClosureList& closures) { 501 return closures.size() > 0; 502 } 503 // Fails all pending batches. 504 // If yield_call_combiner_predicate returns true, assumes responsibility for 505 // yielding the call combiner. 506 void PendingBatchesFail( 507 grpc_error_handle error, 508 YieldCallCombinerPredicate yield_call_combiner_predicate); 509 static void ResumePendingBatchInCallCombiner(void* arg, 510 grpc_error_handle ignored); 511 // Resumes all pending batches on subchannel_call_. 512 void PendingBatchesResume(); 513 514 static void SendInitialMetadataOnComplete(void* arg, grpc_error_handle error); 515 static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); 516 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); 517 518 // Called to perform a pick, both when the call is initially started 519 // and when it is queued and the channel gets a new picker. 520 void TryPick(bool was_queued); 521 522 void OnAddToQueueLocked() override 523 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); 524 525 void RetryPickLocked() override 526 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::lb_mu_); 527 528 void CreateSubchannelCall(); 529 530 // TODO(roth): Instead of duplicating these fields in every filter 531 // that uses any one of them, we should store them in the call 532 // context. This will save per-call memory overhead. 533 grpc_call_stack* owning_call_; 534 CallCombiner* call_combiner_; 535 grpc_polling_entity* pollent_; 536 grpc_closure* on_call_destruction_complete_; 537 absl::optional<Slice> peer_string_; 538 539 // Set when we get a cancel_stream op. 540 grpc_error_handle cancel_error_; 541 542 // Set when we fail inside the LB call. 543 grpc_error_handle failure_error_; 544 545 LbQueuedCallCanceller* lb_call_canceller_ 546 ABSL_GUARDED_BY(&ClientChannelFilter::lb_mu_) = nullptr; 547 548 RefCountedPtr<SubchannelCall> subchannel_call_; 549 550 // For intercepting recv_initial_metadata_ready. 551 grpc_metadata_batch* recv_initial_metadata_ = nullptr; 552 grpc_closure recv_initial_metadata_ready_; 553 grpc_closure* original_recv_initial_metadata_ready_ = nullptr; 554 555 // For intercepting recv_trailing_metadata_ready. 556 grpc_metadata_batch* recv_trailing_metadata_ = nullptr; 557 grpc_transport_stream_stats* transport_stream_stats_ = nullptr; 558 grpc_closure recv_trailing_metadata_ready_; 559 grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; 560 561 // Batches are added to this list when received from above. 562 // They are removed when we are done handling the batch (i.e., when 563 // either we have invoked all of the batch's callbacks or we have 564 // passed the batch down to the subchannel call and are not 565 // intercepting any of its callbacks). 566 grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {}; 567 }; 568 569 } // namespace grpc_core 570 571 #endif // GRPC_SRC_CORE_CLIENT_CHANNEL_CLIENT_CHANNEL_FILTER_H 572