• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H
18 #define GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H
19 
20 #include <grpc/support/port_platform.h>
21 
22 #include <stddef.h>
23 
24 #include <functional>
25 #include <map>
26 #include <memory>
27 
28 #include "absl/base/thread_annotations.h"
29 #include "absl/status/status.h"
30 
31 #include <grpc/event_engine/event_engine.h>
32 #include <grpc/impl/connectivity_state.h>
33 
34 #include "src/core/client_channel/client_channel_channelz.h"
35 #include "src/core/client_channel/connector.h"
36 #include "src/core/client_channel/subchannel_pool_interface.h"
37 #include "src/core/lib/backoff/backoff.h"
38 #include "src/core/lib/channel/channel_args.h"
39 #include "src/core/lib/channel/channel_fwd.h"
40 #include "src/core/lib/channel/context.h"
41 #include "src/core/lib/gpr/time_precise.h"
42 #include "src/core/lib/gprpp/debug_location.h"
43 #include "src/core/lib/gprpp/dual_ref_counted.h"
44 #include "src/core/lib/gprpp/orphanable.h"
45 #include "src/core/lib/gprpp/ref_counted.h"
46 #include "src/core/lib/gprpp/ref_counted_ptr.h"
47 #include "src/core/lib/gprpp/sync.h"
48 #include "src/core/lib/gprpp/time.h"
49 #include "src/core/lib/gprpp/unique_type_name.h"
50 #include "src/core/lib/gprpp/work_serializer.h"
51 #include "src/core/lib/iomgr/call_combiner.h"
52 #include "src/core/lib/iomgr/closure.h"
53 #include "src/core/lib/iomgr/error.h"
54 #include "src/core/lib/iomgr/iomgr_fwd.h"
55 #include "src/core/lib/iomgr/polling_entity.h"
56 #include "src/core/lib/iomgr/resolved_address.h"
57 #include "src/core/lib/promise/arena_promise.h"
58 #include "src/core/lib/resource_quota/arena.h"
59 #include "src/core/lib/slice/slice.h"
60 #include "src/core/lib/transport/connectivity_state.h"
61 #include "src/core/lib/transport/metadata_batch.h"
62 #include "src/core/lib/transport/transport.h"
63 
64 namespace grpc_core {
65 
66 class SubchannelCall;
67 
68 class ConnectedSubchannel final : public RefCounted<ConnectedSubchannel> {
69  public:
70   ConnectedSubchannel(
71       grpc_channel_stack* channel_stack, const ChannelArgs& args,
72       RefCountedPtr<channelz::SubchannelNode> channelz_subchannel);
73   ~ConnectedSubchannel() override;
74 
75   void StartWatch(grpc_pollset_set* interested_parties,
76                   OrphanablePtr<ConnectivityStateWatcherInterface> watcher);
77 
78   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack);
79 
channel_stack()80   grpc_channel_stack* channel_stack() const { return channel_stack_; }
args()81   const ChannelArgs& args() const { return args_; }
channelz_subchannel()82   channelz::SubchannelNode* channelz_subchannel() const {
83     return channelz_subchannel_.get();
84   }
85 
86   size_t GetInitialCallSizeEstimate() const;
87 
88   ArenaPromise<ServerMetadataHandle> MakeCallPromise(CallArgs call_args);
89 
90  private:
91   grpc_channel_stack* channel_stack_;
92   ChannelArgs args_;
93   // ref counted pointer to the channelz node in this connected subchannel's
94   // owning subchannel.
95   RefCountedPtr<channelz::SubchannelNode> channelz_subchannel_;
96 };
97 
98 // Implements the interface of RefCounted<>.
99 class SubchannelCall final {
100  public:
101   struct Args {
102     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
103     grpc_polling_entity* pollent;
104     Slice path;
105     gpr_cycle_counter start_time;
106     Timestamp deadline;
107     Arena* arena;
108     grpc_call_context_element* context;
109     CallCombiner* call_combiner;
110   };
111   static RefCountedPtr<SubchannelCall> Create(Args args,
112                                               grpc_error_handle* error);
113 
114   // Continues processing a transport stream op batch.
115   void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
116 
117   // Returns the call stack of the subchannel call.
118   grpc_call_stack* GetCallStack();
119 
120   // Sets the 'then_schedule_closure' argument for call stack destruction.
121   // Must be called once per call.
122   void SetAfterCallStackDestroy(grpc_closure* closure);
123 
124   // Interface of RefCounted<>.
125   GRPC_MUST_USE_RESULT RefCountedPtr<SubchannelCall> Ref();
126   GRPC_MUST_USE_RESULT RefCountedPtr<SubchannelCall> Ref(
127       const DebugLocation& location, const char* reason);
128   // When refcount drops to 0, destroys itself and the associated call stack,
129   // but does NOT free the memory because it's in the call arena.
130   void Unref();
131   void Unref(const DebugLocation& location, const char* reason);
132 
133  private:
134   // Allow RefCountedPtr<> to access IncrementRefCount().
135   template <typename T>
136   friend class RefCountedPtr;
137 
138   SubchannelCall(Args args, grpc_error_handle* error);
139 
140   // If channelz is enabled, intercepts recv_trailing so that we may check the
141   // status and associate it to a subchannel.
142   void MaybeInterceptRecvTrailingMetadata(
143       grpc_transport_stream_op_batch* batch);
144 
145   static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
146 
147   // Interface of RefCounted<>.
148   void IncrementRefCount();
149   void IncrementRefCount(const DebugLocation& location, const char* reason);
150 
151   static void Destroy(void* arg, grpc_error_handle error);
152 
153   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
154   grpc_closure* after_call_stack_destroy_ = nullptr;
155   // State needed to support channelz interception of recv trailing metadata.
156   grpc_closure recv_trailing_metadata_ready_;
157   grpc_closure* original_recv_trailing_metadata_ = nullptr;
158   grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
159   Timestamp deadline_;
160 };
161 
162 // A subchannel that knows how to connect to exactly one target address. It
163 // provides a target for load balancing.
164 //
165 // Note that this is the "real" subchannel implementation, whose API is
166 // different from the SubchannelInterface that is exposed to LB policy
167 // implementations.  The client channel provides an adaptor class
168 // (SubchannelWrapper) that "converts" between the two.
169 class Subchannel final : public DualRefCounted<Subchannel> {
170  public:
171   // TODO(roth): Once we remove pollset_set, consider whether this can
172   // just use the normal AsyncConnectivityStateWatcherInterface API.
173   class ConnectivityStateWatcherInterface
174       : public RefCounted<ConnectivityStateWatcherInterface> {
175    public:
176     // Invoked whenever the subchannel's connectivity state changes.
177     // There will be only one invocation of this method on a given watcher
178     // instance at any given time.
179     // A ref to the watcher is passed in here so that the implementation
180     // can unref it in the appropriate synchronization context (e.g.,
181     // inside a WorkSerializer).
182     // TODO(roth): Figure out a cleaner way to guarantee that the ref is
183     // released in the right context.
184     virtual void OnConnectivityStateChange(
185         RefCountedPtr<ConnectivityStateWatcherInterface> self,
186         grpc_connectivity_state state, const absl::Status& status) = 0;
187 
188     virtual grpc_pollset_set* interested_parties() = 0;
189   };
190 
191   // A base class for producers of subchannel-specific data.
192   // Implementations will typically add their own methods as needed.
193   class DataProducerInterface : public DualRefCounted<DataProducerInterface> {
194    public:
195     // A unique identifier for this implementation.
196     // Only one producer may be registered under a given type name on a
197     // given subchannel at any given time.
198     // Note that we use the pointer address instead of the string
199     // contents for uniqueness; all instances for a given implementation
200     // are expected to return the same string *instance*, not just the
201     // same string contents.
202     virtual UniqueTypeName type() const = 0;
203   };
204 
205   // Creates a subchannel.
206   static RefCountedPtr<Subchannel> Create(
207       OrphanablePtr<SubchannelConnector> connector,
208       const grpc_resolved_address& address, const ChannelArgs& args);
209 
210   // The ctor and dtor are not intended to use directly.
211   Subchannel(SubchannelKey key, OrphanablePtr<SubchannelConnector> connector,
212              const ChannelArgs& args);
213   ~Subchannel() override;
214 
215   // Throttles keepalive time to \a new_keepalive_time iff \a new_keepalive_time
216   // is larger than the subchannel's current keepalive time. The updated value
217   // will have an affect when the subchannel creates a new ConnectedSubchannel.
218   void ThrottleKeepaliveTime(int new_keepalive_time) ABSL_LOCKS_EXCLUDED(mu_);
219 
pollset_set()220   grpc_pollset_set* pollset_set() const { return pollset_set_; }
221 
222   channelz::SubchannelNode* channelz_node();
223 
address()224   const grpc_resolved_address& address() const { return key_.address(); }
225 
226   // Starts watching the subchannel's connectivity state.
227   // The first callback to the watcher will be delivered ~immediately.
228   // Subsequent callbacks will be delivered as the subchannel's state
229   // changes.
230   // The watcher will be destroyed either when the subchannel is
231   // destroyed or when CancelConnectivityStateWatch() is called.
232   void WatchConnectivityState(
233       RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
234       ABSL_LOCKS_EXCLUDED(mu_);
235 
236   // Cancels a connectivity state watch.
237   // If the watcher has already been destroyed, this is a no-op.
238   void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher)
239       ABSL_LOCKS_EXCLUDED(mu_);
240 
connected_subchannel()241   RefCountedPtr<ConnectedSubchannel> connected_subchannel()
242       ABSL_LOCKS_EXCLUDED(mu_) {
243     MutexLock lock(&mu_);
244     return connected_subchannel_;
245   }
246 
247   // Attempt to connect to the backend.  Has no effect if already connected.
248   void RequestConnection() ABSL_LOCKS_EXCLUDED(mu_);
249 
250   // Resets the connection backoff of the subchannel.
251   void ResetBackoff() ABSL_LOCKS_EXCLUDED(mu_);
252 
253   // Access to data producer map.
254   // We do not hold refs to the data producer; the implementation is
255   // expected to register itself upon construction and remove itself
256   // upon destruction.
257   //
258   // Looks up the current data producer for type and invokes get_or_add()
259   // with a pointer to that producer in the map.  The get_or_add() function
260   // can modify the pointed-to value to update the map.  This provides a
261   // way to either re-use an existing producer or register a new one in
262   // a non-racy way.
263   void GetOrAddDataProducer(
264       UniqueTypeName type,
265       std::function<void(DataProducerInterface**)> get_or_add)
266       ABSL_LOCKS_EXCLUDED(mu_);
267   // Removes the data producer from the map, if the current producer for
268   // this type is the specified producer.
269   void RemoveDataProducer(DataProducerInterface* data_producer)
270       ABSL_LOCKS_EXCLUDED(mu_);
271 
event_engine()272   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine() {
273     return event_engine_;
274   }
275 
276  private:
277   // Tears down any existing connection, and arranges for destruction
278   void Orphaned() override ABSL_LOCKS_EXCLUDED(mu_);
279 
280   // A linked list of ConnectivityStateWatcherInterfaces that are monitoring
281   // the subchannel's state.
282   class ConnectivityStateWatcherList final {
283    public:
ConnectivityStateWatcherList(Subchannel * subchannel)284     explicit ConnectivityStateWatcherList(Subchannel* subchannel)
285         : subchannel_(subchannel) {}
286 
~ConnectivityStateWatcherList()287     ~ConnectivityStateWatcherList() { Clear(); }
288 
289     void AddWatcherLocked(
290         RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
291     void RemoveWatcherLocked(ConnectivityStateWatcherInterface* watcher);
292 
293     // Notifies all watchers in the list about a change to state.
294     void NotifyLocked(grpc_connectivity_state state,
295                       const absl::Status& status);
296 
Clear()297     void Clear() { watchers_.clear(); }
298 
empty()299     bool empty() const { return watchers_.empty(); }
300 
301    private:
302     Subchannel* subchannel_;
303     // TODO(roth): Once we can use C++-14 heterogeneous lookups, this can
304     // be a set instead of a map.
305     std::map<ConnectivityStateWatcherInterface*,
306              RefCountedPtr<ConnectivityStateWatcherInterface>>
307         watchers_;
308   };
309 
310   class ConnectedSubchannelStateWatcher;
311 
312   // Sets the subchannel's connectivity state to \a state.
313   void SetConnectivityStateLocked(grpc_connectivity_state state,
314                                   const absl::Status& status)
315       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
316 
317   // Methods for connection.
318   void OnRetryTimer() ABSL_LOCKS_EXCLUDED(mu_);
319   void OnRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
320   void StartConnectingLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
321   static void OnConnectingFinished(void* arg, grpc_error_handle error)
322       ABSL_LOCKS_EXCLUDED(mu_);
323   void OnConnectingFinishedLocked(grpc_error_handle error)
324       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
325   bool PublishTransportLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
326 
327   // The subchannel pool this subchannel is in.
328   RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
329   // Subchannel key that identifies this subchannel in the subchannel pool.
330   const SubchannelKey key_;
331   // Actual address to connect to.  May be different than the address in
332   // key_ if overridden by proxy mapper.
333   grpc_resolved_address address_for_connect_;
334   // Channel args.
335   ChannelArgs args_;
336   // pollset_set tracking who's interested in a connection being setup.
337   grpc_pollset_set* pollset_set_;
338   // Channelz tracking.
339   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
340   // Minimum connection timeout.
341   Duration min_connect_timeout_;
342 
343   // Connection state.
344   OrphanablePtr<SubchannelConnector> connector_;
345   SubchannelConnector::Result connecting_result_;
346   grpc_closure on_connecting_finished_;
347 
348   // Protects the other members.
349   Mutex mu_;
350 
351   bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
352 
353   // Connectivity state tracking.
354   // Note that the connectivity state implies the state of the
355   // Subchannel object:
356   // - IDLE: no retry timer pending, can start a connection attempt at any time
357   // - CONNECTING: connection attempt in progress
358   // - READY: connection attempt succeeded, connected_subchannel_ created
359   // - TRANSIENT_FAILURE: connection attempt failed, retry timer pending
360   grpc_connectivity_state state_ ABSL_GUARDED_BY(mu_) = GRPC_CHANNEL_IDLE;
361   absl::Status status_ ABSL_GUARDED_BY(mu_);
362   // The list of connectivity state watchers.
363   ConnectivityStateWatcherList watcher_list_ ABSL_GUARDED_BY(mu_);
364   // Used for sending connectivity state notifications.
365   WorkSerializer work_serializer_;
366 
367   // Active connection, or null.
368   RefCountedPtr<ConnectedSubchannel> connected_subchannel_ ABSL_GUARDED_BY(mu_);
369 
370   // Backoff state.
371   BackOff backoff_ ABSL_GUARDED_BY(mu_);
372   Timestamp next_attempt_time_ ABSL_GUARDED_BY(mu_);
373   grpc_event_engine::experimental::EventEngine::TaskHandle retry_timer_handle_
374       ABSL_GUARDED_BY(mu_);
375 
376   // Keepalive time period (-1 for unset)
377   int keepalive_time_ ABSL_GUARDED_BY(mu_) = -1;
378 
379   // Data producer map.
380   std::map<UniqueTypeName, DataProducerInterface*> data_producer_map_
381       ABSL_GUARDED_BY(mu_);
382   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
383 };
384 
385 }  // namespace grpc_core
386 
387 #endif  // GRPC_SRC_CORE_CLIENT_CHANNEL_SUBCHANNEL_H
388