• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/client_channel/subchannel.h"
18 
19 #include <grpc/impl/channel_arg_names.h>
20 #include <grpc/slice.h>
21 #include <grpc/status.h>
22 #include <grpc/support/port_platform.h>
23 #include <inttypes.h>
24 #include <limits.h>
25 
26 #include <algorithm>
27 #include <memory>
28 #include <new>
29 #include <utility>
30 
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/cord.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "src/core/channelz/channel_trace.h"
39 #include "src/core/channelz/channelz.h"
40 #include "src/core/client_channel/client_channel_internal.h"
41 #include "src/core/client_channel/subchannel_pool_interface.h"
42 #include "src/core/config/core_configuration.h"
43 #include "src/core/handshaker/proxy_mapper_registry.h"
44 #include "src/core/lib/address_utils/sockaddr_utils.h"
45 #include "src/core/lib/channel/channel_args.h"
46 #include "src/core/lib/channel/channel_stack.h"
47 #include "src/core/lib/channel/channel_stack_builder_impl.h"
48 #include "src/core/lib/debug/trace.h"
49 #include "src/core/lib/experiments/experiments.h"
50 #include "src/core/lib/iomgr/exec_ctx.h"
51 #include "src/core/lib/iomgr/pollset_set.h"
52 #include "src/core/lib/promise/cancel_callback.h"
53 #include "src/core/lib/promise/seq.h"
54 #include "src/core/lib/slice/slice_internal.h"
55 #include "src/core/lib/surface/channel_init.h"
56 #include "src/core/lib/surface/channel_stack_type.h"
57 #include "src/core/lib/surface/init_internally.h"
58 #include "src/core/lib/transport/connectivity_state.h"
59 #include "src/core/lib/transport/error_utils.h"
60 #include "src/core/lib/transport/interception_chain.h"
61 #include "src/core/lib/transport/transport.h"
62 #include "src/core/telemetry/stats.h"
63 #include "src/core/telemetry/stats_data.h"
64 #include "src/core/util/alloc.h"
65 #include "src/core/util/backoff.h"
66 #include "src/core/util/debug_location.h"
67 #include "src/core/util/orphanable.h"
68 #include "src/core/util/ref_counted.h"
69 #include "src/core/util/ref_counted_ptr.h"
70 #include "src/core/util/status_helper.h"
71 #include "src/core/util/sync.h"
72 #include "src/core/util/useful.h"
73 
// Backoff parameters.
//
// These are the built-in values consumed by ParseArgsForBackoffValues()
// below.  They are typed constants rather than preprocessor macros so that
// they obey ordinary type checking and scoping; the names are unchanged so
// existing uses keep working.

// Initial reconnect backoff (seconds) used when
// GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS is not set.
constexpr int GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS = 1;
// Multiplier passed to BackOff::Options::set_multiplier().
constexpr double GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER = 1.6;
// Minimum connect timeout (seconds) used when
// GRPC_ARG_MIN_RECONNECT_BACKOFF_MS is not set.
constexpr int GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS = 20;
// Maximum backoff (seconds) used when GRPC_ARG_MAX_RECONNECT_BACKOFF_MS is
// not set.
constexpr int GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS = 120;
// Jitter passed to BackOff::Options::set_jitter().
constexpr double GRPC_SUBCHANNEL_RECONNECT_JITTER = 0.2;
80 
// Conversion between subchannel call and call stack.
//
// The call stack lives in the same allocation as the SubchannelCall, starting
// at the SubchannelCall's address plus its size rounded up to the alignment
// size, so each conversion is a fixed byte offset in one direction or the
// other.
//
// Both expansions are fully parenthesized so the result can safely be used as
// an operand (e.g. followed by `->`), and each macro uses its own parameter
// rather than relying on an identically named variable at the call site.
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) \
  ((grpc_call_stack*)((char*)(call) +       \
                      GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
#define CALL_STACK_TO_SUBCHANNEL_CALL(callstack) \
  ((SubchannelCall*)(((char*)(callstack)) -      \
                     GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall))))
