1 // 2 // Copyright 2015 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H 18 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <map> 23 #include <memory> 24 #include <set> 25 #include <string> 26 27 #include "absl/status/status.h" 28 #include "absl/types/optional.h" 29 30 #include <grpc/support/log.h> 31 32 #include "src/core/ext/filters/client_channel/client_channel_factory.h" 33 #include "src/core/ext/filters/client_channel/config_selector.h" 34 #include "src/core/ext/filters/client_channel/dynamic_filters.h" 35 #include "src/core/ext/filters/client_channel/lb_policy.h" 36 #include "src/core/ext/filters/client_channel/resolver.h" 37 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h" 38 #include "src/core/ext/filters/client_channel/retry_throttle.h" 39 #include "src/core/ext/filters/client_channel/service_config.h" 40 #include "src/core/ext/filters/client_channel/subchannel.h" 41 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" 42 #include "src/core/lib/gprpp/sync.h" 43 #include "src/core/lib/iomgr/error.h" 44 #include "src/core/lib/iomgr/polling_entity.h" 45 #include "src/core/lib/iomgr/work_serializer.h" 46 #include "src/core/lib/surface/channel.h" 47 #include "src/core/lib/transport/connectivity_state.h" 48 49 // 50 // Client channel filter 51 // 52 53 // A client channel is a channel that begins disconnected, and can connect 54 // to some endpoint on demand. If that endpoint disconnects, it will be 55 // connected to again later. 56 // 57 // Calls on a disconnected client channel are queued until a connection is 58 // established. 59 60 // Channel arg key for server URI string. 61 #define GRPC_ARG_SERVER_URI "grpc.server_uri" 62 63 // Channel arg containing a pointer to the ClientChannel object. 64 #define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel" 65 66 // Channel arg containing a pointer to the ServiceConfig object. 67 #define GRPC_ARG_SERVICE_CONFIG_OBJ "grpc.internal.service_config_obj" 68 69 // Max number of batches that can be pending on a call at any given 70 // time. This includes one batch for each of the following ops: 71 // recv_initial_metadata 72 // send_initial_metadata 73 // recv_message 74 // send_message 75 // recv_trailing_metadata 76 // send_trailing_metadata 77 #define MAX_PENDING_BATCHES 6 78 79 namespace grpc_core { 80 81 class ClientChannel { 82 public: 83 static const grpc_channel_filter kFilterVtable; 84 85 class LoadBalancedCall; 86 87 // Returns the ClientChannel object from channel, or null if channel 88 // is not a client channel. 89 static ClientChannel* GetFromChannel(grpc_channel* channel); 90 91 grpc_connectivity_state CheckConnectivityState(bool try_to_connect); 92 93 // Starts a one-time connectivity state watch. When the channel's state 94 // becomes different from *state, sets *state to the new state and 95 // schedules on_complete. The watcher_timer_init callback is invoked as 96 // soon as the watch is actually started (i.e., after hopping into the 97 // client channel combiner). I/O will be serviced via pollent. 98 // 99 // This is intended to be used when starting a watch from outside of C-core 100 // via grpc_channel_watch_connectivity_state(). It should not be used 101 // by other callers. AddExternalConnectivityWatcher(grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)102 void AddExternalConnectivityWatcher(grpc_polling_entity pollent, 103 grpc_connectivity_state* state, 104 grpc_closure* on_complete, 105 grpc_closure* watcher_timer_init) { 106 new ExternalConnectivityWatcher(this, pollent, state, on_complete, 107 watcher_timer_init); 108 } 109 110 // Cancels a pending external watcher previously added by 111 // AddExternalConnectivityWatcher(). CancelExternalConnectivityWatcher(grpc_closure * on_complete)112 void CancelExternalConnectivityWatcher(grpc_closure* on_complete) { 113 ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap( 114 this, on_complete, /*cancel=*/true); 115 } 116 NumExternalConnectivityWatchers()117 int NumExternalConnectivityWatchers() const { 118 MutexLock lock(&external_watchers_mu_); 119 return static_cast<int>(external_watchers_.size()); 120 } 121 122 // Starts and stops a connectivity watch. The watcher will be initially 123 // notified as soon as the state changes from initial_state and then on 124 // every subsequent state change until either the watch is stopped or 125 // it is notified that the state has changed to SHUTDOWN. 126 // 127 // This is intended to be used when starting watches from code inside of 128 // C-core (e.g., for a nested control plane channel for things like xds). 129 void AddConnectivityWatcher( 130 grpc_connectivity_state initial_state, 131 OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher); 132 void RemoveConnectivityWatcher( 133 AsyncConnectivityStateWatcherInterface* watcher); 134 135 RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall( 136 const grpc_call_element_args& args, grpc_polling_entity* pollent, 137 grpc_closure* on_call_destruction_complete); 138 139 private: 140 class CallData; 141 class ResolverResultHandler; 142 class SubchannelWrapper; 143 class ClientChannelControlHelper; 144 class ConnectivityWatcherAdder; 145 class ConnectivityWatcherRemover; 146 147 // Represents a pending connectivity callback from an external caller 148 // via grpc_client_channel_watch_connectivity_state(). 149 class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface { 150 public: 151 ExternalConnectivityWatcher(ClientChannel* chand, 152 grpc_polling_entity pollent, 153 grpc_connectivity_state* state, 154 grpc_closure* on_complete, 155 grpc_closure* watcher_timer_init); 156 157 ~ExternalConnectivityWatcher() override; 158 159 // Removes the watcher from the external_watchers_ map. 160 static void RemoveWatcherFromExternalWatchersMap(ClientChannel* chand, 161 grpc_closure* on_complete, 162 bool cancel); 163 164 void Notify(grpc_connectivity_state state, 165 const absl::Status& /* status */) override; 166 167 void Cancel(); 168 169 private: 170 // Adds the watcher to state_tracker_. Consumes the ref that is passed to it 171 // from Start(). 172 void AddWatcherLocked() 173 ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_); 174 void RemoveWatcherLocked() 175 ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_); 176 177 ClientChannel* chand_; 178 grpc_polling_entity pollent_; 179 grpc_connectivity_state initial_state_; 180 grpc_connectivity_state* state_; 181 grpc_closure* on_complete_; 182 grpc_closure* watcher_timer_init_; 183 Atomic<bool> done_{false}; 184 }; 185 186 struct ResolverQueuedCall { 187 grpc_call_element* elem; 188 ResolverQueuedCall* next = nullptr; 189 }; 190 struct LbQueuedCall { 191 LoadBalancedCall* lb_call; 192 LbQueuedCall* next = nullptr; 193 }; 194 195 ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error); 196 ~ClientChannel(); 197 198 // Filter vtable functions. 199 static grpc_error_handle Init(grpc_channel_element* elem, 200 grpc_channel_element_args* args); 201 static void Destroy(grpc_channel_element* elem); 202 static void StartTransportOp(grpc_channel_element* elem, 203 grpc_transport_op* op); 204 static void GetChannelInfo(grpc_channel_element* elem, 205 const grpc_channel_info* info); 206 207 // Note: Does NOT return a new ref. disconnect_error()208 grpc_error_handle disconnect_error() const { 209 return disconnect_error_.Load(MemoryOrder::ACQUIRE); 210 } 211 212 // Note: All methods with "Locked" suffix must be invoked from within 213 // work_serializer_. 214 215 void OnResolverResultChangedLocked(Resolver::Result result) 216 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 217 void OnResolverErrorLocked(grpc_error_handle error) 218 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 219 220 void CreateOrUpdateLbPolicyLocked( 221 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config, 222 Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 223 OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked( 224 const grpc_channel_args& args) 225 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 226 227 void UpdateStateAndPickerLocked( 228 grpc_connectivity_state state, const absl::Status& status, 229 const char* reason, 230 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) 231 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 232 233 void UpdateServiceConfigInControlPlaneLocked( 234 RefCountedPtr<ServiceConfig> service_config, 235 RefCountedPtr<ConfigSelector> config_selector, 236 const internal::ClientChannelGlobalParsedConfig* parsed_service_config, 237 const char* lb_policy_name) 238 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 239 240 void UpdateServiceConfigInDataPlaneLocked() 241 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 242 243 void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 244 void DestroyResolverAndLbPolicyLocked() 245 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 246 247 grpc_error_handle DoPingLocked(grpc_transport_op* op) 248 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 249 250 void StartTransportOpLocked(grpc_transport_op* op) 251 ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 252 253 void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_); 254 255 // These methods all require holding resolution_mu_. 256 void AddResolverQueuedCall(ResolverQueuedCall* call, 257 grpc_polling_entity* pollent) 258 ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_); 259 void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove, 260 grpc_polling_entity* pollent) 261 ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_); 262 263 // These methods all require holding data_plane_mu_. 264 void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent) 265 ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_); 266 void RemoveLbQueuedCall(LbQueuedCall* to_remove, grpc_polling_entity* pollent) 267 ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_); 268 RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane( 269 SubchannelInterface* subchannel) const 270 ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_); 271 272 // 273 // Fields set at construction and never modified. 274 // 275 const bool deadline_checking_enabled_; 276 const bool enable_retries_; 277 grpc_channel_stack* owning_stack_; 278 ClientChannelFactory* client_channel_factory_; 279 const grpc_channel_args* channel_args_; 280 RefCountedPtr<ServiceConfig> default_service_config_; 281 std::string server_name_; 282 UniquePtr<char> target_uri_; 283 channelz::ChannelNode* channelz_node_; 284 grpc_pollset_set* interested_parties_; 285 286 // 287 // Fields related to name resolution. Guarded by resolution_mu_. 288 // 289 mutable Mutex resolution_mu_; 290 // Linked list of calls queued waiting for resolver result. 291 ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) = 292 nullptr; 293 // Data from service config. 294 grpc_error_handle resolver_transient_failure_error_ 295 ABSL_GUARDED_BY(resolution_mu_) = GRPC_ERROR_NONE; 296 bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false; 297 RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_); 298 RefCountedPtr<ConfigSelector> config_selector_ 299 ABSL_GUARDED_BY(resolution_mu_); 300 RefCountedPtr<DynamicFilters> dynamic_filters_ 301 ABSL_GUARDED_BY(resolution_mu_); 302 303 // 304 // Fields used in the data plane. Guarded by data_plane_mu_. 305 // 306 mutable Mutex data_plane_mu_; 307 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_ 308 ABSL_GUARDED_BY(data_plane_mu_); 309 // Linked list of calls queued waiting for LB pick. 310 LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr; 311 312 // 313 // Fields used in the control plane. Guarded by work_serializer. 314 // 315 std::shared_ptr<WorkSerializer> work_serializer_; 316 ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(work_serializer_); 317 OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(work_serializer_); 318 bool previous_resolution_contained_addresses_ 319 ABSL_GUARDED_BY(work_serializer_) = false; 320 RefCountedPtr<ServiceConfig> saved_service_config_ 321 ABSL_GUARDED_BY(work_serializer_); 322 RefCountedPtr<ConfigSelector> saved_config_selector_ 323 ABSL_GUARDED_BY(work_serializer_); 324 absl::optional<std::string> health_check_service_name_ 325 ABSL_GUARDED_BY(work_serializer_); 326 OrphanablePtr<LoadBalancingPolicy> lb_policy_ 327 ABSL_GUARDED_BY(work_serializer_); 328 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_ 329 ABSL_GUARDED_BY(work_serializer_); 330 // The number of SubchannelWrapper instances referencing a given Subchannel. 331 std::map<Subchannel*, int> subchannel_refcount_map_ 332 ABSL_GUARDED_BY(work_serializer_); 333 // The set of SubchannelWrappers that currently exist. 334 // No need to hold a ref, since the map is updated in the control-plane 335 // work_serializer when the SubchannelWrappers are created and destroyed. 336 std::set<SubchannelWrapper*> subchannel_wrappers_ 337 ABSL_GUARDED_BY(work_serializer_); 338 // Pending ConnectedSubchannel updates for each SubchannelWrapper. 339 // Updates are queued here in the control plane work_serializer and then 340 // applied in the data plane mutex when the picker is updated. 341 std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>> 342 pending_subchannel_updates_ ABSL_GUARDED_BY(work_serializer_); 343 int keepalive_time_ ABSL_GUARDED_BY(work_serializer_) = -1; 344 345 // 346 // Fields accessed from both data plane mutex and control plane 347 // work_serializer. 348 // 349 Atomic<grpc_error_handle> disconnect_error_; 350 351 // 352 // Fields guarded by a mutex, since they need to be accessed 353 // synchronously via get_channel_info(). 354 // 355 Mutex info_mu_; 356 UniquePtr<char> info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_); 357 UniquePtr<char> info_service_config_json_ ABSL_GUARDED_BY(info_mu_); 358 359 // 360 // Fields guarded by a mutex, since they need to be accessed 361 // synchronously via grpc_channel_num_external_connectivity_watchers(). 362 // 363 mutable Mutex external_watchers_mu_; 364 std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>> 365 external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_); 366 }; 367 368 // 369 // ClientChannel::LoadBalancedCall 370 // 371 372 // This object is ref-counted, but it cannot inherit from RefCounted<>, 373 // because it is allocated on the arena and can't free its memory when 374 // its refcount goes to zero. So instead, it manually implements the 375 // same API as RefCounted<>, so that it can be used with RefCountedPtr<>. 376 class ClientChannel::LoadBalancedCall 377 : public RefCounted<LoadBalancedCall, PolymorphicRefCount, kUnrefCallDtor> { 378 public: 379 // If on_call_destruction_complete is non-null, then it will be 380 // invoked once the LoadBalancedCall is completely destroyed. 381 // If it is null, then the caller is responsible for checking whether 382 // the LB call has a subchannel call and ensuring that the 383 // on_call_destruction_complete closure passed down from the surface 384 // is not invoked until after the subchannel call stack is destroyed. 385 LoadBalancedCall(ClientChannel* chand, const grpc_call_element_args& args, 386 grpc_polling_entity* pollent, 387 grpc_closure* on_call_destruction_complete); 388 ~LoadBalancedCall() override; 389 390 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); 391 392 // Invoked by channel for queued LB picks when the picker is updated. 393 static void PickSubchannel(void* arg, grpc_error_handle error); 394 // Helper function for performing an LB pick while holding the data plane 395 // mutex. Returns true if the pick is complete, in which case the caller 396 // must invoke PickDone() or AsyncPickDone() with the returned error. 397 bool PickSubchannelLocked(grpc_error_handle* error) 398 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); 399 // Schedules a callback to process the completed pick. The callback 400 // will not run until after this method returns. 401 void AsyncPickDone(grpc_error_handle error); 402 subchannel_call()403 RefCountedPtr<SubchannelCall> subchannel_call() const { 404 return subchannel_call_; 405 } 406 407 private: 408 class LbQueuedCallCanceller; 409 class Metadata; 410 class LbCallState; 411 412 // Returns the index into pending_batches_ to be used for batch. 413 static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch); 414 void PendingBatchesAdd(grpc_transport_stream_op_batch* batch); 415 static void FailPendingBatchInCallCombiner(void* arg, 416 grpc_error_handle error); 417 // A predicate type and some useful implementations for PendingBatchesFail(). 418 typedef bool (*YieldCallCombinerPredicate)( 419 const CallCombinerClosureList& closures); YieldCallCombiner(const CallCombinerClosureList &)420 static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) { 421 return true; 422 } NoYieldCallCombiner(const CallCombinerClosureList &)423 static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) { 424 return false; 425 } YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)426 static bool YieldCallCombinerIfPendingBatchesFound( 427 const CallCombinerClosureList& closures) { 428 return closures.size() > 0; 429 } 430 // Fails all pending batches. 431 // If yield_call_combiner_predicate returns true, assumes responsibility for 432 // yielding the call combiner. 433 void PendingBatchesFail( 434 grpc_error_handle error, 435 YieldCallCombinerPredicate yield_call_combiner_predicate); 436 static void ResumePendingBatchInCallCombiner(void* arg, 437 grpc_error_handle ignored); 438 // Resumes all pending batches on subchannel_call_. 439 void PendingBatchesResume(); 440 441 static void RecvTrailingMetadataReadyForLoadBalancingPolicy( 442 void* arg, grpc_error_handle error); 443 void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy( 444 grpc_transport_stream_op_batch* batch); 445 446 void CreateSubchannelCall(); 447 // Invoked when a pick is completed, on both success or failure. 448 static void PickDone(void* arg, grpc_error_handle error); 449 // Removes the call from the channel's list of queued picks if present. 450 void MaybeRemoveCallFromLbQueuedCallsLocked() 451 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); 452 // Adds the call to the channel's list of queued picks if not already present. 453 void MaybeAddCallToLbQueuedCallsLocked() 454 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_); 455 456 ClientChannel* chand_; 457 458 // TODO(roth): Instead of duplicating these fields in every filter 459 // that uses any one of them, we should store them in the call 460 // context. This will save per-call memory overhead. 461 grpc_slice path_; // Request path. 462 gpr_cycle_counter call_start_time_; 463 grpc_millis deadline_; 464 Arena* arena_; 465 grpc_call_stack* owning_call_; 466 CallCombiner* call_combiner_; 467 grpc_call_context_element* call_context_; 468 grpc_polling_entity* pollent_; 469 grpc_closure* on_call_destruction_complete_; 470 471 // Set when we get a cancel_stream op. 472 grpc_error_handle cancel_error_ = GRPC_ERROR_NONE; 473 474 // Set when we fail inside the LB call. 475 grpc_error_handle failure_error_ = GRPC_ERROR_NONE; 476 477 grpc_closure pick_closure_; 478 479 // Accessed while holding ClientChannel::data_plane_mu_. 480 ClientChannel::LbQueuedCall queued_call_ 481 ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_); 482 bool queued_pending_lb_pick_ ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = 483 false; 484 LbQueuedCallCanceller* lb_call_canceller_ 485 ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr; 486 487 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 488 const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr; 489 std::function<void(grpc_error_handle, LoadBalancingPolicy::MetadataInterface*, 490 LoadBalancingPolicy::CallState*)> 491 lb_recv_trailing_metadata_ready_; 492 493 RefCountedPtr<SubchannelCall> subchannel_call_; 494 495 // For intercepting recv_trailing_metadata_ready for the LB policy. 496 grpc_metadata_batch* recv_trailing_metadata_ = nullptr; 497 grpc_closure recv_trailing_metadata_ready_; 498 grpc_closure* original_recv_trailing_metadata_ready_ = nullptr; 499 500 // Batches are added to this list when received from above. 501 // They are removed when we are done handling the batch (i.e., when 502 // either we have invoked all of the batch's callbacks or we have 503 // passed the batch down to the subchannel call and are not 504 // intercepting any of its callbacks). 505 grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {}; 506 }; 507 508 } // namespace grpc_core 509 510 #endif // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H 511