1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H 20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <deque> 25 26 #include "src/core/ext/filters/client_channel/client_channel_channelz.h" 27 #include "src/core/ext/filters/client_channel/connector.h" 28 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" 29 #include "src/core/lib/backoff/backoff.h" 30 #include "src/core/lib/channel/channel_stack.h" 31 #include "src/core/lib/gpr/time_precise.h" 32 #include "src/core/lib/gprpp/arena.h" 33 #include "src/core/lib/gprpp/dual_ref_counted.h" 34 #include "src/core/lib/gprpp/ref_counted.h" 35 #include "src/core/lib/gprpp/ref_counted_ptr.h" 36 #include "src/core/lib/gprpp/sync.h" 37 #include "src/core/lib/iomgr/polling_entity.h" 38 #include "src/core/lib/iomgr/timer.h" 39 #include "src/core/lib/transport/connectivity_state.h" 40 #include "src/core/lib/transport/metadata.h" 41 42 // Channel arg containing a URI indicating the address to connect to. 43 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address" 44 45 namespace grpc_core { 46 47 class SubchannelCall; 48 49 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> { 50 public: 51 ConnectedSubchannel( 52 grpc_channel_stack* channel_stack, const grpc_channel_args* args, 53 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel); 54 ~ConnectedSubchannel() override; 55 56 void StartWatch(grpc_pollset_set* interested_parties, 57 OrphanablePtr<ConnectivityStateWatcherInterface> watcher); 58 59 void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); 60 channel_stack()61 grpc_channel_stack* channel_stack() const { return channel_stack_; } args()62 const grpc_channel_args* args() const { return args_; } channelz_subchannel()63 channelz::SubchannelNode* channelz_subchannel() const { 64 return channelz_subchannel_.get(); 65 } 66 67 size_t GetInitialCallSizeEstimate() const; 68 69 private: 70 grpc_channel_stack* channel_stack_; 71 grpc_channel_args* args_; 72 // ref counted pointer to the channelz node in this connected subchannel's 73 // owning subchannel. 74 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_; 75 }; 76 77 // Implements the interface of RefCounted<>. 78 class SubchannelCall { 79 public: 80 struct Args { 81 RefCountedPtr<ConnectedSubchannel> connected_subchannel; 82 grpc_polling_entity* pollent; 83 grpc_slice path; 84 gpr_cycle_counter start_time; 85 grpc_millis deadline; 86 Arena* arena; 87 grpc_call_context_element* context; 88 CallCombiner* call_combiner; 89 }; 90 static RefCountedPtr<SubchannelCall> Create(Args args, 91 grpc_error_handle* error); 92 93 // Continues processing a transport stream op batch. 94 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); 95 96 // Returns the call stack of the subchannel call. 97 grpc_call_stack* GetCallStack(); 98 99 // Sets the 'then_schedule_closure' argument for call stack destruction. 100 // Must be called once per call. 101 void SetAfterCallStackDestroy(grpc_closure* closure); 102 103 // Interface of RefCounted<>. 104 RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT; 105 RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location, 106 const char* reason) GRPC_MUST_USE_RESULT; 107 // When refcount drops to 0, destroys itself and the associated call stack, 108 // but does NOT free the memory because it's in the call arena. 109 void Unref(); 110 void Unref(const DebugLocation& location, const char* reason); 111 112 private: 113 // Allow RefCountedPtr<> to access IncrementRefCount(). 114 template <typename T> 115 friend class RefCountedPtr; 116 117 SubchannelCall(Args args, grpc_error_handle* error); 118 119 // If channelz is enabled, intercepts recv_trailing so that we may check the 120 // status and associate it to a subchannel. 121 void MaybeInterceptRecvTrailingMetadata( 122 grpc_transport_stream_op_batch* batch); 123 124 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); 125 126 // Interface of RefCounted<>. 127 void IncrementRefCount(); 128 void IncrementRefCount(const DebugLocation& location, const char* reason); 129 130 static void Destroy(void* arg, grpc_error_handle error); 131 132 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 133 grpc_closure* after_call_stack_destroy_ = nullptr; 134 // State needed to support channelz interception of recv trailing metadata. 135 grpc_closure recv_trailing_metadata_ready_; 136 grpc_closure* original_recv_trailing_metadata_ = nullptr; 137 grpc_metadata_batch* recv_trailing_metadata_ = nullptr; 138 grpc_millis deadline_; 139 }; 140 141 // A subchannel that knows how to connect to exactly one target address. It 142 // provides a target for load balancing. 143 // 144 // Note that this is the "real" subchannel implementation, whose API is 145 // different from the SubchannelInterface that is exposed to LB policy 146 // implementations. The client channel provides an adaptor class 147 // (SubchannelWrapper) that "converts" between the two. 148 class Subchannel : public DualRefCounted<Subchannel> { 149 public: 150 class ConnectivityStateWatcherInterface 151 : public RefCounted<ConnectivityStateWatcherInterface> { 152 public: 153 struct ConnectivityStateChange { 154 grpc_connectivity_state state; 155 absl::Status status; 156 RefCountedPtr<ConnectedSubchannel> connected_subchannel; 157 }; 158 159 ~ConnectivityStateWatcherInterface() override = default; 160 161 // Will be invoked whenever the subchannel's connectivity state 162 // changes. There will be only one invocation of this method on a 163 // given watcher instance at any given time. 164 // Implementations should call PopConnectivityStateChange to get the next 165 // connectivity state change. 166 virtual void OnConnectivityStateChange() = 0; 167 168 virtual grpc_pollset_set* interested_parties() = 0; 169 170 // Enqueues connectivity state change notifications. 171 // When the state changes to READY, connected_subchannel will 172 // contain a ref to the connected subchannel. When it changes from 173 // READY to some other state, the implementation must release its 174 // ref to the connected subchannel. 175 // TODO(yashkt): This is currently needed to send the state updates in the 176 // right order when asynchronously notifying. This will no longer be 177 // necessary when we have access to EventManager. 178 void PushConnectivityStateChange(ConnectivityStateChange state_change); 179 180 // Dequeues connectivity state change notifications. 181 ConnectivityStateChange PopConnectivityStateChange(); 182 183 private: 184 Mutex mu_; // protects the queue 185 // Keeps track of the updates that the watcher instance must be notified of. 186 // TODO(yashkt): This is currently needed to send the state updates in the 187 // right order when asynchronously notifying. This will no longer be 188 // necessary when we have access to EventManager. 189 std::deque<ConnectivityStateChange> connectivity_state_queue_ 190 ABSL_GUARDED_BY(&mu_); 191 }; 192 193 // Creates a subchannel given \a connector and \a args. 194 static RefCountedPtr<Subchannel> Create( 195 OrphanablePtr<SubchannelConnector> connector, 196 const grpc_channel_args* args); 197 198 // The ctor and dtor are not intended to use directly. 199 Subchannel(SubchannelKey key, OrphanablePtr<SubchannelConnector> connector, 200 const grpc_channel_args* args); 201 ~Subchannel() override; 202 203 // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time 204 // is larger than the subchannel's current keepalive time. The updated value 205 // will have an affect when the subchannel creates a new ConnectedSubchannel. 206 void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_); 207 208 // Gets the string representing the subchannel address. 209 // Caller doesn't take ownership. 210 const char* GetTargetAddress(); 211 channel_args()212 const grpc_channel_args* channel_args() const { return args_; } 213 214 channelz::SubchannelNode* channelz_node(); 215 216 // Returns the current connectivity state of the subchannel. 217 // If health_check_service_name is non-null, the returned connectivity 218 // state will be based on the state reported by the backend for that 219 // service name. 220 // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel. 221 grpc_connectivity_state CheckConnectivityState( 222 const absl::optional<std::string>& health_check_service_name, 223 RefCountedPtr<ConnectedSubchannel>* connected_subchannel) 224 ABSL_LOCKS_EXCLUDED(mu_); 225 226 // Starts watching the subchannel's connectivity state. 227 // The first callback to the watcher will be delivered when the 228 // subchannel's connectivity state becomes a value other than 229 // initial_state, which may happen immediately. 230 // Subsequent callbacks will be delivered as the subchannel's state 231 // changes. 232 // The watcher will be destroyed either when the subchannel is 233 // destroyed or when CancelConnectivityStateWatch() is called. 234 void WatchConnectivityState( 235 grpc_connectivity_state initial_state, 236 const absl::optional<std::string>& health_check_service_name, 237 RefCountedPtr<ConnectivityStateWatcherInterface> watcher) 238 ABSL_LOCKS_EXCLUDED(mu_); 239 240 // Cancels a connectivity state watch. 241 // If the watcher has already been destroyed, this is a no-op. 242 void CancelConnectivityStateWatch( 243 const absl::optional<std::string>& health_check_service_name, 244 ConnectivityStateWatcherInterface* watcher) ABSL_LOCKS_EXCLUDED(mu_); 245 246 // Attempt to connect to the backend. Has no effect if already connected. 247 void AttemptToConnect() ABSL_LOCKS_EXCLUDED(mu_); 248 249 // Resets the connection backoff of the subchannel. 250 // TODO(roth): Move connection backoff out of subchannels and up into LB 251 // policy code (probably by adding a SubchannelGroup between 252 // SubchannelList and SubchannelData), at which point this method can 253 // go away. 254 void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_); 255 256 // Tears down any existing connection, and arranges for destruction 257 void Orphan() override ABSL_LOCKS_EXCLUDED(mu_); 258 259 // Returns a new channel arg encoding the subchannel address as a URI 260 // string. Caller is responsible for freeing the string. 261 static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr); 262 263 // Returns the URI string from the subchannel address arg in \a args. 264 static const char* GetUriFromSubchannelAddressArg( 265 const grpc_channel_args* args); 266 267 // Sets \a addr from the subchannel address arg in \a args. 268 static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args, 269 grpc_resolved_address* addr); 270 271 private: 272 // A linked list of ConnectivityStateWatcherInterfaces that are monitoring 273 // the subchannel's state. 274 class ConnectivityStateWatcherList { 275 public: ~ConnectivityStateWatcherList()276 ~ConnectivityStateWatcherList() { Clear(); } 277 278 void AddWatcherLocked( 279 RefCountedPtr<ConnectivityStateWatcherInterface> watcher); 280 void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher); 281 282 // Notifies all watchers in the list about a change to state. 283 void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state, 284 const absl::Status& status); 285 Clear()286 void Clear() { watchers_.clear(); } 287 empty()288 bool empty() const { return watchers_.empty(); } 289 290 private: 291 // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can 292 // be a set instead of a map. 293 std::map<ConnectivityStateWatcherInterface*, 294 RefCountedPtr<ConnectivityStateWatcherInterface>> 295 watchers_; 296 }; 297 298 // A map that tracks ConnectivityStateWatcherInterfaces using a particular 299 // health check service name. 300 // 301 // There is one entry in the map for each health check service name. 302 // Entries exist only as long as there are watchers using the 303 // corresponding service name. 304 // 305 // A health check client is maintained only while the subchannel is in 306 // state READY. 307 class HealthWatcherMap { 308 public: 309 void AddWatcherLocked( 310 WeakRefCountedPtr<Subchannel> subchannel, 311 grpc_connectivity_state initial_state, 312 const std::string& health_check_service_name, 313 RefCountedPtr<ConnectivityStateWatcherInterface> watcher); 314 void RemoveWatcherLocked(const std::string& health_check_service_name, 315 ConnectivityStateWatcherInterface* watcher); 316 317 // Notifies the watcher when the subchannel's state changes. 318 void NotifyLocked(grpc_connectivity_state state, const absl::Status& status) 319 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_); 320 321 grpc_connectivity_state CheckConnectivityStateLocked( 322 Subchannel* subchannel, const std::string& health_check_service_name) 323 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_); 324 325 void ShutdownLocked(); 326 327 private: 328 class HealthWatcher; 329 330 std::map<std::string, OrphanablePtr<HealthWatcher>> map_; 331 }; 332 333 class ConnectedSubchannelStateWatcher; 334 335 class AsyncWatcherNotifierLocked; 336 337 // Sets the subchannel's connectivity state to \a state. 338 void SetConnectivityStateLocked(grpc_connectivity_state state, 339 const absl::Status& status) 340 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 341 342 // Methods for connection. 343 void MaybeStartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 344 static void OnRetryAlarm(void* arg, grpc_error_handle error) 345 ABSL_LOCKS_EXCLUDED(mu_); 346 void ContinueConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 347 static void OnConnectingFinished(void* arg, grpc_error_handle error) 348 ABSL_LOCKS_EXCLUDED(mu_); 349 bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 350 351 // The subchannel pool this subchannel is in. 352 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; 353 // TODO(juanlishen): Consider using args_ as key_ directly. 354 // Subchannel key that identifies this subchannel in the subchannel pool. 355 const SubchannelKey key_; 356 // Channel args. 357 grpc_channel_args* args_; 358 // pollset_set tracking who's interested in a connection being setup. 359 grpc_pollset_set* pollset_set_; 360 // Channelz tracking. 361 RefCountedPtr<channelz::SubchannelNode> channelz_node_; 362 363 // Connection state. 364 OrphanablePtr<SubchannelConnector> connector_; 365 SubchannelConnector::Result connecting_result_; 366 grpc_closure on_connecting_finished_; 367 368 // Protects the other members. 369 Mutex mu_; 370 371 // Active connection, or null. 372 RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_); 373 bool connecting_ ABSL_GUARDED_BY(mu_) = false; 374 bool disconnected_ ABSL_GUARDED_BY(mu_) = false; 375 376 // Connectivity state tracking. 377 grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE; 378 absl::Status status_ ABSL_GUARDED_BY(mu_); 379 // The list of watchers without a health check service name. 380 ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_); 381 // The map of watchers with health check service names. 382 HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_); 383 384 // Backoff state. 385 BackOff backoff_ ABSL_GUARDED_BY(mu_); 386 grpc_millis next_attempt_deadline_ ABSL_GUARDED_BY(mu_); 387 grpc_millis min_connect_timeout_ms_ ABSL_GUARDED_BY(mu_); 388 bool backoff_begun_ ABSL_GUARDED_BY(mu_) = false; 389 390 // Retry alarm. 391 grpc_timer retry_alarm_ ABSL_GUARDED_BY(mu_); 392 grpc_closure on_retry_alarm_ ABSL_GUARDED_BY(mu_); 393 bool have_retry_alarm_ ABSL_GUARDED_BY(mu_) = false; 394 // reset_backoff() was called while alarm was pending. 395 bool retry_immediately_ ABSL_GUARDED_BY(mu_) = false; 396 // Keepalive time period (-1 for unset) 397 int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1; 398 }; 399 400 } // namespace grpc_core 401 402 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */ 403