88 
89 namespace grpc_core {
90 
91 using ::grpc_event_engine::experimental::EventEngine;
92 
93 //
94 // ConnectedSubchannel
95 //
96 
ConnectedSubchannel(const ChannelArgs & args)97 ConnectedSubchannel::ConnectedSubchannel(const ChannelArgs& args)
98     : RefCounted<ConnectedSubchannel>(
99           GRPC_TRACE_FLAG_ENABLED(subchannel_refcount) ? "ConnectedSubchannel"
100                                                        : nullptr),
101       args_(args) {}
102 
103 //
104 // LegacyConnectedSubchannel
105 //
106 
107 class LegacyConnectedSubchannel : public ConnectedSubchannel {
108  public:
LegacyConnectedSubchannel(RefCountedPtr<grpc_channel_stack> channel_stack,const ChannelArgs & args,RefCountedPtr<channelz::SubchannelNode> channelz_node)109   LegacyConnectedSubchannel(
110       RefCountedPtr<grpc_channel_stack> channel_stack, const ChannelArgs& args,
111       RefCountedPtr<channelz::SubchannelNode> channelz_node)
112       : ConnectedSubchannel(args),
113         channelz_node_(std::move(channelz_node)),
114         channel_stack_(std::move(channel_stack)) {}
115 
~LegacyConnectedSubchannel()116   ~LegacyConnectedSubchannel() override {
117     channel_stack_.reset(DEBUG_LOCATION, "ConnectedSubchannel");
118   }
119 
channelz_node() const120   channelz::SubchannelNode* channelz_node() const {
121     return channelz_node_.get();
122   }
123 
StartWatch(grpc_pollset_set * interested_parties,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)124   void StartWatch(
125       grpc_pollset_set* interested_parties,
126       OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
127     grpc_transport_op* op = grpc_make_transport_op(nullptr);
128     op->start_connectivity_watch = std::move(watcher);
129     op->start_connectivity_watch_state = GRPC_CHANNEL_READY;
130     op->bind_pollset_set = interested_parties;
131     grpc_channel_element* elem =
132         grpc_channel_stack_element(channel_stack_.get(), 0);
133     elem->filter->start_transport_op(elem, op);
134   }
135 
Ping(absl::AnyInvocable<void (absl::Status)>)136   void Ping(absl::AnyInvocable<void(absl::Status)>) override {
137     Crash("call v3 ping method called in legacy impl");
138   }
139 
unstarted_call_destination() const140   RefCountedPtr<UnstartedCallDestination> unstarted_call_destination()
141       const override {
142     Crash("call v3 unstarted_call_destination method called in legacy impl");
143   }
144 
channel_stack() const145   grpc_channel_stack* channel_stack() const override {
146     return channel_stack_.get();
147   }
148 
GetInitialCallSizeEstimate() const149   size_t GetInitialCallSizeEstimate() const override {
150     return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(SubchannelCall)) +
151            channel_stack_->call_stack_size;
152   }
153 
Ping(grpc_closure * on_initiate,grpc_closure * on_ack)154   void Ping(grpc_closure* on_initiate, grpc_closure* on_ack) override {
155     grpc_transport_op* op = grpc_make_transport_op(nullptr);
156     op->send_ping.on_initiate = on_initiate;
157     op->send_ping.on_ack = on_ack;
158     grpc_channel_element* elem =
159         grpc_channel_stack_element(channel_stack_.get(), 0);
160     elem->filter->start_transport_op(elem, op);
161   }
162 
163  private:
164   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
165   RefCountedPtr<grpc_channel_stack> channel_stack_;
166 };
167 
168 //
169 // NewConnectedSubchannel
170 //
171 
172 class NewConnectedSubchannel : public ConnectedSubchannel {
173  public:
174   class TransportCallDestination final : public CallDestination {
175    public:
TransportCallDestination(OrphanablePtr<ClientTransport> transport)176     explicit TransportCallDestination(OrphanablePtr<ClientTransport> transport)
177         : transport_(std::move(transport)) {}
178 
transport()179     ClientTransport* transport() { return transport_.get(); }
180 
HandleCall(CallHandler handler)181     void HandleCall(CallHandler handler) override {
182       transport_->StartCall(std::move(handler));
183     }
184 
Orphaned()185     void Orphaned() override { transport_.reset(); }
186 
187    private:
188     OrphanablePtr<ClientTransport> transport_;
189   };
190 
NewConnectedSubchannel(RefCountedPtr<UnstartedCallDestination> call_destination,RefCountedPtr<TransportCallDestination> transport,const ChannelArgs & args)191   NewConnectedSubchannel(
192       RefCountedPtr<UnstartedCallDestination> call_destination,
193       RefCountedPtr<TransportCallDestination> transport,
194       const ChannelArgs& args)
195       : ConnectedSubchannel(args),
196         call_destination_(std::move(call_destination)),
197         transport_(std::move(transport)) {}
198 
StartWatch(grpc_pollset_set *,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)199   void StartWatch(
200       grpc_pollset_set*,
201       OrphanablePtr<ConnectivityStateWatcherInterface> watcher) override {
202     transport_->transport()->StartConnectivityWatch(std::move(watcher));
203   }
204 
Ping(absl::AnyInvocable<void (absl::Status)>)205   void Ping(absl::AnyInvocable<void(absl::Status)>) override {
206     // TODO(ctiller): add new transport API for this in v3 stack
207     Crash("not implemented");
208   }
209 
unstarted_call_destination() const210   RefCountedPtr<UnstartedCallDestination> unstarted_call_destination()
211       const override {
212     return call_destination_;
213   }
214 
channel_stack() const215   grpc_channel_stack* channel_stack() const override { return nullptr; }
216 
GetInitialCallSizeEstimate() const217   size_t GetInitialCallSizeEstimate() const override { return 0; }
218 
Ping(grpc_closure *,grpc_closure *)219   void Ping(grpc_closure*, grpc_closure*) override {
220     Crash("legacy ping method called in call v3 impl");
221   }
222 
223  private:
224   RefCountedPtr<UnstartedCallDestination> call_destination_;
225   RefCountedPtr<TransportCallDestination> transport_;
226 };
227 
228 //
229 // SubchannelCall
230 //
231 
Create(Args args,grpc_error_handle * error)232 RefCountedPtr<SubchannelCall> SubchannelCall::Create(Args args,
233                                                      grpc_error_handle* error) {
234   const size_t allocation_size =
235       args.connected_subchannel->GetInitialCallSizeEstimate();
236   Arena* arena = args.arena;
237   return RefCountedPtr<SubchannelCall>(new (
238       arena->Alloc(allocation_size)) SubchannelCall(std::move(args), error));
239 }
240 
SubchannelCall(Args args,grpc_error_handle * error)241 SubchannelCall::SubchannelCall(Args args, grpc_error_handle* error)
242     : connected_subchannel_(args.connected_subchannel
243                                 .TakeAsSubclass<LegacyConnectedSubchannel>()),
244       deadline_(args.deadline) {
245   grpc_call_stack* callstk = SUBCHANNEL_CALL_TO_CALL_STACK(this);
246   const grpc_call_element_args call_args = {
247       callstk,              // call_stack
248       nullptr,              // server_transport_data
249       args.path.c_slice(),  // path
250       args.start_time,      // start_time
251       args.deadline,        // deadline
252       args.arena,           // arena
253       args.call_combiner    // call_combiner
254   };
255   *error = grpc_call_stack_init(connected_subchannel_->channel_stack(), 1,
256                                 SubchannelCall::Destroy, this, &call_args);
257   if (GPR_UNLIKELY(!error->ok())) {
258     LOG(ERROR) << "error: " << StatusToString(*error);
259     return;
260   }
261   grpc_call_stack_set_pollset_or_pollset_set(callstk, args.pollent);
262   auto* channelz_node = connected_subchannel_->channelz_node();
263   if (channelz_node != nullptr) {
264     channelz_node->RecordCallStarted();
265   }
266 }
267 
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)268 void SubchannelCall::StartTransportStreamOpBatch(
269     grpc_transport_stream_op_batch* batch) {
270   MaybeInterceptRecvTrailingMetadata(batch);
271   grpc_call_stack* call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(this);
272   grpc_call_element* top_elem = grpc_call_stack_element(call_stack, 0);
273   GRPC_TRACE_LOG(channel, INFO)
274       << "OP[" << top_elem->filter->name << ":" << top_elem
275       << "]: " << grpc_transport_stream_op_batch_string(batch, false);
276   top_elem->filter->start_transport_stream_op_batch(top_elem, batch);
277 }
278 
GetCallStack()279 grpc_call_stack* SubchannelCall::GetCallStack() {
280   return SUBCHANNEL_CALL_TO_CALL_STACK(this);
281 }
282 
SetAfterCallStackDestroy(grpc_closure * closure)283 void SubchannelCall::SetAfterCallStackDestroy(grpc_closure* closure) {
284   CHECK_EQ(after_call_stack_destroy_, nullptr);
285   CHECK_NE(closure, nullptr);
286   after_call_stack_destroy_ = closure;
287 }
288 
Ref()289 RefCountedPtr<SubchannelCall> SubchannelCall::Ref() {
290   IncrementRefCount();
291   return RefCountedPtr<SubchannelCall>(this);
292 }
293 
Ref(const DebugLocation & location,const char * reason)294 RefCountedPtr<SubchannelCall> SubchannelCall::Ref(const DebugLocation& location,
295                                                   const char* reason) {
296   IncrementRefCount(location, reason);
297   return RefCountedPtr<SubchannelCall>(this);
298 }
299 
Unref()300 void SubchannelCall::Unref() {
301   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
302 }
303 
Unref(const DebugLocation &,const char * reason)304 void SubchannelCall::Unref(const DebugLocation& /*location*/,
305                            const char* reason) {
306   GRPC_CALL_STACK_UNREF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
307 }
308 
Destroy(void * arg,grpc_error_handle)309 void SubchannelCall::Destroy(void* arg, grpc_error_handle /*error*/) {
310   SubchannelCall* self = static_cast<SubchannelCall*>(arg);
311   // Keep some members before destroying the subchannel call.
312   grpc_closure* after_call_stack_destroy = self->after_call_stack_destroy_;
313   RefCountedPtr<ConnectedSubchannel> connected_subchannel =
314       std::move(self->connected_subchannel_);
315   // Destroy the subchannel call.
316   self->~SubchannelCall();
317   // Destroy the call stack. This should be after destroying the subchannel
318   // call, because call->after_call_stack_destroy(), if not null, will free
319   // the call arena.
320   grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(self), nullptr,
321                           after_call_stack_destroy);
322   // Automatically reset connected_subchannel. This should be after destroying
323   // the call stack, because destroying call stack needs access to the channel
324   // stack.
325 }
326 
MaybeInterceptRecvTrailingMetadata(grpc_transport_stream_op_batch * batch)327 void SubchannelCall::MaybeInterceptRecvTrailingMetadata(
328     grpc_transport_stream_op_batch* batch) {
329   // only intercept payloads with recv trailing.
330   if (!batch->recv_trailing_metadata) return;
331   // only add interceptor is channelz is enabled.
332   if (connected_subchannel_->channelz_node() == nullptr) return;
333   GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
334                     this, grpc_schedule_on_exec_ctx);
335   // save some state needed for the interception callback.
336   CHECK_EQ(recv_trailing_metadata_, nullptr);
337   recv_trailing_metadata_ =
338       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
339   original_recv_trailing_metadata_ =
340       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
341   batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
342       &recv_trailing_metadata_ready_;
343 }
344 
345 namespace {
346 
347 // Sets *status based on the rest of the parameters.
GetCallStatus(grpc_status_code * status,Timestamp deadline,grpc_metadata_batch * md_batch,grpc_error_handle error)348 void GetCallStatus(grpc_status_code* status, Timestamp deadline,
349                    grpc_metadata_batch* md_batch, grpc_error_handle error) {
350   if (!error.ok()) {
351     grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
352   } else {
353     *status = md_batch->get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
354   }
355 }
356 
357 }  // namespace
358 
RecvTrailingMetadataReady(void * arg,grpc_error_handle error)359 void SubchannelCall::RecvTrailingMetadataReady(void* arg,
360                                                grpc_error_handle error) {
361   SubchannelCall* call = static_cast<SubchannelCall*>(arg);
362   CHECK_NE(call->recv_trailing_metadata_, nullptr);
363   grpc_status_code status = GRPC_STATUS_OK;
364   GetCallStatus(&status, call->deadline_, call->recv_trailing_metadata_, error);
365   channelz::SubchannelNode* channelz_node =
366       call->connected_subchannel_->channelz_node();
367   CHECK_NE(channelz_node, nullptr);
368   if (status == GRPC_STATUS_OK) {
369     channelz_node->RecordCallSucceeded();
370   } else {
371     channelz_node->RecordCallFailed();
372   }
373   Closure::Run(DEBUG_LOCATION, call->original_recv_trailing_metadata_, error);
374 }
375 
IncrementRefCount()376 void SubchannelCall::IncrementRefCount() {
377   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), "");
378 }
379 
IncrementRefCount(const DebugLocation &,const char * reason)380 void SubchannelCall::IncrementRefCount(const DebugLocation& /*location*/,
381                                        const char* reason) {
382   GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(this), reason);
383 }
384 
385 //
386 // Subchannel::ConnectedSubchannelStateWatcher
387 //
388 
389 class Subchannel::ConnectedSubchannelStateWatcher final
390     : public AsyncConnectivityStateWatcherInterface {
391  public:
392   // Must be instantiated while holding c->mu.
ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)393   explicit ConnectedSubchannelStateWatcher(WeakRefCountedPtr<Subchannel> c)
394       : subchannel_(std::move(c)) {}
395 
~ConnectedSubchannelStateWatcher()396   ~ConnectedSubchannelStateWatcher() override {
397     subchannel_.reset(DEBUG_LOCATION, "state_watcher");
398   }
399 
400  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)401   void OnConnectivityStateChange(grpc_connectivity_state new_state,
402                                  const absl::Status& status) override {
403     Subchannel* c = subchannel_.get();
404     {
405       MutexLock lock(&c->mu_);
406       // If we're either shutting down or have already seen this connection
407       // failure (i.e., c->connected_subchannel_ is null), do nothing.
408       //
409       // The transport reports TRANSIENT_FAILURE upon GOAWAY but SHUTDOWN
410       // upon connection close.  So if the server gracefully shuts down,
411       // we will see TRANSIENT_FAILURE followed by SHUTDOWN, but if not, we
412       // will see only SHUTDOWN.  Either way, we react to the first one we
413       // see, ignoring anything that happens after that.
414       if (c->connected_subchannel_ == nullptr) return;
415       if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE ||
416           new_state == GRPC_CHANNEL_SHUTDOWN) {
417         GRPC_TRACE_LOG(subchannel, INFO)
418             << "subchannel " << c << " " << c->key_.ToString()
419             << ": Connected subchannel " << c->connected_subchannel_.get()
420             << " reports " << ConnectivityStateName(new_state) << ": "
421             << status;
422         c->connected_subchannel_.reset();
423         if (c->channelz_node() != nullptr) {
424           c->channelz_node()->SetChildSocket(nullptr);
425         }
426         // Even though we're reporting IDLE instead of TRANSIENT_FAILURE here,
427         // pass along the status from the transport, since it may have
428         // keepalive info attached to it that the channel needs.
429         // TODO(roth): Consider whether there's a cleaner way to do this.
430         c->SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, status);
431         c->backoff_.Reset();
432       }
433     }
434     // Drain any connectivity state notifications after releasing the mutex.
435     c->work_serializer_.DrainQueue();
436   }
437 
438   WeakRefCountedPtr<Subchannel> subchannel_;
439 };
440 
441 //
442 // Subchannel::ConnectivityStateWatcherList
443 //
444 
AddWatcherLocked(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)445 void Subchannel::ConnectivityStateWatcherList::AddWatcherLocked(
446     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
447   watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
448 }
449 
RemoveWatcherLocked(ConnectivityStateWatcherInterface * watcher)450 void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
451     ConnectivityStateWatcherInterface* watcher) {
452   watchers_.erase(watcher);
453 }
454 
NotifyLocked(grpc_connectivity_state state,const absl::Status & status)455 void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
456     grpc_connectivity_state state, const absl::Status& status) {
457   for (const auto& p : watchers_) {
458     subchannel_->work_serializer_.Schedule(
459         [watcher = p.second->Ref(), state, status]() mutable {
460           auto* watcher_ptr = watcher.get();
461           watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
462                                                  status);
463         },
464         DEBUG_LOCATION);
465   }
466 }
467 
468 //
469 // Subchannel
470 //
471 
472 namespace {
473 
ParseArgsForBackoffValues(const ChannelArgs & args,Duration * min_connect_timeout)474 BackOff::Options ParseArgsForBackoffValues(const ChannelArgs& args,
475                                            Duration* min_connect_timeout) {
476   const absl::optional<Duration> fixed_reconnect_backoff =
477       args.GetDurationFromIntMillis("grpc.testing.fixed_reconnect_backoff_ms");
478   if (fixed_reconnect_backoff.has_value()) {
479     const Duration backoff =
480         std::max(Duration::Milliseconds(100), *fixed_reconnect_backoff);
481     *min_connect_timeout = backoff;
482     return BackOff::Options()
483         .set_initial_backoff(backoff)
484         .set_multiplier(1.0)
485         .set_jitter(0.0)
486         .set_max_backoff(backoff);
487   }
488   const Duration initial_backoff = std::max(
489       Duration::Milliseconds(100),
490       args.GetDurationFromIntMillis(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS)
491           .value_or(Duration::Seconds(
492               GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS)));
493   *min_connect_timeout =
494       std::max(Duration::Milliseconds(100),
495                args.GetDurationFromIntMillis(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS)
496                    .value_or(Duration::Seconds(
497                        GRPC_SUBCHANNEL_RECONNECT_MIN_TIMEOUT_SECONDS)));
498   const Duration max_backoff =
499       std::max(Duration::Milliseconds(100),
500                args.GetDurationFromIntMillis(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)
501                    .value_or(Duration::Seconds(
502                        GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS)));
503   return BackOff::Options()
504       .set_initial_backoff(initial_backoff)
505       .set_multiplier(GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER)
506       .set_jitter(GRPC_SUBCHANNEL_RECONNECT_JITTER)
507       .set_max_backoff(max_backoff);
508 }
509 
510 }  // namespace
511 
Subchannel(SubchannelKey key,OrphanablePtr<SubchannelConnector> connector,const ChannelArgs & args)512 Subchannel::Subchannel(SubchannelKey key,
513                        OrphanablePtr<SubchannelConnector> connector,
514                        const ChannelArgs& args)
515     : DualRefCounted<Subchannel>(GRPC_TRACE_FLAG_ENABLED(subchannel_refcount)
516                                      ? "Subchannel"
517                                      : nullptr),
518       key_(std::move(key)),
519       args_(args),
520       pollset_set_(grpc_pollset_set_create()),
521       connector_(std::move(connector)),
522       watcher_list_(this),
523       work_serializer_(args_.GetObjectRef<EventEngine>()),
524       backoff_(ParseArgsForBackoffValues(args_, &min_connect_timeout_)),
525       event_engine_(args_.GetObjectRef<EventEngine>()) {
526   // A grpc_init is added here to ensure that grpc_shutdown does not happen
527   // until the subchannel is destroyed. Subchannels can persist longer than
528   // channels because they maybe reused/shared among multiple channels. As a
529   // result the subchannel destruction happens asynchronously to channel
530   // destruction. If the last channel destruction triggers a grpc_shutdown
531   // before the last subchannel destruction, then there maybe race conditions
532   // triggering segmentation faults. To prevent this issue, we call a
533   // grpc_init here and a grpc_shutdown in the subchannel destructor.
534   InitInternally();
535   global_stats().IncrementClientSubchannelsCreated();
536   GRPC_CLOSURE_INIT(&on_connecting_finished_, OnConnectingFinished, this,
537                     grpc_schedule_on_exec_ctx);
538   // Check proxy mapper to determine address to connect to and channel
539   // args to use.
540   address_for_connect_ = CoreConfiguration::Get()
541                              .proxy_mapper_registry()
542                              .MapAddress(key_.address(), &args_)
543                              .value_or(key_.address());
544   // Initialize channelz.
545   const bool channelz_enabled = args_.GetBool(GRPC_ARG_ENABLE_CHANNELZ)
546                                     .value_or(GRPC_ENABLE_CHANNELZ_DEFAULT);
547   if (channelz_enabled) {
548     const size_t channel_tracer_max_memory = Clamp(
549         args_.GetInt(GRPC_ARG_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE)
550             .value_or(GRPC_MAX_CHANNEL_TRACE_EVENT_MEMORY_PER_NODE_DEFAULT),
551         0, INT_MAX);
552     channelz_node_ = MakeRefCounted<channelz::SubchannelNode>(
553         grpc_sockaddr_to_uri(&key_.address())
554             .value_or("<unknown address type>"),
555         channel_tracer_max_memory);
556     channelz_node_->AddTraceEvent(
557         channelz::ChannelTrace::Severity::Info,
558         grpc_slice_from_static_string("subchannel created"));
559   }
560 }
561 
~Subchannel()562 Subchannel::~Subchannel() {
563   if (channelz_node_ != nullptr) {
564     channelz_node_->AddTraceEvent(
565         channelz::ChannelTrace::Severity::Info,
566         grpc_slice_from_static_string("Subchannel destroyed"));
567     channelz_node_->UpdateConnectivityState(GRPC_CHANNEL_SHUTDOWN);
568   }
569   connector_.reset();
570   grpc_pollset_set_destroy(pollset_set_);
571   // grpc_shutdown is called here because grpc_init is called in the ctor.
572   ShutdownInternally();
573 }
574 
Create(OrphanablePtr<SubchannelConnector> connector,const grpc_resolved_address & address,const ChannelArgs & args)575 RefCountedPtr<Subchannel> Subchannel::Create(
576     OrphanablePtr<SubchannelConnector> connector,
577     const grpc_resolved_address& address, const ChannelArgs& args) {
578   SubchannelKey key(address, args);
579   auto* subchannel_pool = args.GetObject<SubchannelPoolInterface>();
580   CHECK_NE(subchannel_pool, nullptr);
581   RefCountedPtr<Subchannel> c = subchannel_pool->FindSubchannel(key);
582   if (c != nullptr) {
583     return c;
584   }
585   c = MakeRefCounted<Subchannel>(std::move(key), std::move(connector), args);
586   // Try to register the subchannel before setting the subchannel pool.
587   // Otherwise, in case of a registration race, unreffing c in
588   // RegisterSubchannel() will cause c to be tried to be unregistered, while
589   // its key maps to a different subchannel.
590   RefCountedPtr<Subchannel> registered =
591       subchannel_pool->RegisterSubchannel(c->key_, c);
592   if (registered == c) c->subchannel_pool_ = subchannel_pool->Ref();
593   return registered;
594 }
595 
ThrottleKeepaliveTime(int new_keepalive_time)596 void Subchannel::ThrottleKeepaliveTime(int new_keepalive_time) {
597   MutexLock lock(&mu_);
598   // Only update the value if the new keepalive time is larger.
599   if (new_keepalive_time > keepalive_time_) {
600     keepalive_time_ = new_keepalive_time;
601     GRPC_TRACE_LOG(subchannel, INFO)
602         << "subchannel " << this << " " << key_.ToString()
603         << ": throttling keepalive time to " << new_keepalive_time;
604     args_ = args_.Set(GRPC_ARG_KEEPALIVE_TIME_MS, new_keepalive_time);
605   }
606 }
607 
channelz_node()608 channelz::SubchannelNode* Subchannel::channelz_node() {
609   return channelz_node_.get();
610 }
611 
WatchConnectivityState(RefCountedPtr<ConnectivityStateWatcherInterface> watcher)612 void Subchannel::WatchConnectivityState(
613     RefCountedPtr<ConnectivityStateWatcherInterface> watcher) {
614   {
615     MutexLock lock(&mu_);
616     grpc_pollset_set* interested_parties = watcher->interested_parties();
617     if (interested_parties != nullptr) {
618       grpc_pollset_set_add_pollset_set(pollset_set_, interested_parties);
619     }
620     work_serializer_.Schedule(
621         [watcher = watcher->Ref(), state = state_, status = status_]() mutable {
622           auto* watcher_ptr = watcher.get();
623           watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
624                                                  status);
625         },
626         DEBUG_LOCATION);
627     watcher_list_.AddWatcherLocked(std::move(watcher));
628   }
629   // Drain any connectivity state notifications after releasing the mutex.
630   work_serializer_.DrainQueue();
631 }
632 
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)633 void Subchannel::CancelConnectivityStateWatch(
634     ConnectivityStateWatcherInterface* watcher) {
635   {
636     MutexLock lock(&mu_);
637     grpc_pollset_set* interested_parties = watcher->interested_parties();
638     if (interested_parties != nullptr) {
639       grpc_pollset_set_del_pollset_set(pollset_set_, interested_parties);
640     }
641     watcher_list_.RemoveWatcherLocked(watcher);
642   }
643   // Drain any connectivity state notifications after releasing the mutex.
644   // (Shouldn't actually be necessary in this case, but better safe than
645   // sorry.)
646   work_serializer_.DrainQueue();
647 }
648 
RequestConnection()649 void Subchannel::RequestConnection() {
650   {
651     MutexLock lock(&mu_);
652     if (state_ == GRPC_CHANNEL_IDLE) {
653       StartConnectingLocked();
654     }
655   }
656   // Drain any connectivity state notifications after releasing the mutex.
657   work_serializer_.DrainQueue();
658 }
659 
ResetBackoff()660 void Subchannel::ResetBackoff() {
661   // Hold a ref to ensure cancellation and subsequent deletion of the closure
662   // does not eliminate the last ref and destroy the Subchannel before the
663   // method returns.
664   auto self = WeakRef(DEBUG_LOCATION, "ResetBackoff");
665   {
666     MutexLock lock(&mu_);
667     backoff_.Reset();
668     if (state_ == GRPC_CHANNEL_TRANSIENT_FAILURE &&
669         event_engine_->Cancel(retry_timer_handle_)) {
670       OnRetryTimerLocked();
671     } else if (state_ == GRPC_CHANNEL_CONNECTING) {
672       next_attempt_time_ = Timestamp::Now();
673     }
674   }
675   // Drain any connectivity state notifications after releasing the mutex.
676   work_serializer_.DrainQueue();
677 }
678 
Orphaned()679 void Subchannel::Orphaned() {
680   // The subchannel_pool is only used once here in this subchannel, so the
681   // access can be outside of the lock.
682   if (subchannel_pool_ != nullptr) {
683     subchannel_pool_->UnregisterSubchannel(key_, this);
684     subchannel_pool_.reset();
685   }
686   {
687     MutexLock lock(&mu_);
688     CHECK(!shutdown_);
689     shutdown_ = true;
690     connector_.reset();
691     connected_subchannel_.reset();
692   }
693   // Drain any connectivity state notifications after releasing the mutex.
694   work_serializer_.DrainQueue();
695 }
696 
GetOrAddDataProducer(UniqueTypeName type,std::function<void (DataProducerInterface **)> get_or_add)697 void Subchannel::GetOrAddDataProducer(
698     UniqueTypeName type,
699     std::function<void(DataProducerInterface**)> get_or_add) {
700   MutexLock lock(&mu_);
701   auto it = data_producer_map_.emplace(type, nullptr).first;
702   get_or_add(&it->second);
703 }
704 
RemoveDataProducer(DataProducerInterface * data_producer)705 void Subchannel::RemoveDataProducer(DataProducerInterface* data_producer) {
706   MutexLock lock(&mu_);
707   auto it = data_producer_map_.find(data_producer->type());
708   if (it != data_producer_map_.end() && it->second == data_producer) {
709     data_producer_map_.erase(it);
710   }
711 }
712 
713 // Note: Must be called with a state that is different from the current state.
SetConnectivityStateLocked(grpc_connectivity_state state,const absl::Status & status)714 void Subchannel::SetConnectivityStateLocked(grpc_connectivity_state state,
715                                             const absl::Status& status) {
716   state_ = state;
717   if (status.ok()) {
718     status_ = status;
719   } else {
720     // Augment status message to include IP address.
721     status_ = absl::Status(status.code(),
722                            absl::StrCat(grpc_sockaddr_to_uri(&key_.address())
723                                             .value_or("<unknown address type>"),
724                                         ": ", status.message()));
725     status.ForEachPayload(
726         [this](absl::string_view key, const absl::Cord& value)
727         // Want to use ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) here,
728         // but that won't work, because we can't pass the lock
729         // annotation through absl::Status::ForEachPayload().
730         ABSL_NO_THREAD_SAFETY_ANALYSIS { status_.SetPayload(key, value); });
731   }
732   if (channelz_node_ != nullptr) {
733     channelz_node_->UpdateConnectivityState(state);
734     channelz_node_->AddTraceEvent(
735         channelz::ChannelTrace::Severity::Info,
736         grpc_slice_from_cpp_string(absl::StrCat(
737             "Subchannel connectivity state changed to ",
738             ConnectivityStateName(state),
739             status.ok() ? "" : absl::StrCat(": ", status_.ToString()))));
740   }
741   // Notify watchers.
742   watcher_list_.NotifyLocked(state, status_);
743 }
744 
OnRetryTimer()745 void Subchannel::OnRetryTimer() {
746   {
747     MutexLock lock(&mu_);
748     OnRetryTimerLocked();
749   }
750   // Drain any connectivity state notifications after releasing the mutex.
751   work_serializer_.DrainQueue();
752 }
753 
OnRetryTimerLocked()754 void Subchannel::OnRetryTimerLocked() {
755   if (shutdown_) return;
756   GRPC_TRACE_LOG(subchannel, INFO)
757       << "subchannel " << this << " " << key_.ToString()
758       << ": backoff delay elapsed, reporting IDLE";
759   SetConnectivityStateLocked(GRPC_CHANNEL_IDLE, absl::OkStatus());
760 }
761 
StartConnectingLocked()762 void Subchannel::StartConnectingLocked() {
763   // Set next attempt time.
764   const Timestamp now = Timestamp::Now();
765   const Timestamp min_deadline = now + min_connect_timeout_;
766   next_attempt_time_ = now + backoff_.NextAttemptDelay();
767   // Report CONNECTING.
768   SetConnectivityStateLocked(GRPC_CHANNEL_CONNECTING, absl::OkStatus());
769   // Start connection attempt.
770   SubchannelConnector::Args args;
771   args.address = &address_for_connect_;
772   args.interested_parties = pollset_set_;
773   args.deadline = std::max(next_attempt_time_, min_deadline);
774   args.channel_args = args_;
775   WeakRef(DEBUG_LOCATION, "Connect").release();  // Ref held by callback.
776   connector_->Connect(args, &connecting_result_, &on_connecting_finished_);
777 }
778 
OnConnectingFinished(void * arg,grpc_error_handle error)779 void Subchannel::OnConnectingFinished(void* arg, grpc_error_handle error) {
780   WeakRefCountedPtr<Subchannel> c(static_cast<Subchannel*>(arg));
781   {
782     MutexLock lock(&c->mu_);
783     c->OnConnectingFinishedLocked(error);
784   }
785   // Drain any connectivity state notifications after releasing the mutex.
786   c->work_serializer_.DrainQueue();
787   c.reset(DEBUG_LOCATION, "Connect");
788 }
789 
OnConnectingFinishedLocked(grpc_error_handle error)790 void Subchannel::OnConnectingFinishedLocked(grpc_error_handle error) {
791   if (shutdown_) {
792     connecting_result_.Reset();
793     return;
794   }
795   // If we didn't get a transport or we fail to publish it, report
796   // TRANSIENT_FAILURE and start the retry timer.
797   // Note that if the connection attempt took longer than the backoff
798   // time, then the timer will fire immediately, and we will quickly
799   // transition back to IDLE.
800   if (connecting_result_.transport == nullptr || !PublishTransportLocked()) {
801     const Duration time_until_next_attempt =
802         next_attempt_time_ - Timestamp::Now();
803     GRPC_TRACE_LOG(subchannel, INFO)
804         << "subchannel " << this << " " << key_.ToString()
805         << ": connect failed (" << StatusToString(error)
806         << "), backing off for " << time_until_next_attempt.millis() << " ms";
807     SetConnectivityStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
808                                grpc_error_to_absl_status(error));
809     retry_timer_handle_ = event_engine_->RunAfter(
810         time_until_next_attempt,
811         [self = WeakRef(DEBUG_LOCATION, "RetryTimer")]() mutable {
812           {
813             ApplicationCallbackExecCtx callback_exec_ctx;
814             ExecCtx exec_ctx;
815             self->OnRetryTimer();
816             // Subchannel deletion might require an active ExecCtx. So if
817             // self.reset() is not called here, the WeakRefCountedPtr
818             // destructor may run after the ExecCtx declared in the callback
819             // is destroyed. Since subchannel may get destroyed when the
820             // WeakRefCountedPtr destructor runs, it may not have an active
821             // ExecCtx - thus leading to crashes.
822             self.reset();
823           }
824         });
825   }
826 }
827 
PublishTransportLocked()828 bool Subchannel::PublishTransportLocked() {
829   auto socket_node = std::move(connecting_result_.socket_node);
830   if (connecting_result_.transport->filter_stack_transport() != nullptr) {
831     // Construct channel stack.
832     // Builder takes ownership of transport.
833     ChannelStackBuilderImpl builder(
834         "subchannel", GRPC_CLIENT_SUBCHANNEL,
835         connecting_result_.channel_args.SetObject(
836             std::exchange(connecting_result_.transport, nullptr)));
837     if (!CoreConfiguration::Get().channel_init().CreateStack(&builder)) {
838       return false;
839     }
840     absl::StatusOr<RefCountedPtr<grpc_channel_stack>> stack = builder.Build();
841     if (!stack.ok()) {
842       connecting_result_.Reset();
843       LOG(ERROR) << "subchannel " << this << " " << key_.ToString()
844                  << ": error initializing subchannel stack: " << stack.status();
845       return false;
846     }
847     connected_subchannel_ = MakeRefCounted<LegacyConnectedSubchannel>(
848         std::move(*stack), args_, channelz_node_);
849   } else {
850     OrphanablePtr<ClientTransport> transport(
851         std::exchange(connecting_result_.transport, nullptr)
852             ->client_transport());
853     InterceptionChainBuilder builder(
854         connecting_result_.channel_args.SetObject(transport.get()));
855     if (channelz_node_ != nullptr) {
856       // TODO(ctiller): If/when we have a good way to access the subchannel
857       // from a filter (maybe GetContext<Subchannel>?), consider replacing
858       // these two hooks with a filter so that we can avoid storing two
859       // separate refs to the channelz node in each connection.
860       builder.AddOnClientInitialMetadata(
861           [channelz_node = channelz_node_](ClientMetadata&) {
862             channelz_node->RecordCallStarted();
863           });
864       builder.AddOnServerTrailingMetadata(
865           [channelz_node = channelz_node_](ServerMetadata& metadata) {
866             if (IsStatusOk(metadata)) {
867               channelz_node->RecordCallSucceeded();
868             } else {
869               channelz_node->RecordCallFailed();
870             }
871           });
872     }
873     CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
874         GRPC_CLIENT_SUBCHANNEL, builder);
875     auto transport_destination =
876         MakeRefCounted<NewConnectedSubchannel::TransportCallDestination>(
877             std::move(transport));
878     auto call_destination = builder.Build(transport_destination);
879     if (!call_destination.ok()) {
880       connecting_result_.Reset();
881       LOG(ERROR) << "subchannel " << this << " " << key_.ToString()
882                  << ": error initializing subchannel stack: "
883                  << call_destination.status();
884       return false;
885     }
886     connected_subchannel_ = MakeRefCounted<NewConnectedSubchannel>(
887         std::move(*call_destination), std::move(transport_destination), args_);
888   }
889   connecting_result_.Reset();
890   // Publish.
891   GRPC_TRACE_LOG(subchannel, INFO)
892       << "subchannel " << this << " " << key_.ToString()
893       << ": new connected subchannel at " << connected_subchannel_.get();
894   if (channelz_node_ != nullptr) {
895     channelz_node_->SetChildSocket(std::move(socket_node));
896   }
897   // Start watching connected subchannel.
898   connected_subchannel_->StartWatch(
899       pollset_set_, MakeOrphanable<ConnectedSubchannelStateWatcher>(
900                         WeakRef(DEBUG_LOCATION, "state_watcher")));
901   // Report initial state.
902   SetConnectivityStateLocked(GRPC_CHANNEL_READY, absl::Status());
903   return true;
904 }
905 
// Builds the channel args used for a subchannel from the channel-level
// args, the per-address args, the subchannel pool, and the channel's
// default authority.
ChannelArgs Subchannel::MakeSubchannelArgs(
    const ChannelArgs& channel_args, const ChannelArgs& address_args,
    const RefCountedPtr<SubchannelPoolInterface>& subchannel_pool,
    const std::string& channel_default_authority) {
  // The channel-level args come first and the per-address args are
  // applied on top, so that a value present in both resolves to the
  // channel-level one.  This matters most for
  // GRPC_ARG_DEFAULT_AUTHORITY: resolvers may set it per address, but
  // only when the application did not set it explicitly on the channel.
  //
  // The default authority from the channel is filled in last, and only
  // if neither the application nor the resolver provided one.
  const ChannelArgs populated_args =
      channel_args.UnionWith(address_args)
          .SetObject(subchannel_pool)
          .SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, channel_default_authority);
  // Strip the args that must not influence subchannel uniqueness, along
  // with every key carrying the no-subchannel prefix.
  return populated_args.Remove(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME)
      .Remove(GRPC_ARG_INHIBIT_HEALTH_CHECKING)
      .Remove(GRPC_ARG_CHANNELZ_CHANNEL_NODE)
      .RemoveAllKeysWithPrefix(GRPC_ARG_NO_SUBCHANNEL_PREFIX);
}
930 
931 }  // namespace grpc_core
932