• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H
18 #define GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H
19 
20 #include <grpc/event_engine/event_engine.h>
21 #include <grpc/impl/connectivity_state.h>
22 #include <grpc/support/port_platform.h>
23 #include <stddef.h>
24 
25 #include <functional>
26 #include <map>
27 #include <memory>
28 
29 #include "absl/base/thread_annotations.h"
30 #include "absl/status/status.h"
31 #include "src/core/client_channel/connector.h"
32 #include "src/core/client_channel/subchannel_pool_interface.h"
33 #include "src/core/lib/address_utils/sockaddr_utils.h"
34 #include "src/core/lib/channel/channel_args.h"
35 #include "src/core/lib/channel/channel_fwd.h"
36 #include "src/core/lib/iomgr/call_combiner.h"
37 #include "src/core/lib/iomgr/closure.h"
38 #include "src/core/lib/iomgr/error.h"
39 #include "src/core/lib/iomgr/iomgr_fwd.h"
40 #include "src/core/lib/iomgr/polling_entity.h"
41 #include "src/core/lib/iomgr/resolved_address.h"
42 #include "src/core/lib/promise/arena_promise.h"
43 #include "src/core/lib/resource_quota/arena.h"
44 #include "src/core/lib/slice/slice.h"
45 #include "src/core/lib/transport/connectivity_state.h"
46 #include "src/core/lib/transport/metadata_batch.h"
47 #include "src/core/lib/transport/transport.h"
48 #include "src/core/util/backoff.h"
49 #include "src/core/util/debug_location.h"
50 #include "src/core/util/dual_ref_counted.h"
51 #include "src/core/util/orphanable.h"
52 #include "src/core/util/ref_counted.h"
53 #include "src/core/util/ref_counted_ptr.h"
54 #include "src/core/util/sync.h"
55 #include "src/core/util/time.h"
56 #include "src/core/util/time_precise.h"
57 #include "src/core/util/unique_type_name.h"
58 #include "src/core/util/work_serializer.h"
59 
60 namespace grpc_core {
61 
62 class SubchannelCall;
63 
64 class ConnectedSubchannel : public RefCounted<ConnectedSubchannel> {
65  public:
args()66   const ChannelArgs& args() const { return args_; }
67 
68   virtual void StartWatch(
69       grpc_pollset_set* interested_parties,
70       OrphanablePtr<ConnectivityStateWatcherInterface> watcher) = 0;
71 
72   // Methods for v3 stack.
73   virtual void Ping(absl::AnyInvocable<void(absl::Status)> on_ack) = 0;
74   virtual RefCountedPtr<UnstartedCallDestination> unstarted_call_destination()
75       const = 0;
76 
77   // Methods for legacy stack.
78   virtual grpc_channel_stack* channel_stack() const = 0;
79   virtual size_t GetInitialCallSizeEstimate() const = 0;
80   virtual void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) = 0;
81 
82  protected:
83   explicit ConnectedSubchannel(const ChannelArgs& args);
84 
85  private:
86   ChannelArgs args_;
87 };
88 
89 class LegacyConnectedSubchannel;
90 
91 // Implements the interface of RefCounted<>.
92 class SubchannelCall final {
93  public:
94   struct Args {
95     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
96     grpc_polling_entity* pollent;
97     Slice path;
98     gpr_cycle_counter start_time;
99     Timestamp deadline;
100     Arena* arena;
101     CallCombiner* call_combiner;
102   };
103   static RefCountedPtr<SubchannelCall> Create(Args args,
104                                               grpc_error_handle* error);
105 
106   // Continues processing a transport stream op batch.
107   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
108 
109   // Returns the call stack of the subchannel call.
110   grpc_call_stack* GetCallStack();
111 
112   // Sets the 'then_schedule_closure' argument for call stack destruction.
113   // Must be called once per call.
114   void SetAfterCallStackDestroy(grpc_closure* closure);
115 
116   // Interface of RefCounted<>.
117   GRPC_MUST_USE_RESULT RefCountedPtr<SubchannelCall> Ref();
118   GRPC_MUST_USE_RESULT RefCountedPtr<SubchannelCall> Ref(
119       const DebugLocation& location, const char* reason);
120   // When refcount drops to 0, destroys itself and the associated call stack,
121   // but does NOT free the memory because it's in the call arena.
122   void Unref();
123   void Unref(const DebugLocation& location, const char* reason);
124 
125  private:
126   // Allow RefCountedPtr<> to access IncrementRefCount().
127   template <typename T>
128   friend class RefCountedPtr;
129 
130   SubchannelCall(Args args, grpc_error_handle* error);
131 
132   // If channelz is enabled, intercepts recv_trailing so that we may check the
133   // status and associate it to a subchannel.
134   void MaybeInterceptRecvTrailingMetadata(
135       grpc_transport_stream_op_batch* batch);
136 
137   static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
138 
139   // Interface of RefCounted<>.
140   void IncrementRefCount();
141   void IncrementRefCount(const DebugLocation& location, const char* reason);
142 
143   static void Destroy(void* arg, grpc_error_handle error);
144 
145   RefCountedPtr<LegacyConnectedSubchannel> connected_subchannel_;
146   grpc_closure* after_call_stack_destroy_ = nullptr;
147   // State needed to support channelz interception of recv trailing metadata.
148   grpc_closure recv_trailing_metadata_ready_;
149   grpc_closure* original_recv_trailing_metadata_ = nullptr;
150   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
151   Timestamp deadline_;
152 };
153 
154 // A subchannel that knows how to connect to exactly one target address. It
155 // provides a target for load balancing.
156 //
157 // Note that this is the "real" subchannel implementation, whose API is
158 // different from the SubchannelInterface that is exposed to LB policy
159 // implementations.  The client channel provides an adaptor class
160 // (SubchannelWrapper) that "converts" between the two.
161 class Subchannel final : public DualRefCounted<Subchannel> {
162  public:
163   // TODO(roth): Once we remove pollset_set, consider whether this can
164   // just use the normal AsyncConnectivityStateWatcherInterface API.
165   class ConnectivityStateWatcherInterface
166       : public RefCounted<ConnectivityStateWatcherInterface> {
167    public:
168     // Invoked whenever the subchannel's connectivity state changes.
169     // There will be only one invocation of this method on a given watcher
170     // instance at any given time.
171     // A ref to the watcher is passed in here so that the implementation
172     // can unref it in the appropriate synchronization context (e.g.,
173     // inside a WorkSerializer).
174     // TODO(roth): Figure out a cleaner way to guarantee that the ref is
175     // released in the right context.
176     virtual void OnConnectivityStateChange(
177         RefCountedPtr<ConnectivityStateWatcherInterface> self,
178         grpc_connectivity_state state, const absl::Status& status) = 0;
179 
180     virtual grpc_pollset_set* interested_parties() = 0;
181   };
182 
183   // A base class for producers of subchannel-specific data.
184   // Implementations will typically add their own methods as needed.
185   class DataProducerInterface : public DualRefCounted<DataProducerInterface> {
186    public:
187     // A unique identifier for this implementation.
188     // Only one producer may be registered under a given type name on a
189     // given subchannel at any given time.
190     // Note that we use the pointer address instead of the string
191     // contents for uniqueness; all instances for a given implementation
192     // are expected to return the same string *instance*, not just the
193     // same string contents.
194     virtual UniqueTypeName type() const = 0;
195   };
196 
197   // Creates a subchannel.
198   static RefCountedPtr<Subchannel> Create(
199       OrphanablePtr<SubchannelConnector> connector,
200       const grpc_resolved_address& address, const ChannelArgs& args);
201 
202   // The ctor and dtor are not intended to use directly.
203   Subchannel(SubchannelKey key, OrphanablePtr<SubchannelConnector> connector,
204              const ChannelArgs& args);
205   ~Subchannel() override;
206 
207   // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
208   // is larger than the subchannel's current keepalive time. The updated value
209   // will have an affect when the subchannel creates a new ConnectedSubchannel.
210   void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_);
211 
pollset_set()212   grpc_pollset_set* pollset_set() const { return pollset_set_; }
213 
214   channelz::SubchannelNode* channelz_node();
215 
address()216   std::string address() const {
217     return grpc_sockaddr_to_uri(&key_.address())
218         .value_or("<unknown address type>");
219   }
220 
221   // Starts watching the subchannel's connectivity state.
222   // The first callback to the watcher will be delivered ~immediately.
223   // Subsequent callbacks will be delivered as the subchannel's state
224   // changes.
225   // The watcher will be destroyed either when the subchannel is
226   // destroyed or when CancelConnectivityStateWatch() is called.
227   void WatchConnectivityState(
228       RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
229       ABSL_LOCKS_EXCLUDED(mu_);
230 
231   // Cancels a connectivity state watch.
232   // If the watcher has already been destroyed, this is a no-op.
233   void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher)
234       ABSL_LOCKS_EXCLUDED(mu_);
235 
connected_subchannel()236   RefCountedPtr<ConnectedSubchannel> connected_subchannel()
237       ABSL_LOCKS_EXCLUDED(mu_) {
238     MutexLock lock(&mu_);
239     return connected_subchannel_;
240   }
241 
call_destination()242   RefCountedPtr<UnstartedCallDestination> call_destination() {
243     MutexLock lock(&mu_);
244     if (connected_subchannel_ == nullptr) return nullptr;
245     return connected_subchannel_->unstarted_call_destination();
246   }
247 
248   // Attempt to connect to the backend.  Has no effect if already connected.
249   void RequestConnection() ABSL_LOCKS_EXCLUDED(mu_);
250 
251   // Resets the connection backoff of the subchannel.
252   void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_);
253 
254   // Access to data producer map.
255   // We do not hold refs to the data producer; the implementation is
256   // expected to register itself upon construction and remove itself
257   // upon destruction.
258   //
259   // Looks up the current data producer for type and invokes get_or_add()
260   // with a pointer to that producer in the map.  The get_or_add() function
261   // can modify the pointed-to value to update the map.  This provides a
262   // way to either re-use an existing producer or register a new one in
263   // a non-racy way.
264   void GetOrAddDataProducer(
265       UniqueTypeName type,
266       std::function<void(DataProducerInterface**)> get_or_add)
267       ABSL_LOCKS_EXCLUDED(mu_);
268   // Removes the data producer from the map, if the current producer for
269   // this type is the specified producer.
270   void RemoveDataProducer(DataProducerInterface* data_producer)
271       ABSL_LOCKS_EXCLUDED(mu_);
272 
event_engine()273   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine() {
274     return event_engine_;
275   }
276 
277   // Exposed for testing purposes only.
278   static ChannelArgs MakeSubchannelArgs(
279       const ChannelArgs& channel_args, const ChannelArgs& address_args,
280       const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
281       const std::string& channel_default_authority);
282 
283  private:
284   // Tears down any existing connection, and arranges for destruction
285   void Orphaned() override ABSL_LOCKS_EXCLUDED(mu_);
286 
287   // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
288   // the subchannel's state.
289   class ConnectivityStateWatcherList final {
290    public:
ConnectivityStateWatcherList(Subchannel * subchannel)291     explicit ConnectivityStateWatcherList(Subchannel* subchannel)
292         : subchannel_(subchannel) {}
293 
~ConnectivityStateWatcherList()294     ~ConnectivityStateWatcherList() { Clear(); }
295 
296     void AddWatcherLocked(
297         RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
298     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
299 
300     // Notifies all watchers in the list about a change to state.
301     void NotifyLocked(grpc_connectivity_state state,
302                       const absl::Status& status);
303 
Clear()304     void Clear() { watchers_.clear(); }
305 
empty()306     bool empty() const { return watchers_.empty(); }
307 
308    private:
309     Subchannel* subchannel_;
310     // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
311     // be a set instead of a map.
312     std::map<ConnectivityStateWatcherInterface*,
313              RefCountedPtr<ConnectivityStateWatcherInterface>>
314         watchers_;
315   };
316 
317   class ConnectedSubchannelStateWatcher;
318 
319   // Sets the subchannel's connectivity state to \a state.
320   void SetConnectivityStateLocked(grpc_connectivity_state state,
321                                   const absl::Status& status)
322       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
323 
324   // Methods for connection.
325   void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_);
326   void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
327   void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
328   static void OnConnectingFinished(void* arg, grpc_error_handle error)
329       ABSL_LOCKS_EXCLUDED(mu_);
330   void OnConnectingFinishedLocked(grpc_error_handle error)
331       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
332   bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
333 
334   // The subchannel pool this subchannel is in.
335   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
336   // Subchannel key that identifies this subchannel in the subchannel pool.
337   const SubchannelKey key_;
338   // Actual address to connect to.  May be different than the address in
339   // key_ if overridden by proxy mapper.
340   grpc_resolved_address address_for_connect_;
341   // Channel args.
342   ChannelArgs args_;
343   // pollset_set tracking who's interested in a connection being setup.
344   grpc_pollset_set* pollset_set_;
345   // Channelz tracking.
346   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
347   // Minimum connection timeout.
348   Duration min_connect_timeout_;
349 
350   // Connection state.
351   OrphanablePtr<SubchannelConnector> connector_;
352   SubchannelConnector::Result connecting_result_;
353   grpc_closure on_connecting_finished_;
354 
355   // Protects the other members.
356   Mutex mu_;
357 
358   bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
359 
360   // Connectivity state tracking.
361   // Note that the connectivity state implies the state of the
362   // Subchannel object:
363   // - IDLE: no retry timer pending, can start a connection attempt at any time
364   // - CONNECTING: connection attempt in progress
365   // - READY: connection attempt succeeded, connected_subchannel_ created
366   // - TRANSIENT_FAILURE: connection attempt failed, retry timer pending
367   grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE;
368   absl::Status status_ ABSL_GUARDED_BY(mu_);
369   // The list of connectivity state watchers.
370   ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_);
371   // Used for sending connectivity state notifications.
372   WorkSerializer work_serializer_;
373 
374   // Active connection, or null.
375   RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_);
376 
377   // Backoff state.
378   BackOff backoff_ ABSL_GUARDED_BY(mu_);
379   Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_);
380   grpc_event_engine::experimental::EventEngine::TaskHandle retry_timer_handle_
381       ABSL_GUARDED_BY(mu_);
382 
383   // Keepalive time period (-1 for unset)
384   int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;
385 
386   // Data producer map.
387   std::map<UniqueTypeName, DataProducerInterface*> data_producer_map_
388       ABSL_GUARDED_BY(mu_);
389   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
390 };
391 
392 }  // namespace grpc_core
393 
394 #endif  // GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H
395