1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H 20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <deque> 25 26 #include "src/core/ext/filters/client_channel/client_channel_channelz.h" 27 #include "src/core/ext/filters/client_channel/connector.h" 28 #include "src/core/ext/filters/client_channel/subchannel_pool_interface.h" 29 #include "src/core/lib/backoff/backoff.h" 30 #include "src/core/lib/channel/channel_stack.h" 31 #include "src/core/lib/gpr/time_precise.h" 32 #include "src/core/lib/gprpp/arena.h" 33 #include "src/core/lib/gprpp/ref_counted.h" 34 #include "src/core/lib/gprpp/ref_counted_ptr.h" 35 #include "src/core/lib/gprpp/sync.h" 36 #include "src/core/lib/iomgr/polling_entity.h" 37 #include "src/core/lib/iomgr/timer.h" 38 #include "src/core/lib/transport/connectivity_state.h" 39 #include "src/core/lib/transport/metadata.h" 40 41 // Channel arg containing a URI indicating the address to connect to. 42 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address" 43 44 // For debugging refcounting. 45 #ifndef NDEBUG 46 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref(__FILE__, __LINE__, (r)) 47 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef() 48 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref(__FILE__, __LINE__, (r)) 49 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef(__FILE__, __LINE__, (r)) 50 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref(__FILE__, __LINE__, (r)) 51 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \ 52 const char *file, int line, const char *reason 53 #define GRPC_SUBCHANNEL_REF_REASON reason 54 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS \ 55 , GRPC_SUBCHANNEL_REF_EXTRA_ARGS, const char* purpose 56 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) , file, line, reason, x 57 #else 58 #define GRPC_SUBCHANNEL_REF(p, r) (p)->Ref() 59 #define GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(p, r) (p)->RefFromWeakRef() 60 #define GRPC_SUBCHANNEL_UNREF(p, r) (p)->Unref() 61 #define GRPC_SUBCHANNEL_WEAK_REF(p, r) (p)->WeakRef() 62 #define GRPC_SUBCHANNEL_WEAK_UNREF(p, r) (p)->WeakUnref() 63 #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS 64 #define GRPC_SUBCHANNEL_REF_REASON "" 65 #define GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS 66 #define GRPC_SUBCHANNEL_REF_MUTATE_PURPOSE(x) 67 #endif 68 69 namespace grpc_core { 70 71 class SubchannelCall; 72 73 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> { 74 public: 75 ConnectedSubchannel( 76 grpc_channel_stack* channel_stack, const grpc_channel_args* args, 77 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel); 78 ~ConnectedSubchannel() override; 79 80 void StartWatch(grpc_pollset_set* interested_parties, 81 OrphanablePtr<ConnectivityStateWatcherInterface> watcher); 82 83 void Ping(grpc_closure* on_initiate, grpc_closure* on_ack); 84 channel_stack()85 grpc_channel_stack* channel_stack() const { return channel_stack_; } args()86 const grpc_channel_args* args() const { return args_; } channelz_subchannel()87 channelz::SubchannelNode* channelz_subchannel() const { 88 return channelz_subchannel_.get(); 89 } 90 91 size_t GetInitialCallSizeEstimate() const; 92 93 private: 94 grpc_channel_stack* channel_stack_; 95 grpc_channel_args* args_; 96 // ref counted pointer to the channelz node in this connected subchannel's 97 // owning subchannel. 98 RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_; 99 }; 100 101 // Implements the interface of RefCounted<>. 102 class SubchannelCall { 103 public: 104 struct Args { 105 RefCountedPtr<ConnectedSubchannel> connected_subchannel; 106 grpc_polling_entity* pollent; 107 grpc_slice path; 108 gpr_cycle_counter start_time; 109 grpc_millis deadline; 110 Arena* arena; 111 grpc_call_context_element* context; 112 CallCombiner* call_combiner; 113 }; 114 static RefCountedPtr<SubchannelCall> Create(Args args, grpc_error** error); 115 116 // Continues processing a transport stream op batch. 117 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch); 118 119 // Returns the call stack of the subchannel call. 120 grpc_call_stack* GetCallStack(); 121 122 // Sets the 'then_schedule_closure' argument for call stack destruction. 123 // Must be called once per call. 124 void SetAfterCallStackDestroy(grpc_closure* closure); 125 126 // Interface of RefCounted<>. 127 RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT; 128 RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location, 129 const char* reason) GRPC_MUST_USE_RESULT; 130 // When refcount drops to 0, destroys itself and the associated call stack, 131 // but does NOT free the memory because it's in the call arena. 132 void Unref(); 133 void Unref(const DebugLocation& location, const char* reason); 134 135 private: 136 // Allow RefCountedPtr<> to access IncrementRefCount(). 137 template <typename T> 138 friend class RefCountedPtr; 139 140 SubchannelCall(Args args, grpc_error** error); 141 142 // If channelz is enabled, intercepts recv_trailing so that we may check the 143 // status and associate it to a subchannel. 144 void MaybeInterceptRecvTrailingMetadata( 145 grpc_transport_stream_op_batch* batch); 146 147 static void RecvTrailingMetadataReady(void* arg, grpc_error* error); 148 149 // Interface of RefCounted<>. 150 void IncrementRefCount(); 151 void IncrementRefCount(const DebugLocation& location, const char* reason); 152 153 static void Destroy(void* arg, grpc_error* error); 154 155 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 156 grpc_closure* after_call_stack_destroy_ = nullptr; 157 // State needed to support channelz interception of recv trailing metadata. 158 grpc_closure recv_trailing_metadata_ready_; 159 grpc_closure* original_recv_trailing_metadata_ = nullptr; 160 grpc_metadata_batch* recv_trailing_metadata_ = nullptr; 161 grpc_millis deadline_; 162 }; 163 164 // A subchannel that knows how to connect to exactly one target address. It 165 // provides a target for load balancing. 166 // 167 // Note that this is the "real" subchannel implementation, whose API is 168 // different from the SubchannelInterface that is exposed to LB policy 169 // implementations. The client channel provides an adaptor class 170 // (SubchannelWrapper) that "converts" between the two. 171 class Subchannel { 172 public: 173 class ConnectivityStateWatcherInterface 174 : public RefCounted<ConnectivityStateWatcherInterface> { 175 public: 176 struct ConnectivityStateChange { 177 grpc_connectivity_state state; 178 absl::Status status; 179 RefCountedPtr<ConnectedSubchannel> connected_subchannel; 180 }; 181 182 ~ConnectivityStateWatcherInterface() override = default; 183 184 // Will be invoked whenever the subchannel's connectivity state 185 // changes. There will be only one invocation of this method on a 186 // given watcher instance at any given time. 187 // Implementations should call PopConnectivityStateChange to get the next 188 // connectivity state change. 189 virtual void OnConnectivityStateChange() = 0; 190 191 virtual grpc_pollset_set* interested_parties() = 0; 192 193 // Enqueues connectivity state change notifications. 194 // When the state changes to READY, connected_subchannel will 195 // contain a ref to the connected subchannel. When it changes from 196 // READY to some other state, the implementation must release its 197 // ref to the connected subchannel. 198 // TODO(yashkt): This is currently needed to send the state updates in the 199 // right order when asynchronously notifying. This will no longer be 200 // necessary when we have access to EventManager. 201 void PushConnectivityStateChange(ConnectivityStateChange state_change); 202 203 // Dequeues connectivity state change notifications. 204 ConnectivityStateChange PopConnectivityStateChange(); 205 206 private: 207 // Keeps track of the updates that the watcher instance must be notified of. 208 // TODO(yashkt): This is currently needed to send the state updates in the 209 // right order when asynchronously notifying. This will no longer be 210 // necessary when we have access to EventManager. 211 std::deque<ConnectivityStateChange> connectivity_state_queue_; 212 Mutex mu_; // protects the queue 213 }; 214 215 // The ctor and dtor are not intended to use directly. 216 Subchannel(SubchannelKey* key, OrphanablePtr<SubchannelConnector> connector, 217 const grpc_channel_args* args); 218 ~Subchannel(); 219 220 // Creates a subchannel given \a connector and \a args. 221 static Subchannel* Create(OrphanablePtr<SubchannelConnector> connector, 222 const grpc_channel_args* args); 223 224 // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time 225 // is larger than the subchannel's current keepalive time. The updated value 226 // will have an affect when the subchannel creates a new ConnectedSubchannel. 227 void ThrottleKeepaliveTime(int new_keepalive_time); 228 229 // Strong and weak refcounting. 230 Subchannel* Ref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); 231 void Unref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); 232 Subchannel* WeakRef(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); 233 void WeakUnref(GRPC_SUBCHANNEL_REF_EXTRA_ARGS); 234 // Attempts to return a strong ref when only the weak refcount is guaranteed 235 // non-zero. If the strong refcount is zero, does not alter the refcount and 236 // returns null. 237 Subchannel* RefFromWeakRef(); 238 239 // Gets the string representing the subchannel address. 240 // Caller doesn't take ownership. 241 const char* GetTargetAddress(); 242 channel_args()243 const grpc_channel_args* channel_args() const { return args_; } 244 245 channelz::SubchannelNode* channelz_node(); 246 247 // Returns the current connectivity state of the subchannel. 248 // If health_check_service_name is non-null, the returned connectivity 249 // state will be based on the state reported by the backend for that 250 // service name. 251 // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel. 252 grpc_connectivity_state CheckConnectivityState( 253 const absl::optional<std::string>& health_check_service_name, 254 RefCountedPtr<ConnectedSubchannel>* connected_subchannel); 255 256 // Starts watching the subchannel's connectivity state. 257 // The first callback to the watcher will be delivered when the 258 // subchannel's connectivity state becomes a value other than 259 // initial_state, which may happen immediately. 260 // Subsequent callbacks will be delivered as the subchannel's state 261 // changes. 262 // The watcher will be destroyed either when the subchannel is 263 // destroyed or when CancelConnectivityStateWatch() is called. 264 void WatchConnectivityState( 265 grpc_connectivity_state initial_state, 266 const absl::optional<std::string>& health_check_service_name, 267 RefCountedPtr<ConnectivityStateWatcherInterface> watcher); 268 269 // Cancels a connectivity state watch. 270 // If the watcher has already been destroyed, this is a no-op. 271 void CancelConnectivityStateWatch( 272 const absl::optional<std::string>& health_check_service_name, 273 ConnectivityStateWatcherInterface* watcher); 274 275 // Attempt to connect to the backend. Has no effect if already connected. 276 void AttemptToConnect(); 277 278 // Resets the connection backoff of the subchannel. 279 // TODO(roth): Move connection backoff out of subchannels and up into LB 280 // policy code (probably by adding a SubchannelGroup between 281 // SubchannelList and SubchannelData), at which point this method can 282 // go away. 283 void ResetBackoff(); 284 285 // Returns a new channel arg encoding the subchannel address as a URI 286 // string. Caller is responsible for freeing the string. 287 static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr); 288 289 // Returns the URI string from the subchannel address arg in \a args. 290 static const char* GetUriFromSubchannelAddressArg( 291 const grpc_channel_args* args); 292 293 // Sets \a addr from the subchannel address arg in \a args. 294 static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args, 295 grpc_resolved_address* addr); 296 297 private: 298 // A linked list of ConnectivityStateWatcherInterfaces that are monitoring 299 // the subchannel's state. 300 class ConnectivityStateWatcherList { 301 public: ~ConnectivityStateWatcherList()302 ~ConnectivityStateWatcherList() { Clear(); } 303 304 void AddWatcherLocked( 305 RefCountedPtr<ConnectivityStateWatcherInterface> watcher); 306 void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher); 307 308 // Notifies all watchers in the list about a change to state. 309 void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state, 310 const absl::Status& status); 311 Clear()312 void Clear() { watchers_.clear(); } 313 empty()314 bool empty() const { return watchers_.empty(); } 315 316 private: 317 // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can 318 // be a set instead of a map. 319 std::map<ConnectivityStateWatcherInterface*, 320 RefCountedPtr<ConnectivityStateWatcherInterface>> 321 watchers_; 322 }; 323 324 // A map that tracks ConnectivityStateWatcherInterfaces using a particular 325 // health check service name. 326 // 327 // There is one entry in the map for each health check service name. 328 // Entries exist only as long as there are watchers using the 329 // corresponding service name. 330 // 331 // A health check client is maintained only while the subchannel is in 332 // state READY. 333 class HealthWatcherMap { 334 public: 335 void AddWatcherLocked( 336 Subchannel* subchannel, grpc_connectivity_state initial_state, 337 const std::string& health_check_service_name, 338 RefCountedPtr<ConnectivityStateWatcherInterface> watcher); 339 void RemoveWatcherLocked(const std::string& health_check_service_name, 340 ConnectivityStateWatcherInterface* watcher); 341 342 // Notifies the watcher when the subchannel's state changes. 343 void NotifyLocked(grpc_connectivity_state state, 344 const absl::Status& status); 345 346 grpc_connectivity_state CheckConnectivityStateLocked( 347 Subchannel* subchannel, const std::string& health_check_service_name); 348 349 void ShutdownLocked(); 350 351 private: 352 class HealthWatcher; 353 354 std::map<std::string, OrphanablePtr<HealthWatcher>> map_; 355 }; 356 357 class ConnectedSubchannelStateWatcher; 358 359 class AsyncWatcherNotifierLocked; 360 361 // Sets the subchannel's connectivity state to \a state. 362 void SetConnectivityStateLocked(grpc_connectivity_state state, 363 const absl::Status& status); 364 365 // Methods for connection. 366 void MaybeStartConnectingLocked(); 367 static void OnRetryAlarm(void* arg, grpc_error* error); 368 void ContinueConnectingLocked(); 369 static void OnConnectingFinished(void* arg, grpc_error* error); 370 bool PublishTransportLocked(); 371 void Disconnect(); 372 373 gpr_atm RefMutate(gpr_atm delta, 374 int barrier GRPC_SUBCHANNEL_REF_MUTATE_EXTRA_ARGS); 375 376 // The subchannel pool this subchannel is in. 377 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_; 378 // TODO(juanlishen): Consider using args_ as key_ directly. 379 // Subchannel key that identifies this subchannel in the subchannel pool. 380 SubchannelKey* key_; 381 // Channel args. 382 grpc_channel_args* args_; 383 // pollset_set tracking who's interested in a connection being setup. 384 grpc_pollset_set* pollset_set_; 385 // Protects the other members. 386 Mutex mu_; 387 // Refcount 388 // - lower INTERNAL_REF_BITS bits are for internal references: 389 // these do not keep the subchannel open. 390 // - upper remaining bits are for public references: these do 391 // keep the subchannel open 392 gpr_atm ref_pair_; 393 394 // Connection states. 395 OrphanablePtr<SubchannelConnector> connector_; 396 // Set during connection. 397 SubchannelConnector::Result connecting_result_; 398 grpc_closure on_connecting_finished_; 399 // Active connection, or null. 400 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 401 bool connecting_ = false; 402 bool disconnected_ = false; 403 404 // Connectivity state tracking. 405 grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; 406 absl::Status status_; 407 // The list of watchers without a health check service name. 408 ConnectivityStateWatcherList watcher_list_; 409 // The map of watchers with health check service names. 410 HealthWatcherMap health_watcher_map_; 411 412 // Backoff state. 413 BackOff backoff_; 414 grpc_millis next_attempt_deadline_; 415 grpc_millis min_connect_timeout_ms_; 416 bool backoff_begun_ = false; 417 418 // Retry alarm. 419 grpc_timer retry_alarm_; 420 grpc_closure on_retry_alarm_; 421 bool have_retry_alarm_ = false; 422 // reset_backoff() was called while alarm was pending. 423 bool retry_immediately_ = false; 424 // Keepalive time period (-1 for unset) 425 int keepalive_time_ = -1; 426 427 // Channelz tracking. 428 RefCountedPtr<channelz::SubchannelNode> channelz_node_; 429 }; 430 431 } // namespace grpc_core 432 433 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */ 434