• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H
21 
#include <grpc/support/port_platform.h>

#include <deque>
#include <map>
#include <string>

#include "src/core/ext/filters/client_channel/client_channel_channelz.h"
#include "src/core/ext/filters/client_channel/connector.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gprpp/arena.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata.h"
41 
42 // Channel arg containing a URI indicating the address to connect to.
43 #define GRPC_ARG_SUBCHANNEL_ADDRESS "grpc.subchannel_address"
44 
45 namespace grpc_core {
46 
47 class SubchannelCall;
48 
49 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
50  public:
51   ConnectedSubchannel(
52       grpc_channel_stack* channel_stack, const grpc_channel_args* args,
53       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
54   ~ConnectedSubchannel() override;
55 
56   void StartWatch(grpc_pollset_set* interested_parties,
57                   OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
58 
59   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
60 
channel_stack()61   grpc_channel_stack* channel_stack() const { return channel_stack_; }
args()62   const grpc_channel_args* args() const { return args_; }
channelz_subchannel()63   channelz::SubchannelNode* channelz_subchannel() const {
64     return channelz_subchannel_.get();
65   }
66 
67   size_t GetInitialCallSizeEstimate() const;
68 
69  private:
70   grpc_channel_stack* channel_stack_;
71   grpc_channel_args* args_;
72   // ref counted pointer to the channelz node in this connected subchannel's
73   // owning subchannel.
74   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
75 };
76 
77 // Implements the interface of RefCounted<>.
78 class SubchannelCall {
79  public:
80   struct Args {
81     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
82     grpc_polling_entity* pollent;
83     grpc_slice path;
84     gpr_cycle_counter start_time;
85     grpc_millis deadline;
86     Arena* arena;
87     grpc_call_context_element* context;
88     CallCombiner* call_combiner;
89   };
90   static RefCountedPtr<SubchannelCall> Create(Args args,
91                                               grpc_error_handle* error);
92 
93   // Continues processing a transport stream op batch.
94   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
95 
96   // Returns the call stack of the subchannel call.
97   grpc_call_stack* GetCallStack();
98 
99   // Sets the 'then_schedule_closure' argument for call stack destruction.
100   // Must be called once per call.
101   void SetAfterCallStackDestroy(grpc_closure* closure);
102 
103   // Interface of RefCounted<>.
104   RefCountedPtr<SubchannelCall> Ref() GRPC_MUST_USE_RESULT;
105   RefCountedPtr<SubchannelCall> Ref(const DebugLocation& location,
106                                     const char* reason) GRPC_MUST_USE_RESULT;
107   // When refcount drops to 0, destroys itself and the associated call stack,
108   // but does NOT free the memory because it's in the call arena.
109   void Unref();
110   void Unref(const DebugLocation& location, const char* reason);
111 
112  private:
113   // Allow RefCountedPtr<> to access IncrementRefCount().
114   template <typename T>
115   friend class RefCountedPtr;
116 
117   SubchannelCall(Args args, grpc_error_handle* error);
118 
119   // If channelz is enabled, intercepts recv_trailing so that we may check the
120   // status and associate it to a subchannel.
121   void MaybeInterceptRecvTrailingMetadata(
122       grpc_transport_stream_op_batch* batch);
123 
124   static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
125 
126   // Interface of RefCounted<>.
127   void IncrementRefCount();
128   void IncrementRefCount(const DebugLocation& location, const char* reason);
129 
130   static void Destroy(void* arg, grpc_error_handle error);
131 
132   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
133   grpc_closure* after_call_stack_destroy_ = nullptr;
134   // State needed to support channelz interception of recv trailing metadata.
135   grpc_closure recv_trailing_metadata_ready_;
136   grpc_closure* original_recv_trailing_metadata_ = nullptr;
137   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
138   grpc_millis deadline_;
139 };
140 
141 // A subchannel that knows how to connect to exactly one target address. It
142 // provides a target for load balancing.
143 //
144 // Note that this is the "real" subchannel implementation, whose API is
145 // different from the SubchannelInterface that is exposed to LB policy
146 // implementations.  The client channel provides an adaptor class
147 // (SubchannelWrapper) that "converts" between the two.
148 class Subchannel : public DualRefCounted<Subchannel> {
149  public:
150   class ConnectivityStateWatcherInterface
151       : public RefCounted<ConnectivityStateWatcherInterface> {
152    public:
153     struct ConnectivityStateChange {
154       grpc_connectivity_state state;
155       absl::Status status;
156       RefCountedPtr<ConnectedSubchannel> connected_subchannel;
157     };
158 
159     ~ConnectivityStateWatcherInterface() override = default;
160 
161     // Will be invoked whenever the subchannel's connectivity state
162     // changes.  There will be only one invocation of this method on a
163     // given watcher instance at any given time.
164     // Implementations should call PopConnectivityStateChange to get the next
165     // connectivity state change.
166     virtual void OnConnectivityStateChange() = 0;
167 
168     virtual grpc_pollset_set* interested_parties() = 0;
169 
170     // Enqueues connectivity state change notifications.
171     // When the state changes to READY, connected_subchannel will
172     // contain a ref to the connected subchannel.  When it changes from
173     // READY to some other state, the implementation must release its
174     // ref to the connected subchannel.
175     // TODO(yashkt): This is currently needed to send the state updates in the
176     // right order when asynchronously notifying. This will no longer be
177     // necessary when we have access to EventManager.
178     void PushConnectivityStateChange(ConnectivityStateChange state_change);
179 
180     // Dequeues connectivity state change notifications.
181     ConnectivityStateChange PopConnectivityStateChange();
182 
183    private:
184     Mutex mu_;  // protects the queue
185     // Keeps track of the updates that the watcher instance must be notified of.
186     // TODO(yashkt): This is currently needed to send the state updates in the
187     // right order when asynchronously notifying. This will no longer be
188     // necessary when we have access to EventManager.
189     std::deque<ConnectivityStateChange> connectivity_state_queue_
190         ABSL_GUARDED_BY(&mu_);
191   };
192 
193   // Creates a subchannel given \a connector and \a args.
194   static RefCountedPtr<Subchannel> Create(
195       OrphanablePtr<SubchannelConnector> connector,
196       const grpc_channel_args* args);
197 
198   // The ctor and dtor are not intended to use directly.
199   Subchannel(SubchannelKey key, OrphanablePtr<SubchannelConnector> connector,
200              const grpc_channel_args* args);
201   ~Subchannel() override;
202 
203   // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
204   // is larger than the subchannel's current keepalive time. The updated value
205   // will have an affect when the subchannel creates a new ConnectedSubchannel.
206   void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_);
207 
208   // Gets the string representing the subchannel address.
209   // Caller doesn't take ownership.
210   const char* GetTargetAddress();
211 
channel_args()212   const grpc_channel_args* channel_args() const { return args_; }
213 
214   channelz::SubchannelNode* channelz_node();
215 
216   // Returns the current connectivity state of the subchannel.
217   // If health_check_service_name is non-null, the returned connectivity
218   // state will be based on the state reported by the backend for that
219   // service name.
220   // If the return value is GRPC_CHANNEL_READY, also sets *connected_subchannel.
221   grpc_connectivity_state CheckConnectivityState(
222       const absl::optional<std::string>& health_check_service_name,
223       RefCountedPtr<ConnectedSubchannel>* connected_subchannel)
224       ABSL_LOCKS_EXCLUDED(mu_);
225 
226   // Starts watching the subchannel's connectivity state.
227   // The first callback to the watcher will be delivered when the
228   // subchannel's connectivity state becomes a value other than
229   // initial_state, which may happen immediately.
230   // Subsequent callbacks will be delivered as the subchannel's state
231   // changes.
232   // The watcher will be destroyed either when the subchannel is
233   // destroyed or when CancelConnectivityStateWatch() is called.
234   void WatchConnectivityState(
235       grpc_connectivity_state initial_state,
236       const absl::optional<std::string>& health_check_service_name,
237       RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
238       ABSL_LOCKS_EXCLUDED(mu_);
239 
240   // Cancels a connectivity state watch.
241   // If the watcher has already been destroyed, this is a no-op.
242   void CancelConnectivityStateWatch(
243       const absl::optional<std::string>& health_check_service_name,
244       ConnectivityStateWatcherInterface* watcher) ABSL_LOCKS_EXCLUDED(mu_);
245 
246   // Attempt to connect to the backend.  Has no effect if already connected.
247   void AttemptToConnect() ABSL_LOCKS_EXCLUDED(mu_);
248 
249   // Resets the connection backoff of the subchannel.
250   // TODO(roth): Move connection backoff out of subchannels and up into LB
251   // policy code (probably by adding a SubchannelGroup between
252   // SubchannelList and SubchannelData), at which point this method can
253   // go away.
254   void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_);
255 
256   // Tears down any existing connection, and arranges for destruction
257   void Orphan() override ABSL_LOCKS_EXCLUDED(mu_);
258 
259   // Returns a new channel arg encoding the subchannel address as a URI
260   // string. Caller is responsible for freeing the string.
261   static grpc_arg CreateSubchannelAddressArg(const grpc_resolved_address* addr);
262 
263   // Returns the URI string from the subchannel address arg in \a args.
264   static const char* GetUriFromSubchannelAddressArg(
265       const grpc_channel_args* args);
266 
267   // Sets \a addr from the subchannel address arg in \a args.
268   static void GetAddressFromSubchannelAddressArg(const grpc_channel_args* args,
269                                                  grpc_resolved_address* addr);
270 
271  private:
272   // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
273   // the subchannel's state.
274   class ConnectivityStateWatcherList {
275    public:
~ConnectivityStateWatcherList()276     ~ConnectivityStateWatcherList() { Clear(); }
277 
278     void AddWatcherLocked(
279         RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
280     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
281 
282     // Notifies all watchers in the list about a change to state.
283     void NotifyLocked(Subchannel* subchannel, grpc_connectivity_state state,
284                       const absl::Status& status);
285 
Clear()286     void Clear() { watchers_.clear(); }
287 
empty()288     bool empty() const { return watchers_.empty(); }
289 
290    private:
291     // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
292     // be a set instead of a map.
293     std::map<ConnectivityStateWatcherInterface*,
294              RefCountedPtr<ConnectivityStateWatcherInterface>>
295         watchers_;
296   };
297 
298   // A map that tracks ConnectivityStateWatcherInterfaces using a particular
299   // health check service name.
300   //
301   // There is one entry in the map for each health check service name.
302   // Entries exist only as long as there are watchers using the
303   // corresponding service name.
304   //
305   // A health check client is maintained only while the subchannel is in
306   // state READY.
307   class HealthWatcherMap {
308    public:
309     void AddWatcherLocked(
310         WeakRefCountedPtr<Subchannel> subchannel,
311         grpc_connectivity_state initial_state,
312         const std::string& health_check_service_name,
313         RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
314     void RemoveWatcherLocked(const std::string& health_check_service_name,
315                              ConnectivityStateWatcherInterface* watcher);
316 
317     // Notifies the watcher when the subchannel's state changes.
318     void NotifyLocked(grpc_connectivity_state state, const absl::Status& status)
319         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_);
320 
321     grpc_connectivity_state CheckConnectivityStateLocked(
322         Subchannel* subchannel, const std::string& health_check_service_name)
323         ABSL_EXCLUSIVE_LOCKS_REQUIRED(&Subchannel::mu_);
324 
325     void ShutdownLocked();
326 
327    private:
328     class HealthWatcher;
329 
330     std::map<std::string, OrphanablePtr<HealthWatcher>> map_;
331   };
332 
333   class ConnectedSubchannelStateWatcher;
334 
335   class AsyncWatcherNotifierLocked;
336 
337   // Sets the subchannel's connectivity state to \a state.
338   void SetConnectivityStateLocked(grpc_connectivity_state state,
339                                   const absl::Status& status)
340       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
341 
342   // Methods for connection.
343   void MaybeStartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
344   static void OnRetryAlarm(void* arg, grpc_error_handle error)
345       ABSL_LOCKS_EXCLUDED(mu_);
346   void ContinueConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
347   static void OnConnectingFinished(void* arg, grpc_error_handle error)
348       ABSL_LOCKS_EXCLUDED(mu_);
349   bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
350 
351   // The subchannel pool this subchannel is in.
352   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
353   // TODO(juanlishen): Consider using args_ as key_ directly.
354   // Subchannel key that identifies this subchannel in the subchannel pool.
355   const SubchannelKey key_;
356   // Channel args.
357   grpc_channel_args* args_;
358   // pollset_set tracking who's interested in a connection being setup.
359   grpc_pollset_set* pollset_set_;
360   // Channelz tracking.
361   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
362 
363   // Connection state.
364   OrphanablePtr<SubchannelConnector> connector_;
365   SubchannelConnector::Result connecting_result_;
366   grpc_closure on_connecting_finished_;
367 
368   // Protects the other members.
369   Mutex mu_;
370 
371   // Active connection, or null.
372   RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_);
373   bool connecting_ ABSL_GUARDED_BY(mu_) = false;
374   bool disconnected_ ABSL_GUARDED_BY(mu_) = false;
375 
376   // Connectivity state tracking.
377   grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE;
378   absl::Status status_ ABSL_GUARDED_BY(mu_);
379   // The list of watchers without a health check service name.
380   ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_);
381   // The map of watchers with health check service names.
382   HealthWatcherMap health_watcher_map_ ABSL_GUARDED_BY(mu_);
383 
384   // Backoff state.
385   BackOff backoff_ ABSL_GUARDED_BY(mu_);
386   grpc_millis next_attempt_deadline_ ABSL_GUARDED_BY(mu_);
387   grpc_millis min_connect_timeout_ms_ ABSL_GUARDED_BY(mu_);
388   bool backoff_begun_ ABSL_GUARDED_BY(mu_) = false;
389 
390   // Retry alarm.
391   grpc_timer retry_alarm_ ABSL_GUARDED_BY(mu_);
392   grpc_closure on_retry_alarm_ ABSL_GUARDED_BY(mu_);
393   bool have_retry_alarm_ ABSL_GUARDED_BY(mu_) = false;
394   // reset_backoff() was called while alarm was pending.
395   bool retry_immediately_ ABSL_GUARDED_BY(mu_) = false;
396   // Keepalive time period (-1 for unset)
397   int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;
398 };
399 
400 }  // namespace grpc_core
401 
402 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_H */
403