1 // 2 // Copyright 2015 gRPC authors. 3 // 4 // Licensed under the Apache License, Version 2.0 (the "License"); 5 // you may not use this file except in compliance with the License. 6 // You may obtain a copy of the License at 7 // 8 // http://www.apache.org/licenses/LICENSE-2.0 9 // 10 // Unless required by applicable law or agreed to in writing, software 11 // distributed under the License is distributed on an "AS IS" BASIS, 12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 // See the License for the specific language governing permissions and 14 // limitations under the License. 15 // 16 17 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H 18 #define GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H 19 20 #include <grpc/support/port_platform.h> 21 22 #include <stddef.h> 23 24 #include <functional> 25 #include <map> 26 #include <memory> 27 28 #include "absl/base/thread_annotations.h" 29 #include "absl/status/status.h" 30 31 #include <grpc/event_engine/event_engine.h> 32 #include <grpc/impl/connectivity_state.h> 33 34 #include "src/core/client_channel/client_channel_channelz.h" 35 #include "src/core/client_channel/connector.h" 36 #include "src/core/client_channel/subchannel_pool_interface.h" 37 #include "src/core/lib/backoff/backoff.h" 38 #include "src/core/lib/channel/channel_args.h" 39 #include "src/core/lib/channel/channel_fwd.h" 40 #include "src/core/lib/channel/context.h" 41 #include "src/core/lib/gpr/time_precise.h" 42 #include "src/core/lib/gprpp/debug_location.h" 43 #include "src/core/lib/gprpp/dual_ref_counted.h" 44 #include "src/core/lib/gprpp/orphanable.h" 45 #include "src/core/lib/gprpp/ref_counted.h" 46 #include "src/core/lib/gprpp/ref_counted_ptr.h" 47 #include "src/core/lib/gprpp/sync.h" 48 #include "src/core/lib/gprpp/time.h" 49 #include "src/core/lib/gprpp/unique_type_name.h" 50 #include "src/core/lib/gprpp/work_serializer.h" 51 #include "src/core/lib/iomgr/call_combiner.h" 52 #include "src/core/lib/iomgr/closure.h" 53 #include "src/core/lib/iomgr/error.h" 54 #include "src/core/lib/iomgr/iomgr_fwd.h" 55 #include "src/core/lib/iomgr/polling_entity.h" 56 #include "src/core/lib/iomgr/resolved_address.h" 57 #include "src/core/lib/promise/arena_promise.h" 58 #include "src/core/lib/resource_quota/arena.h" 59 #include "src/core/lib/slice/slice.h" 60 #include "src/core/lib/transport/connectivity_state.h" 61 #include "src/core/lib/transport/metadata_batch.h" 62 #include "src/core/lib/transport/transport.h" 63 64 namespace grpc_core { 65 66 class SubchannelCall; 67 68 class ConnectedSubchannel final : public RefCounted<ConnectedSubchannel> { 69 public: 70 ConnectedSubchannel( 71 grpc_channel_stack* channel_stack, const ChannelArgs& args, 72 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel); 73 ~ConnectedSubchannel() override; 74 75 void StartWatch(grpc_pollset_set* interested_parties, 76 OrphanablePtr<ConnectivityStateWatcherInterface> watcher); 77 78 void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); 79 channel_stack()80 grpc_channel_stack* channel_stack() const { return channel_stack_; } args()81 const ChannelArgs& args() const { return args_; } channelz_subchannel()82 channelz::SubchannelNode* channelz_subchannel() const { 83 return channelz_subchannel_.get(); 84 } 85 86 size_t GetInitialCallSizeEstimate() const; 87 88 ArenaPromise<ServerMetadataHandle> MakeCallPromise(CallArgs call_args); 89 90 private: 91 grpc_channel_stack* channel_stack_; 92 ChannelArgs args_; 93 // ref counted pointer to the channelz node in this connected subchannel's 94 // owning subchannel. 95 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_; 96 }; 97 98 // Implements the interface of RefCounted<>. 99 class SubchannelCall final { 100 public: 101 struct Args { 102 RefCountedPtr<ConnectedSubchannel> connected_subchannel; 103 grpc_polling_entity* pollent; 104 Slice path; 105 gpr_cycle_counter start_time; 106 Timestamp deadline; 107 Arena* arena; 108 grpc_call_context_element* context; 109 CallCombiner* call_combiner; 110 }; 111 static RefCountedPtr<SubchannelCall> Create(Args args, 112 grpc_error_handle* error); 113 114 // Continues processing a transport stream op batch. 115 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); 116 117 // Returns the call stack of the subchannel call. 118 grpc_call_stack* GetCallStack(); 119 120 // Sets the 'then_schedule_closure' argument for call stack destruction. 121 // Must be called once per call. 122 void SetAfterCallStackDestroy(grpc_closure* closure); 123 124 // Interface of RefCounted<>. 125 GRPC_MUST_USE_RESULT RefCountedPtr<SubchannelCall> Ref(); 126 GRPC_MUST_USE_RESULT RefCountedPtr<SubchannelCall> Ref( 127 const DebugLocation& location, const char* reason); 128 // When refcount drops to 0, destroys itself and the associated call stack, 129 // but does NOT free the memory because it's in the call arena. 130 void Unref(); 131 void Unref(const DebugLocation& location, const char* reason); 132 133 private: 134 // Allow RefCountedPtr<> to access IncrementRefCount(). 135 template <typename T> 136 friend class RefCountedPtr; 137 138 SubchannelCall(Args args, grpc_error_handle* error); 139 140 // If channelz is enabled, intercepts recv_trailing so that we may check the 141 // status and associate it to a subchannel. 142 void MaybeInterceptRecvTrailingMetadata( 143 grpc_transport_stream_op_batch* batch); 144 145 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); 146 147 // Interface of RefCounted<>. 148 void IncrementRefCount(); 149 void IncrementRefCount(const DebugLocation& location, const char* reason); 150 151 static void Destroy(void* arg, grpc_error_handle error); 152 153 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 154 grpc_closure* after_call_stack_destroy_ = nullptr; 155 // State needed to support channelz interception of recv trailing metadata. 156 grpc_closure recv_trailing_metadata_ready_; 157 grpc_closure* original_recv_trailing_metadata_ = nullptr; 158 grpc_metadata_batch* recv_trailing_metadata_ = nullptr; 159 Timestamp deadline_; 160 }; 161 162 // A subchannel that knows how to connect to exactly one target address. It 163 // provides a target for load balancing. 164 // 165 // Note that this is the "real" subchannel implementation, whose API is 166 // different from the SubchannelInterface that is exposed to LB policy 167 // implementations. The client channel provides an adaptor class 168 // (SubchannelWrapper) that "converts" between the two. 169 class Subchannel final : public DualRefCounted<Subchannel> { 170 public: 171 // TODO(roth): Once we remove pollset_set, consider whether this can 172 // just use the normal AsyncConnectivityStateWatcherInterface API. 173 class ConnectivityStateWatcherInterface 174 : public RefCounted<ConnectivityStateWatcherInterface> { 175 public: 176 // Invoked whenever the subchannel's connectivity state changes. 177 // There will be only one invocation of this method on a given watcher 178 // instance at any given time. 179 // A ref to the watcher is passed in here so that the implementation 180 // can unref it in the appropriate synchronization context (e.g., 181 // inside a WorkSerializer). 182 // TODO(roth): Figure out a cleaner way to guarantee that the ref is 183 // released in the right context. 184 virtual void OnConnectivityStateChange( 185 RefCountedPtr<ConnectivityStateWatcherInterface> self, 186 grpc_connectivity_state state, const absl::Status& status) = 0; 187 188 virtual grpc_pollset_set* interested_parties() = 0; 189 }; 190 191 // A base class for producers of subchannel-specific data. 192 // Implementations will typically add their own methods as needed. 193 class DataProducerInterface : public DualRefCounted<DataProducerInterface> { 194 public: 195 // A unique identifier for this implementation. 196 // Only one producer may be registered under a given type name on a 197 // given subchannel at any given time. 198 // Note that we use the pointer address instead of the string 199 // contents for uniqueness; all instances for a given implementation 200 // are expected to return the same string *instance*, not just the 201 // same string contents. 202 virtual UniqueTypeName type() const = 0; 203 }; 204 205 // Creates a subchannel. 206 static RefCountedPtr<Subchannel> Create( 207 OrphanablePtr<SubchannelConnector> connector, 208 const grpc_resolved_address& address, const ChannelArgs& args); 209 210 // The ctor and dtor are not intended to use directly. 211 Subchannel(SubchannelKey key, OrphanablePtr<SubchannelConnector> connector, 212 const ChannelArgs& args); 213 ~Subchannel() override; 214 215 // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time 216 // is larger than the subchannel's current keepalive time. The updated value 217 // will have an affect when the subchannel creates a new ConnectedSubchannel. 218 void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_); 219 pollset_set()220 grpc_pollset_set* pollset_set() const { return pollset_set_; } 221 222 channelz::SubchannelNode* channelz_node(); 223 address()224 const grpc_resolved_address& address() const { return key_.address(); } 225 226 // Starts watching the subchannel's connectivity state. 227 // The first callback to the watcher will be delivered ~immediately. 228 // Subsequent callbacks will be delivered as the subchannel's state 229 // changes. 230 // The watcher will be destroyed either when the subchannel is 231 // destroyed or when CancelConnectivityStateWatch() is called. 232 void WatchConnectivityState( 233 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) 234 ABSL_LOCKS_EXCLUDED(mu_); 235 236 // Cancels a connectivity state watch. 237 // If the watcher has already been destroyed, this is a no-op. 238 void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher) 239 ABSL_LOCKS_EXCLUDED(mu_); 240 connected_subchannel()241 RefCountedPtr<ConnectedSubchannel> connected_subchannel() 242 ABSL_LOCKS_EXCLUDED(mu_) { 243 MutexLock lock(&mu_); 244 return connected_subchannel_; 245 } 246 247 // Attempt to connect to the backend. Has no effect if already connected. 248 void RequestConnection() ABSL_LOCKS_EXCLUDED(mu_); 249 250 // Resets the connection backoff of the subchannel. 251 void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_); 252 253 // Access to data producer map. 254 // We do not hold refs to the data producer; the implementation is 255 // expected to register itself upon construction and remove itself 256 // upon destruction. 257 // 258 // Looks up the current data producer for type and invokes get_or_add() 259 // with a pointer to that producer in the map. The get_or_add() function 260 // can modify the pointed-to value to update the map. This provides a 261 // way to either re-use an existing producer or register a new one in 262 // a non-racy way. 263 void GetOrAddDataProducer( 264 UniqueTypeName type, 265 std::function<void(DataProducerInterface**)> get_or_add) 266 ABSL_LOCKS_EXCLUDED(mu_); 267 // Removes the data producer from the map, if the current producer for 268 // this type is the specified producer. 269 void RemoveDataProducer(DataProducerInterface* data_producer) 270 ABSL_LOCKS_EXCLUDED(mu_); 271 event_engine()272 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine() { 273 return event_engine_; 274 } 275 276 private: 277 // Tears down any existing connection, and arranges for destruction 278 void Orphaned() override ABSL_LOCKS_EXCLUDED(mu_); 279 280 // A linked list of ConnectivityStateWatcherInterfaces that are monitoring 281 // the subchannel's state. 282 class ConnectivityStateWatcherList final { 283 public: ConnectivityStateWatcherList(Subchannel * subchannel)284 explicit ConnectivityStateWatcherList(Subchannel* subchannel) 285 : subchannel_(subchannel) {} 286 ~ConnectivityStateWatcherList()287 ~ConnectivityStateWatcherList() { Clear(); } 288 289 void AddWatcherLocked( 290 RefCountedPtr<ConnectivityStateWatcherInterface> watcher); 291 void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher); 292 293 // Notifies all watchers in the list about a change to state. 294 void NotifyLocked(grpc_connectivity_state state, 295 const absl::Status& status); 296 Clear()297 void Clear() { watchers_.clear(); } 298 empty()299 bool empty() const { return watchers_.empty(); } 300 301 private: 302 Subchannel* subchannel_; 303 // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can 304 // be a set instead of a map. 305 std::map<ConnectivityStateWatcherInterface*, 306 RefCountedPtr<ConnectivityStateWatcherInterface>> 307 watchers_; 308 }; 309 310 class ConnectedSubchannelStateWatcher; 311 312 // Sets the subchannel's connectivity state to \a state. 313 void SetConnectivityStateLocked(grpc_connectivity_state state, 314 const absl::Status& status) 315 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 316 317 // Methods for connection. 318 void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_); 319 void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 320 void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 321 static void OnConnectingFinished(void* arg, grpc_error_handle error) 322 ABSL_LOCKS_EXCLUDED(mu_); 323 void OnConnectingFinishedLocked(grpc_error_handle error) 324 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 325 bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 326 327 // The subchannel pool this subchannel is in. 328 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; 329 // Subchannel key that identifies this subchannel in the subchannel pool. 330 const SubchannelKey key_; 331 // Actual address to connect to. May be different than the address in 332 // key_ if overridden by proxy mapper. 333 grpc_resolved_address address_for_connect_; 334 // Channel args. 335 ChannelArgs args_; 336 // pollset_set tracking who's interested in a connection being setup. 337 grpc_pollset_set* pollset_set_; 338 // Channelz tracking. 339 RefCountedPtr<channelz::SubchannelNode> channelz_node_; 340 // Minimum connection timeout. 341 Duration min_connect_timeout_; 342 343 // Connection state. 344 OrphanablePtr<SubchannelConnector> connector_; 345 SubchannelConnector::Result connecting_result_; 346 grpc_closure on_connecting_finished_; 347 348 // Protects the other members. 349 Mutex mu_; 350 351 bool shutdown_ ABSL_GUARDED_BY(mu_) = false; 352 353 // Connectivity state tracking. 354 // Note that the connectivity state implies the state of the 355 // Subchannel object: 356 // - IDLE: no retry timer pending, can start a connection attempt at any time 357 // - CONNECTING: connection attempt in progress 358 // - READY: connection attempt succeeded, connected_subchannel_ created 359 // - TRANSIENT_FAILURE: connection attempt failed, retry timer pending 360 grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE; 361 absl::Status status_ ABSL_GUARDED_BY(mu_); 362 // The list of connectivity state watchers. 363 ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_); 364 // Used for sending connectivity state notifications. 365 WorkSerializer work_serializer_; 366 367 // Active connection, or null. 368 RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_); 369 370 // Backoff state. 371 BackOff backoff_ ABSL_GUARDED_BY(mu_); 372 Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_); 373 grpc_event_engine::experimental::EventEngine::TaskHandle retry_timer_handle_ 374 ABSL_GUARDED_BY(mu_); 375 376 // Keepalive time period (-1 for unset) 377 int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1; 378 379 // Data producer map. 380 std::map<UniqueTypeName, DataProducerInterface*> data_producer_map_ 381 ABSL_GUARDED_BY(mu_); 382 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_; 383 }; 384 385 } // namespace grpc_core 386 387 #endif // GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H 388