• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2015 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "src/core/client_channel/client_channel.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/json.h>
#include <grpc/support/metrics.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <inttypes.h>
#include <limits.h>

#include <algorithm>
#include <functional>
#include <map>
#include <new>
#include <set>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/cleanup/cleanup.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/client_channel/client_channel_service_config.h"
#include "src/core/client_channel/config_selector.h"
#include "src/core/client_channel/dynamic_filters.h"
#include "src/core/client_channel/global_subchannel_pool.h"
#include "src/core/client_channel/local_subchannel_pool.h"
#include "src/core/client_channel/retry_interceptor.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/config/core_configuration.h"
#include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/channel_args_endpoint_config.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/resource_quota/arena.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/surface/client_call.h"
#include "src/core/lib/surface/completion_queue.h"
#include "src/core/lib/transport/call_spine.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/load_balancing/lb_policy.h"
#include "src/core/load_balancing/lb_policy_registry.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/resolver/resolver_registry.h"
#include "src/core/service_config/service_config_impl.h"
#include "src/core/telemetry/metrics.h"
#include "src/core/util/crash.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/json/json.h"
#include "src/core/util/sync.h"
#include "src/core/util/useful.h"
#include "src/core/util/work_serializer.h"
97 
98 namespace grpc_core {
99 
100 using grpc_event_engine::experimental::EventEngine;
101 
102 using internal::ClientChannelMethodParsedConfig;
103 
104 //
105 // ClientChannel::ResolverResultHandler
106 //
107 
108 class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
109  public:
ResolverResultHandler(WeakRefCountedPtr<ClientChannel> client_channel)110   explicit ResolverResultHandler(
111       WeakRefCountedPtr<ClientChannel> client_channel)
112       : client_channel_(std::move(client_channel)) {}
113 
~ResolverResultHandler()114   ~ResolverResultHandler() override {
115     GRPC_TRACE_LOG(client_channel, INFO)
116         << "client_channel=" << client_channel_.get()
117         << ": resolver shutdown complete";
118   }
119 
ReportResult(Resolver::Result result)120   void ReportResult(Resolver::Result result) override
121       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
122     client_channel_->OnResolverResultChangedLocked(std::move(result));
123   }
124 
125  private:
126   WeakRefCountedPtr<ClientChannel> client_channel_;
127 };
128 
129 //
130 // ClientChannel::SubchannelWrapper
131 //
132 
133 // This class is a wrapper for Subchannel that hides details of the
134 // channel's implementation (such as the connected subchannel) from the
135 // LB policy API.
136 //
137 // Note that no synchronization is needed here, because even if the
138 // underlying subchannel is shared between channels, this wrapper will only
139 // be used within one channel, so it will always be synchronized by the
140 // control plane work_serializer.
141 class ClientChannel::SubchannelWrapper
142     : public SubchannelInterfaceWithCallDestination {
143  public:
144   SubchannelWrapper(WeakRefCountedPtr<ClientChannel> client_channel,
145                     RefCountedPtr<Subchannel> subchannel);
146   ~SubchannelWrapper() override;
147 
148   void Orphaned() override;
149   void WatchConnectivityState(
150       std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
151       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
152   void CancelConnectivityStateWatch(
153       ConnectivityStateWatcherInterface* watcher) override
154       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
155 
call_destination()156   RefCountedPtr<UnstartedCallDestination> call_destination() override {
157     return subchannel_->call_destination();
158   }
159 
RequestConnection()160   void RequestConnection() override { subchannel_->RequestConnection(); }
161 
ResetBackoff()162   void ResetBackoff() override { subchannel_->ResetBackoff(); }
163 
164   void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
165       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
166   void CancelDataWatcher(DataWatcherInterface* watcher) override
167       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
168   void ThrottleKeepaliveTime(int new_keepalive_time);
address() const169   std::string address() const override { return subchannel_->address(); }
170 
171  private:
172   class WatcherWrapper;
173 
174   // A heterogenous lookup comparator for data watchers that allows
175   // unique_ptr keys to be looked up as raw pointers.
176   struct DataWatcherLessThan {
177     using is_transparent = void;
operator ()grpc_core::ClientChannel::SubchannelWrapper::DataWatcherLessThan178     bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
179                     const std::unique_ptr<DataWatcherInterface>& p2) const {
180       return p1 < p2;
181     }
operator ()grpc_core::ClientChannel::SubchannelWrapper::DataWatcherLessThan182     bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
183                     const DataWatcherInterface* p2) const {
184       return p1.get() < p2;
185     }
operator ()grpc_core::ClientChannel::SubchannelWrapper::DataWatcherLessThan186     bool operator()(const DataWatcherInterface* p1,
187                     const std::unique_ptr<DataWatcherInterface>& p2) const {
188       return p1 < p2.get();
189     }
190   };
191 
192   WeakRefCountedPtr<ClientChannel> client_channel_;
193   RefCountedPtr<Subchannel> subchannel_;
194   // Maps from the address of the watcher passed to us by the LB policy
195   // to the address of the WrapperWatcher that we passed to the underlying
196   // subchannel.  This is needed so that when the LB policy calls
197   // CancelConnectivityStateWatch() with its watcher, we know the
198   // corresponding WrapperWatcher to cancel on the underlying subchannel.
199   std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
200       ABSL_GUARDED_BY(*client_channel_->work_serializer_);
201   std::set<std::unique_ptr<DataWatcherInterface>, DataWatcherLessThan>
202       data_watchers_ ABSL_GUARDED_BY(*client_channel_->work_serializer_);
203 };
204 
205 // This wrapper provides a bridge between the internal Subchannel API
206 // and the SubchannelInterface API that we expose to LB policies.
207 // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
208 // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
209 // that was passed in by the LB policy.  We pass an instance of this
210 // class to the underlying Subchannel, and when we get updates from
211 // the subchannel, we pass those on to the wrapped watcher to return
212 // the update to the LB policy.
213 //
214 // This class handles things like hopping into the WorkSerializer
215 // before passing notifications to the LB policy and propagating
216 // keepalive information between subchannels.
217 class ClientChannel::SubchannelWrapper::WatcherWrapper
218     : public Subchannel::ConnectivityStateWatcherInterface {
219  public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> subchannel_wrapper)220   WatcherWrapper(
221       std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
222           watcher,
223       RefCountedPtr<SubchannelWrapper> subchannel_wrapper)
224       : watcher_(std::move(watcher)),
225         subchannel_wrapper_(std::move(subchannel_wrapper)) {}
226 
~WatcherWrapper()227   ~WatcherWrapper() override {
228     subchannel_wrapper_.reset(DEBUG_LOCATION, "WatcherWrapper");
229   }
230 
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)231   void OnConnectivityStateChange(
232       RefCountedPtr<ConnectivityStateWatcherInterface> self,
233       grpc_connectivity_state state, const absl::Status& status) override {
234     GRPC_TRACE_LOG(client_channel, INFO)
235         << "client_channel=" << subchannel_wrapper_->client_channel_.get()
236         << ": connectivity change for subchannel wrapper "
237         << subchannel_wrapper_.get() << " subchannel "
238         << subchannel_wrapper_->subchannel_.get()
239         << "; hopping into work_serializer";
240     self.release();  // Held by callback.
241     subchannel_wrapper_->client_channel_->work_serializer_->Run(
242         [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
243             *subchannel_wrapper_->client_channel_->work_serializer_) {
244           ApplyUpdateInControlPlaneWorkSerializer(state, status);
245           Unref();
246         },
247         DEBUG_LOCATION);
248   }
249 
interested_parties()250   grpc_pollset_set* interested_parties() override { return nullptr; }
251 
252  private:
ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,const absl::Status & status)253   void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
254                                                const absl::Status& status)
255       ABSL_EXCLUSIVE_LOCKS_REQUIRED(
256           *subchannel_wrapper_->client_channel_->work_serializer_) {
257     GRPC_TRACE_LOG(client_channel, INFO)
258         << "client_channel=" << subchannel_wrapper_->client_channel_.get()
259         << ": processing connectivity change in work serializer for subchannel "
260            "wrapper "
261         << subchannel_wrapper_.get() << " subchannel "
262         << subchannel_wrapper_->subchannel_.get()
263         << " watcher=" << watcher_.get()
264         << " state=" << ConnectivityStateName(state) << " status=" << status;
265     absl::optional<absl::Cord> keepalive_throttling =
266         status.GetPayload(kKeepaliveThrottlingKey);
267     if (keepalive_throttling.has_value()) {
268       int new_keepalive_time = -1;
269       if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
270                            &new_keepalive_time)) {
271         if (new_keepalive_time >
272             subchannel_wrapper_->client_channel_->keepalive_time_) {
273           subchannel_wrapper_->client_channel_->keepalive_time_ =
274               new_keepalive_time;
275           GRPC_TRACE_LOG(client_channel, INFO)
276               << "client_channel=" << subchannel_wrapper_->client_channel_.get()
277               << ": throttling keepalive time to "
278               << subchannel_wrapper_->client_channel_->keepalive_time_;
279           // Propagate the new keepalive time to all subchannels. This is so
280           // that new transports created by any subchannel (and not just the
281           // subchannel that received the GOAWAY), use the new keepalive time.
282           for (auto* subchannel_wrapper :
283                subchannel_wrapper_->client_channel_->subchannel_wrappers_) {
284             subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
285           }
286         }
287       } else {
288         LOG(ERROR) << "client_channel="
289                    << subchannel_wrapper_->client_channel_.get()
290                    << ": Illegal keepalive throttling value "
291                    << std::string(keepalive_throttling.value());
292       }
293     }
294     // Propagate status only in state TF.
295     // We specifically want to avoid propagating the status for
296     // state IDLE that the real subchannel gave us only for the
297     // purpose of keepalive propagation.
298     watcher_->OnConnectivityStateChange(
299         state,
300         state == GRPC_CHANNEL_TRANSIENT_FAILURE ? status : absl::OkStatus());
301   }
302 
303   std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
304       watcher_;
305   RefCountedPtr<SubchannelWrapper> subchannel_wrapper_;
306 };
307 
SubchannelWrapper(WeakRefCountedPtr<ClientChannel> client_channel,RefCountedPtr<Subchannel> subchannel)308 ClientChannel::SubchannelWrapper::SubchannelWrapper(
309     WeakRefCountedPtr<ClientChannel> client_channel,
310     RefCountedPtr<Subchannel> subchannel)
311     : SubchannelInterfaceWithCallDestination(
312           GRPC_TRACE_FLAG_ENABLED(client_channel) ? "SubchannelWrapper"
313                                                   : nullptr),
314       client_channel_(std::move(client_channel)),
315       subchannel_(std::move(subchannel)) {
316   GRPC_TRACE_LOG(client_channel, INFO)
317       << "client_channel=" << client_channel_.get()
318       << ": creating subchannel wrapper " << this << " for subchannel "
319       << subchannel_.get();
320 #ifndef NDEBUG
321   DCHECK(client_channel_->work_serializer_->RunningInWorkSerializer());
322 #endif
323   if (client_channel_->channelz_node_ != nullptr) {
324     auto* subchannel_node = subchannel_->channelz_node();
325     if (subchannel_node != nullptr) {
326       auto it =
327           client_channel_->subchannel_refcount_map_.find(subchannel_.get());
328       if (it == client_channel_->subchannel_refcount_map_.end()) {
329         client_channel_->channelz_node_->AddChildSubchannel(
330             subchannel_node->uuid());
331         it = client_channel_->subchannel_refcount_map_
332                  .emplace(subchannel_.get(), 0)
333                  .first;
334       }
335       ++it->second;
336     }
337   }
338   client_channel_->subchannel_wrappers_.insert(this);
339 }
340 
~SubchannelWrapper()341 ClientChannel::SubchannelWrapper::~SubchannelWrapper() {
342   GRPC_TRACE_LOG(client_channel, INFO)
343       << "client_channel=" << client_channel_.get()
344       << ": destroying subchannel wrapper " << this << " for subchannel "
345       << subchannel_.get();
346 }
347 
Orphaned()348 void ClientChannel::SubchannelWrapper::Orphaned() {
349   // Make sure we clean up the channel's subchannel maps inside the
350   // WorkSerializer.
351   auto self = WeakRefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION,
352                                                    "subchannel map cleanup");
353   client_channel_->work_serializer_->Run(
354       [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
355           *self->client_channel_->work_serializer_) {
356         self->client_channel_->subchannel_wrappers_.erase(self.get());
357         if (self->client_channel_->channelz_node_ != nullptr) {
358           auto* subchannel_node = self->subchannel_->channelz_node();
359           if (subchannel_node != nullptr) {
360             auto it = self->client_channel_->subchannel_refcount_map_.find(
361                 self->subchannel_.get());
362             CHECK(it != self->client_channel_->subchannel_refcount_map_.end());
363             --it->second;
364             if (it->second == 0) {
365               self->client_channel_->channelz_node_->RemoveChildSubchannel(
366                   subchannel_node->uuid());
367               self->client_channel_->subchannel_refcount_map_.erase(it);
368             }
369           }
370         }
371       },
372       DEBUG_LOCATION);
373 }
374 
WatchConnectivityState(std::unique_ptr<ConnectivityStateWatcherInterface> watcher)375 void ClientChannel::SubchannelWrapper::WatchConnectivityState(
376     std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
377   auto& watcher_wrapper = watcher_map_[watcher.get()];
378   CHECK(watcher_wrapper == nullptr);
379   watcher_wrapper = new WatcherWrapper(
380       std::move(watcher),
381       RefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION, "WatcherWrapper"));
382   subchannel_->WatchConnectivityState(
383       RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
384           watcher_wrapper));
385 }
386 
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)387 void ClientChannel::SubchannelWrapper::CancelConnectivityStateWatch(
388     ConnectivityStateWatcherInterface* watcher) {
389   auto it = watcher_map_.find(watcher);
390   CHECK(it != watcher_map_.end());
391   subchannel_->CancelConnectivityStateWatch(it->second);
392   watcher_map_.erase(it);
393 }
394 
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)395 void ClientChannel::SubchannelWrapper::AddDataWatcher(
396     std::unique_ptr<DataWatcherInterface> watcher) {
397   static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get())
398       ->SetSubchannel(subchannel_.get());
399   CHECK(data_watchers_.insert(std::move(watcher)).second);
400 }
401 
CancelDataWatcher(DataWatcherInterface * watcher)402 void ClientChannel::SubchannelWrapper::CancelDataWatcher(
403     DataWatcherInterface* watcher) {
404   auto it = data_watchers_.find(watcher);
405   if (it != data_watchers_.end()) data_watchers_.erase(it);
406 }
407 
ThrottleKeepaliveTime(int new_keepalive_time)408 void ClientChannel::SubchannelWrapper::ThrottleKeepaliveTime(
409     int new_keepalive_time) {
410   subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
411 }
412 
413 //
414 // ClientChannel::ClientChannelControlHelper
415 //
416 
417 class ClientChannel::ClientChannelControlHelper
418     : public LoadBalancingPolicy::ChannelControlHelper {
419  public:
ClientChannelControlHelper(WeakRefCountedPtr<ClientChannel> client_channel)420   explicit ClientChannelControlHelper(
421       WeakRefCountedPtr<ClientChannel> client_channel)
422       : client_channel_(std::move(client_channel)) {}
423 
~ClientChannelControlHelper()424   ~ClientChannelControlHelper() override {
425     client_channel_.reset(DEBUG_LOCATION, "ClientChannelControlHelper");
426   }
427 
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)428   RefCountedPtr<SubchannelInterface> CreateSubchannel(
429       const grpc_resolved_address& address, const ChannelArgs& per_address_args,
430       const ChannelArgs& args) override
431       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
432     // If shutting down, do nothing.
433     if (client_channel_->resolver_ == nullptr) return nullptr;
434     ChannelArgs subchannel_args = Subchannel::MakeSubchannelArgs(
435         args, per_address_args, client_channel_->subchannel_pool_,
436         client_channel_->default_authority_);
437     // Create subchannel.
438     RefCountedPtr<Subchannel> subchannel =
439         client_channel_->client_channel_factory_->CreateSubchannel(
440             address, subchannel_args);
441     if (subchannel == nullptr) return nullptr;
442     // Make sure the subchannel has updated keepalive time.
443     subchannel->ThrottleKeepaliveTime(client_channel_->keepalive_time_);
444     // Create and return wrapper for the subchannel.
445     return MakeRefCounted<SubchannelWrapper>(client_channel_,
446                                              std::move(subchannel));
447   }
448 
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)449   void UpdateState(
450       grpc_connectivity_state state, const absl::Status& status,
451       RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override
452       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
453     if (client_channel_->resolver_ == nullptr) return;  // Shutting down.
454     if (GRPC_TRACE_FLAG_ENABLED(client_channel)) {
455       const char* extra = client_channel_->disconnect_error_.ok()
456                               ? ""
457                               : " (ignoring -- channel shutting down)";
458       LOG(INFO) << "client_channel=" << client_channel_.get()
459                 << ": update: state=" << ConnectivityStateName(state)
460                 << " status=(" << status << ") picker=" << picker.get()
461                 << extra;
462     }
463     // Do update only if not shutting down.
464     if (client_channel_->disconnect_error_.ok()) {
465       client_channel_->UpdateStateAndPickerLocked(state, status, "helper",
466                                                   std::move(picker));
467     }
468   }
469 
RequestReresolution()470   void RequestReresolution() override
471       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
472     if (client_channel_->resolver_ == nullptr) return;  // Shutting down.
473     GRPC_TRACE_LOG(client_channel, INFO)
474         << "client_channel=" << client_channel_.get()
475         << ": started name re-resolving";
476     client_channel_->resolver_->RequestReresolutionLocked();
477   }
478 
GetTarget()479   absl::string_view GetTarget() override { return client_channel_->target(); }
480 
GetAuthority()481   absl::string_view GetAuthority() override {
482     return client_channel_->default_authority_;
483   }
484 
GetChannelCredentials()485   RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
486     return client_channel_->channel_args_.GetObject<grpc_channel_credentials>()
487         ->duplicate_without_call_credentials();
488   }
489 
GetUnsafeChannelCredentials()490   RefCountedPtr<grpc_channel_credentials> GetUnsafeChannelCredentials()
491       override {
492     return client_channel_->channel_args_.GetObject<grpc_channel_credentials>()
493         ->Ref();
494   }
495 
GetEventEngine()496   EventEngine* GetEventEngine() override {
497     return client_channel_->event_engine();
498   }
499 
GetStatsPluginGroup()500   GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override {
501     return client_channel_->stats_plugin_group_;
502   }
503 
AddTraceEvent(TraceSeverity severity,absl::string_view message)504   void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
505       ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
506     if (client_channel_->resolver_ == nullptr) return;  // Shutting down.
507     if (client_channel_->channelz_node_ != nullptr) {
508       client_channel_->channelz_node_->AddTraceEvent(
509           ConvertSeverityEnum(severity),
510           grpc_slice_from_copied_buffer(message.data(), message.size()));
511     }
512   }
513 
514  private:
ConvertSeverityEnum(TraceSeverity severity)515   static channelz::ChannelTrace::Severity ConvertSeverityEnum(
516       TraceSeverity severity) {
517     if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
518     if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
519     return channelz::ChannelTrace::Error;
520   }
521 
522   WeakRefCountedPtr<ClientChannel> client_channel_;
523 };
524 
525 //
526 // ClientChannel implementation
527 //
528 
529 namespace {
530 
GetSubchannelPool(const ChannelArgs & args)531 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
532     const ChannelArgs& args) {
533   if (args.GetBool(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL).value_or(false)) {
534     return MakeRefCounted<LocalSubchannelPool>();
535   }
536   return GlobalSubchannelPool::instance();
537 }
538 
539 }  // namespace
540 
Create(std::string target,ChannelArgs channel_args)541 absl::StatusOr<RefCountedPtr<Channel>> ClientChannel::Create(
542     std::string target, ChannelArgs channel_args) {
543   // Get URI to resolve, using proxy mapper if needed.
544   if (target.empty()) {
545     return absl::InternalError("target URI is empty in client channel");
546   }
547   std::string uri_to_resolve = CoreConfiguration::Get()
548                                    .proxy_mapper_registry()
549                                    .MapName(target, &channel_args)
550                                    .value_or(target);
551   // Make sure the URI to resolve is valid, so that we know that
552   // resolver creation will succeed later.
553   if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
554           uri_to_resolve)) {
555     return absl::InvalidArgumentError(
556         absl::StrCat("invalid target URI: ", uri_to_resolve));
557   }
558   // Get default service config.  If none is specified via the client API,
559   // we use an empty config.
560   absl::optional<absl::string_view> service_config_json =
561       channel_args.GetString(GRPC_ARG_SERVICE_CONFIG);
562   if (!service_config_json.has_value()) service_config_json = "{}";
563   auto default_service_config =
564       ServiceConfigImpl::Create(channel_args, *service_config_json);
565   if (!default_service_config.ok()) return default_service_config.status();
566   // Strip out service config channel arg, so that it doesn't affect
567   // subchannel uniqueness when the args flow down to that layer.
568   channel_args = channel_args.Remove(GRPC_ARG_SERVICE_CONFIG);
569   // Check client channel factory.
570   auto* client_channel_factory = channel_args.GetObject<ClientChannelFactory>();
571   if (client_channel_factory == nullptr) {
572     return absl::InternalError(
573         "Missing client channel factory in args for client channel");
574   }
575   auto* call_destination_factory =
576       channel_args.GetObject<CallDestinationFactory>();
577   if (call_destination_factory == nullptr) {
578     return absl::InternalError(
579         "Missing call destination factory in args for client channel");
580   }
581   if (channel_args.GetObject<EventEngine>() == nullptr) {
582     return absl::InternalError(
583         "Missing event engine in args for client channel");
584   }
585   // Success.  Construct channel.
586   return MakeRefCounted<ClientChannel>(
587       std::move(target), std::move(channel_args), std::move(uri_to_resolve),
588       std::move(*default_service_config), client_channel_factory,
589       call_destination_factory);
590 }
591 
592 namespace {
GetDefaultAuthorityFromChannelArgs(const ChannelArgs & channel_args,absl::string_view target)593 std::string GetDefaultAuthorityFromChannelArgs(const ChannelArgs& channel_args,
594                                                absl::string_view target) {
595   absl::optional<std::string> default_authority =
596       channel_args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
597   if (!default_authority.has_value()) {
598     return CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
599         target);
600   } else {
601     return std::move(*default_authority);
602   }
603 }
604 }  // namespace
605 
ClientChannel(std::string target,ChannelArgs channel_args,std::string uri_to_resolve,RefCountedPtr<ServiceConfig> default_service_config,ClientChannelFactory * client_channel_factory,CallDestinationFactory * call_destination_factory)606 ClientChannel::ClientChannel(
607     std::string target, ChannelArgs channel_args, std::string uri_to_resolve,
608     RefCountedPtr<ServiceConfig> default_service_config,
609     ClientChannelFactory* client_channel_factory,
610     CallDestinationFactory* call_destination_factory)
611     : Channel(std::move(target), channel_args),
612       channel_args_(std::move(channel_args)),
613       event_engine_(channel_args_.GetObjectRef<EventEngine>()),
614       uri_to_resolve_(std::move(uri_to_resolve)),
615       service_config_parser_index_(
616           internal::ClientChannelServiceConfigParser::ParserIndex()),
617       default_service_config_(std::move(default_service_config)),
618       client_channel_factory_(client_channel_factory),
619       default_authority_(
620           GetDefaultAuthorityFromChannelArgs(channel_args_, this->target())),
621       channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
622       idle_timeout_(GetClientIdleTimeout(channel_args_)),
623       resolver_data_for_calls_(ResolverDataForCalls{}),
624       picker_(nullptr),
625       call_destination_(
626           call_destination_factory->CreateCallDestination(picker_)),
627       work_serializer_(std::make_shared<WorkSerializer>(event_engine_)),
628       state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
629       subchannel_pool_(GetSubchannelPool(channel_args_)) {
630   CHECK(event_engine_.get() != nullptr);
631   GRPC_TRACE_LOG(client_channel, INFO)
632       << "client_channel=" << this << ": creating client_channel";
633   // Set initial keepalive time.
634   auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
635   if (keepalive_arg.has_value()) {
636     keepalive_time_ = Clamp(*keepalive_arg, 1, INT_MAX);
637   } else {
638     keepalive_time_ = -1;  // unset
639   }
640   // Get stats plugins for channel.
641   grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config(
642       channel_args_);
643   experimental::StatsPluginChannelScope scope(
644       this->target(), default_authority_, endpoint_config);
645   stats_plugin_group_ =
646       GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
647 }
648 
~ClientChannel()649 ClientChannel::~ClientChannel() {
650   GRPC_TRACE_LOG(client_channel, INFO)
651       << "client_channel=" << this << ": destroying";
652 }
653 
Orphaned()654 void ClientChannel::Orphaned() {
655   GRPC_TRACE_LOG(client_channel, INFO)
656       << "client_channel=" << this << ": shutting down";
657   // Weird capture then copy needed to satisfy thread safety analysis,
658   // otherwise it seems to fail to recognize the correct lock is taken in the
659   // lambda.
660   auto self = WeakRefAsSubclass<ClientChannel>();
661   work_serializer_->Run(
662       [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
663         self->DestroyResolverAndLbPolicyLocked();
664       },
665       DEBUG_LOCATION);
666   // IncreaseCallCount() introduces a phony call and prevents the idle
667   // timer from being reset by other threads.
668   idle_state_.IncreaseCallCount();
669   idle_activity_.Reset();
670 }
671 
CheckConnectivityState(bool try_to_connect)672 grpc_connectivity_state ClientChannel::CheckConnectivityState(
673     bool try_to_connect) {
674   // state_tracker_ is guarded by work_serializer_, which we're not
675   // holding here.  But the one method of state_tracker_ that *is*
676   // thread-safe to call without external synchronization is the state()
677   // method, so we can disable thread-safety analysis for this one read.
678   grpc_connectivity_state state =
679       ABSL_TS_UNCHECKED_READ(state_tracker_).state();
680   if (state == GRPC_CHANNEL_IDLE && try_to_connect) {
681     auto self = WeakRefAsSubclass<ClientChannel>();  // Held by callback.
682     work_serializer_->Run(
683         [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
684           self->TryToConnectLocked();
685         },
686         DEBUG_LOCATION);
687   }
688   return state;
689 }
690 
namespace {

// A fire-and-forget object to handle external connectivity state watches.
//
// Allocated with `new` by ClientChannel::WatchConnectivityState() and never
// explicitly deleted; its lifetime is governed by ref-counting:
//   - the ref from creation is handed to the Watcher registered with the
//     channel,
//   - a second ref is held by the deadline timer closure,
//   - a third ref is taken just before the CQ completion is posted and is
//     dropped in FinishedCompletion().
// Whichever comes first (a connectivity state change or the deadline) posts
// exactly one completion for `tag` on `cq`.
// NOTE(review): ClientChannel::AddConnectivityWatcher() currently calls
// Crash("not implemented"), so this path is not usable yet.
class ExternalStateWatcher : public RefCounted<ExternalStateWatcher> {
 public:
  // Registers a watcher for changes away from `last_observed_state` and arms
  // a timer that fires at `deadline`.
  ExternalStateWatcher(WeakRefCountedPtr<ClientChannel> channel,
                       grpc_completion_queue* cq, void* tag,
                       grpc_connectivity_state last_observed_state,
                       Timestamp deadline)
      : channel_(std::move(channel)), cq_(cq), tag_(tag) {
    // Held for the whole setup: MaybeStartCompletion() also takes mu_ and
    // reads watcher_ / timer_handle_, so an early notification from either
    // source waits until both are initialized.
    MutexLock lock(&mu_);
    // Start watch.  This inherits the ref from creation.
    auto watcher =
        MakeOrphanable<Watcher>(RefCountedPtr<ExternalStateWatcher>(this));
    watcher_ = watcher.get();
    channel_->AddConnectivityWatcher(last_observed_state, std::move(watcher));
    // Start timer.  This takes a second ref.
    const Duration timeout = deadline - Timestamp::Now();
    timer_handle_ =
        channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable {
          ApplicationCallbackExecCtx callback_exec_ctx;
          ExecCtx exec_ctx;
          self->MaybeStartCompletion(absl::DeadlineExceededError(
              "Timed out waiting for connection state change"));
          // ExternalStateWatcher deletion might require an active ExecCtx.
          self.reset();
        });
  }

 private:
  // Connectivity watcher registered with the channel.  Holds the creation
  // ref on the owning ExternalStateWatcher and forwards any state change to
  // it.
  class Watcher : public AsyncConnectivityStateWatcherInterface {
   public:
    explicit Watcher(RefCountedPtr<ExternalStateWatcher> external_state_watcher)
        : external_state_watcher_(std::move(external_state_watcher)) {}

    // The new state and status are ignored: any reported change completes
    // the watch with an OK status.
    void OnConnectivityStateChange(grpc_connectivity_state /*new_state*/,
                                   const absl::Status& /*status*/) override {
      external_state_watcher_->MaybeStartCompletion(absl::OkStatus());
    }

   private:
    RefCountedPtr<ExternalStateWatcher> external_state_watcher_;
  };

  // This is called both when the watch reports a new connectivity state
  // and when the timer fires.  It will trigger a CQ notification only
  // on the first call.  Subsequent calls will be ignored, because
  // events can come in asynchronously.
  void MaybeStartCompletion(absl::Status status) {
    MutexLock lock(&mu_);
    if (watcher_ == nullptr) return;  // Ignore subsequent notifications.
    // Cancel watch.
    channel_->RemoveConnectivityWatcher(watcher_);
    watcher_ = nullptr;
    // Cancel timer.  (When called from the timer itself, the timer has
    // already run, so the cancellation presumably has no effect.)
    channel_->event_engine()->Cancel(timer_handle_);
    // Send CQ completion.
    Ref().release();  // Released in FinishedCompletion().
    grpc_cq_end_op(cq_, tag_, status, FinishedCompletion, this,
                   &completion_storage_);
  }

  // Called when the completion is returned to the CQ.  Drops the ref taken
  // in MaybeStartCompletion().
  static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
    auto* self = static_cast<ExternalStateWatcher*>(arg);
    self->Unref();
  }

  // Only a weak ref is held on the channel.
  WeakRefCountedPtr<ClientChannel> channel_;

  Mutex mu_;
  grpc_completion_queue* cq_ ABSL_GUARDED_BY(&mu_);
  void* tag_ ABSL_GUARDED_BY(&mu_);
  // Storage handed to grpc_cq_end_op() for the single completion.
  grpc_cq_completion completion_storage_ ABSL_GUARDED_BY(&mu_);
  // Non-owning pointer to the registered Watcher; set to nullptr once the
  // completion has been started, which marks the watch as finished.
  Watcher* watcher_ ABSL_GUARDED_BY(&mu_) = nullptr;
  grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_
      ABSL_GUARDED_BY(&mu_);
};

}  // namespace
771 
WatchConnectivityState(grpc_connectivity_state state,Timestamp deadline,grpc_completion_queue * cq,void * tag)772 void ClientChannel::WatchConnectivityState(grpc_connectivity_state state,
773                                            Timestamp deadline,
774                                            grpc_completion_queue* cq,
775                                            void* tag) {
776   new ExternalStateWatcher(WeakRefAsSubclass<ClientChannel>(), cq, tag, state,
777                            deadline);
778 }
779 
AddConnectivityWatcher(grpc_connectivity_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface>)780 void ClientChannel::AddConnectivityWatcher(
781     grpc_connectivity_state,
782     OrphanablePtr<AsyncConnectivityStateWatcherInterface>) {
783   Crash("not implemented");
784   // TODO(ctiller): to make this work, need to change WorkSerializer to use
785   // absl::AnyInvocable<> instead of std::function<>
786   //  work_serializer_->Run(
787   //      [self = RefAsSubclass<ClientChannel>(), initial_state,
788   //       watcher = std::move(watcher)]()
789   //            ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) {
790   //        self->state_tracker_.AddWatcher(initial_state, std::move(watcher));
791   //      },
792   //      DEBUG_LOCATION);
793 }
794 
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)795 void ClientChannel::RemoveConnectivityWatcher(
796     AsyncConnectivityStateWatcherInterface* watcher) {
797   auto self = RefAsSubclass<ClientChannel>();  // Held by callback.
798   work_serializer_->Run(
799       [self, watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
800         self->state_tracker_.RemoveWatcher(watcher);
801       },
802       DEBUG_LOCATION);
803 }
804 
GetInfo(const grpc_channel_info * info)805 void ClientChannel::GetInfo(const grpc_channel_info* info) {
806   MutexLock lock(&info_mu_);
807   if (info->lb_policy_name != nullptr) {
808     *info->lb_policy_name = gpr_strdup(info_lb_policy_name_.c_str());
809   }
810   if (info->service_config_json != nullptr) {
811     *info->service_config_json = gpr_strdup(info_service_config_json_.c_str());
812   }
813 }
814 
ResetConnectionBackoff()815 void ClientChannel::ResetConnectionBackoff() {
816   auto self = RefAsSubclass<ClientChannel>();
817   work_serializer_->Run(
818       [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
819         if (self->lb_policy_ != nullptr) self->lb_policy_->ResetBackoffLocked();
820       },
821       DEBUG_LOCATION);
822 }
823 
824 namespace {
825 
826 // A class to handle CQ completion for a ping.
827 class PingRequest {
828  public:
PingRequest(grpc_completion_queue * cq,void * tag)829   PingRequest(grpc_completion_queue* cq, void* tag) : cq_(cq), tag_(tag) {
830     grpc_cq_begin_op(cq, tag);
831   }
832 
833   // Triggers CQ completion and eventually deletes the PingRequest object.
Complete(grpc_error_handle error)834   void Complete(grpc_error_handle error) {
835     grpc_cq_end_op(cq_, tag_, error, Destroy, this, &completion_storage_);
836   }
837 
838  private:
Destroy(void * arg,grpc_cq_completion *)839   static void Destroy(void* arg, grpc_cq_completion* /*storage*/) {
840     delete static_cast<PingRequest*>(arg);
841   }
842 
843   grpc_completion_queue* cq_;
844   void* tag_;
845   grpc_cq_completion completion_storage_;
846 };
847 
848 }  // namespace
849 
Ping(grpc_completion_queue *,void *)850 void ClientChannel::Ping(grpc_completion_queue*, void*) {
851   // TODO(ctiller): implement
852   Crash("not implemented");
853 }
854 
CreateCall(grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * cq,grpc_pollset_set *,Slice path,absl::optional<Slice> authority,Timestamp deadline,bool)855 grpc_call* ClientChannel::CreateCall(
856     grpc_call* parent_call, uint32_t propagation_mask,
857     grpc_completion_queue* cq, grpc_pollset_set* /*pollset_set_alternative*/,
858     Slice path, absl::optional<Slice> authority, Timestamp deadline, bool) {
859   auto arena = call_arena_allocator()->MakeArena();
860   arena->SetContext<grpc_event_engine::experimental::EventEngine>(
861       event_engine());
862   return MakeClientCall(parent_call, propagation_mask, cq, std::move(path),
863                         std::move(authority), false, deadline,
864                         compression_options(), std::move(arena), Ref());
865 }
866 
StartCall(UnstartedCallHandler unstarted_handler)867 void ClientChannel::StartCall(UnstartedCallHandler unstarted_handler) {
868   // Increment call count.
869   if (idle_timeout_ != Duration::Zero()) idle_state_.IncreaseCallCount();
870   // Exit IDLE if needed.
871   CheckConnectivityState(/*try_to_connect=*/true);
872   // Spawn a promise to wait for the resolver result.
873   // This will eventually start the call.
874   unstarted_handler.SpawnGuardedUntilCallCompletes(
875       "wait-for-name-resolution",
876       [self = RefAsSubclass<ClientChannel>(), unstarted_handler]() mutable {
877         const bool wait_for_ready =
878             unstarted_handler.UnprocessedClientInitialMetadata()
879                 .GetOrCreatePointer(WaitForReady())
880                 ->value;
881         return Map(
882             // Wait for the resolver result.
883             CheckDelayed(self->resolver_data_for_calls_.NextWhen(
884                 [wait_for_ready](
885                     const absl::StatusOr<ResolverDataForCalls> result) {
886                   bool got_result = false;
887                   // If the resolver reports an error but the call is
888                   // wait_for_ready, keep waiting for the next result
889                   // instead of failing the call.
890                   if (!result.ok()) {
891                     got_result = !wait_for_ready;
892                   } else {
893                     // Not an error.  Make sure we actually have a result.
894                     got_result = result->config_selector != nullptr;
895                   }
896                   return got_result;
897                 })),
898             // Handle resolver result.
899             [self, unstarted_handler](
900                 std::tuple<absl::StatusOr<ResolverDataForCalls>, bool>
901                     result_and_delayed) mutable {
902               auto& resolver_data = std::get<0>(result_and_delayed);
903               const bool was_queued = std::get<1>(result_and_delayed);
904               if (!resolver_data.ok()) return resolver_data.status();
905               // Apply service config to call.
906               absl::Status status = self->ApplyServiceConfigToCall(
907                   *resolver_data->config_selector,
908                   unstarted_handler.UnprocessedClientInitialMetadata());
909               if (!status.ok()) return status;
910               // If the call was queued, add trace annotation.
911               if (was_queued) {
912                 auto* call_tracer =
913                     MaybeGetContext<CallTracerAnnotationInterface>();
914                 if (call_tracer != nullptr) {
915                   call_tracer->RecordAnnotation(
916                       "Delayed name resolution complete.");
917                 }
918               }
919               // Start the call on the destination provided by the
920               // resolver.
921               resolver_data->call_destination->StartCall(
922                   std::move(unstarted_handler));
923               return absl::OkStatus();
924             });
925       });
926 }
927 
CreateResolverLocked()928 void ClientChannel::CreateResolverLocked() {
929   GRPC_TRACE_LOG(client_channel, INFO)
930       << "client_channel=" << this << ": starting name resolution for "
931       << uri_to_resolve_;
932   resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
933       uri_to_resolve_, channel_args_, nullptr, work_serializer_,
934       std::make_unique<ResolverResultHandler>(
935           WeakRefAsSubclass<ClientChannel>()));
936   // Since the validity of the args was checked when the channel was created,
937   // CreateResolver() must return a non-null result.
938   CHECK(resolver_ != nullptr);
939   UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
940                     "started resolving");
941   resolver_->StartLocked();
942   GRPC_TRACE_LOG(client_channel, INFO)
943       << "client_channel=" << this << ": created resolver=" << resolver_.get();
944 }
945 
DestroyResolverAndLbPolicyLocked()946 void ClientChannel::DestroyResolverAndLbPolicyLocked() {
947   if (resolver_ != nullptr) {
948     GRPC_TRACE_LOG(client_channel, INFO)
949         << "client_channel=" << this
950         << ": shutting down resolver=" << resolver_.get();
951     resolver_.reset();
952     saved_service_config_.reset();
953     saved_config_selector_.reset();
954     resolver_data_for_calls_.Set(ResolverDataForCalls{nullptr, nullptr});
955     // Clear LB policy if set.
956     if (lb_policy_ != nullptr) {
957       GRPC_TRACE_LOG(client_channel, INFO)
958           << "client_channel=" << this
959           << ": shutting down lb_policy=" << lb_policy_.get();
960       lb_policy_.reset();
961       picker_.Set(MakeRefCounted<LoadBalancingPolicy::DropPicker>(
962           absl::UnavailableError("Channel shutdown")));
963     }
964   }
965 }
966 
TryToConnectLocked()967 void ClientChannel::TryToConnectLocked() {
968   if (disconnect_error_.ok()) {
969     if (lb_policy_ != nullptr) {
970       lb_policy_->ExitIdleLocked();
971     } else if (resolver_ == nullptr) {
972       CreateResolverLocked();
973     }
974   }
975 }
976 
977 namespace {
978 
ChooseLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config)979 RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
980     const Resolver::Result& resolver_result,
981     const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
982   // Prefer the LB policy config found in the service config.
983   if (parsed_service_config->parsed_lb_config() != nullptr) {
984     return parsed_service_config->parsed_lb_config();
985   }
986   // Try the deprecated LB policy name from the service config.
987   // If not, try the setting from channel args.
988   absl::optional<absl::string_view> policy_name;
989   if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
990     policy_name = parsed_service_config->parsed_deprecated_lb_policy();
991   } else {
992     policy_name = resolver_result.args.GetString(GRPC_ARG_LB_POLICY_NAME);
993     bool requires_config = false;
994     if (policy_name.has_value() &&
995         (!CoreConfiguration::Get()
996               .lb_policy_registry()
997               .LoadBalancingPolicyExists(*policy_name, &requires_config) ||
998          requires_config)) {
999       if (requires_config) {
1000         LOG(ERROR) << "LB policy: " << *policy_name
1001                    << " passed through channel_args must not "
1002                       "require a config. Using pick_first instead.";
1003       } else {
1004         LOG(ERROR) << "LB policy: " << *policy_name
1005                    << " passed through channel_args does not exist. "
1006                       "Using pick_first instead.";
1007       }
1008       policy_name = "pick_first";
1009     }
1010   }
1011   // Use pick_first if nothing was specified and we didn't select grpclb
1012   // above.
1013   if (!policy_name.has_value()) policy_name = "pick_first";
1014   // Now that we have the policy name, construct an empty config for it.
1015   Json config_json = Json::FromArray({Json::FromObject({
1016       {std::string(*policy_name), Json::FromObject({})},
1017   })});
1018   auto lb_policy_config =
1019       CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
1020           config_json);
1021   // The policy name came from one of three places:
1022   // - The deprecated loadBalancingPolicy field in the service config,
1023   //   in which case the code in ClientChannelServiceConfigParser
1024   //   already verified that the policy does not require a config.
1025   // - One of the hard-coded values here, all of which are known to not
1026   //   require a config.
1027   // - A channel arg, in which case we check that the specified policy exists
1028   //   and accepts an empty config. If not, we revert to using pick_first
1029   //   lb_policy
1030   CHECK_OK(lb_policy_config);
1031   return std::move(*lb_policy_config);
1032 }
1033 
1034 }  // namespace
1035 
// Handles a new result from the resolver.  Per the "Locked" suffix,
// presumably invoked inside the WorkSerializer.
//
// Chooses a service config:
//   - the one returned by the resolver,
//   - the channel default when the resolver returned none, or
//   - the previously saved one when the resolver reported a service config
//     error.
// It then picks an LB policy config and records control-plane state if the
// service config or ConfigSelector changed.  Next it creates/updates the LB
// policy, and only after that switches the data plane to the new config.
//
// The resolver's result_health_callback (if any) receives one of:
//   - the LB policy update status,
//   - UNAVAILABLE "no valid service config" when no usable service config
//     exists, or
//   - OK when neither of the above applied.
// A channelz trace event summarizing notable changes is added when any were
// recorded.
void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
  // Handle race conditions (presumably a result that arrives after the
  // resolver has already been shut down and resolver_ reset).
  if (resolver_ == nullptr) return;
  GRPC_TRACE_LOG(client_channel, INFO)
      << "client_channel=" << this << ": got resolver result";
  // Grab resolver result health callback.
  auto resolver_callback = std::move(result.result_health_callback);
  absl::Status resolver_result_status;
  // We only want to trace the address resolution in the following cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //     (Not currently recorded by this function.)
  //
  // We track a list of strings to eventually be concatenated and traced.
  std::vector<const char*> trace_strings;
  const bool resolution_contains_addresses =
      result.addresses.ok() && !result.addresses->empty();
  if (!resolution_contains_addresses &&
      previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (resolution_contains_addresses &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = resolution_contains_addresses;
  // Owns the text pointed to by the trace_strings entry pushed below, so it
  // must stay alive until the trace event is built at the end.
  std::string service_config_error_string_storage;
  if (!result.service_config.ok()) {
    service_config_error_string_storage =
        result.service_config.status().ToString();
    trace_strings.push_back(service_config_error_string_storage.c_str());
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (!result.service_config.ok()) {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "client_channel=" << this
        << ": resolver returned service config error: "
        << result.service_config.status();
    // If the service config was invalid, then fallback to the
    // previously returned service config, if any.
    if (saved_service_config_ != nullptr) {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "client_channel=" << this
          << ": resolver returned invalid service config; "
             "continuing to use previous service config";
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received a service config error and we don't have a
      // previous service config to fall back to.  Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(result.service_config.status());
      trace_strings.push_back("no valid service config");
      resolver_result_status =
          absl::UnavailableError("no valid service config");
    }
  } else if (*result.service_config == nullptr) {
    // Resolver did not return any service config.
    GRPC_TRACE_LOG(client_channel, INFO)
        << "client_channel=" << this
        << ": resolver returned no service config; using default service "
           "config for channel";
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = std::move(*result.service_config);
    config_selector = result.args.GetObjectRef<ConfigSelector>();
  }
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in
  // the WorkSerializer.
  result.args = result.args.Remove(GRPC_ARG_CONFIG_SELECTOR);
  // Note: The only case in which service_config is null here is if the
  // resolver returned a service config error and we don't have a previous
  // service config to fall back to.
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                service_config_parser_index_));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          std::string(lb_policy_config->name()));
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    } else {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "client_channel=" << this << ": service config not changed";
    }
    // Create or update LB policy, as needed.
    // The args are copied first because `result` is moved into the LB policy
    // update just below but is still needed for the data-plane update.
    ChannelArgs new_args = result.args;
    resolver_result_status = CreateOrUpdateLbPolicyLocked(
        std::move(lb_policy_config),
        parsed_service_config->health_check_service_name(), std::move(result));
    // Start using new service config for calls.
    // This needs to happen after the LB policy has been updated, since
    // the ConfigSelector may need the LB policy to know about new
    // destinations before it can send RPCs to those destinations.
    if (service_config_changed || config_selector_changed) {
      UpdateServiceConfigInDataPlaneLocked(new_args);
    }
  }
  // Invoke resolver callback if needed.
  if (resolver_callback != nullptr) {
    resolver_callback(std::move(resolver_result_status));
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
}
1171 
OnResolverErrorLocked(absl::Status status)1172 void ClientChannel::OnResolverErrorLocked(absl::Status status) {
1173   if (resolver_ == nullptr) return;
1174   GRPC_TRACE_LOG(client_channel, INFO)
1175       << "client_channel=" << this
1176       << ": resolver transient failure: " << status;
1177   // If we already have an LB policy from a previous resolution
1178   // result, then we continue to let it set the connectivity state.
1179   // Otherwise, we go into TRANSIENT_FAILURE.
1180   if (lb_policy_ == nullptr) {
1181     // Update connectivity state.
1182     UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1183                       "resolver failure");
1184     // Send updated resolver result.
1185     resolver_data_for_calls_.Set(
1186         MaybeRewriteIllegalStatusCode(status, "resolver"));
1187   }
1188 }
1189 
CreateOrUpdateLbPolicyLocked(RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,const absl::optional<std::string> & health_check_service_name,Resolver::Result result)1190 absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
1191     RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
1192     const absl::optional<std::string>& health_check_service_name,
1193     Resolver::Result result) {
1194   // Construct update.
1195   LoadBalancingPolicy::UpdateArgs update_args;
1196   if (!result.addresses.ok()) {
1197     update_args.addresses = result.addresses.status();
1198   } else {
1199     update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
1200         std::move(*result.addresses));
1201   }
1202   update_args.config = std::move(lb_policy_config);
1203   update_args.resolution_note = std::move(result.resolution_note);
1204   update_args.args = std::move(result.args);
1205   // Add health check service name to channel args.
1206   if (health_check_service_name.has_value()) {
1207     update_args.args = update_args.args.Set(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
1208                                             *health_check_service_name);
1209   }
1210   // Create policy if needed.
1211   if (lb_policy_ == nullptr) {
1212     lb_policy_ = CreateLbPolicyLocked(update_args.args);
1213   }
1214   // Update the policy.
1215   GRPC_TRACE_LOG(client_channel, INFO)
1216       << "client_channel=" << this << ": Updating child policy "
1217       << lb_policy_.get();
1218   return lb_policy_->UpdateLocked(std::move(update_args));
1219 }
1220 
1221 // Creates a new LB policy.
CreateLbPolicyLocked(const ChannelArgs & args)1222 OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
1223     const ChannelArgs& args) {
1224   // The LB policy will start in state CONNECTING but will not
1225   // necessarily send us an update synchronously, so set state to
1226   // CONNECTING (in case the resolver had previously failed and put the
1227   // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
1228   UpdateStateAndPickerLocked(
1229       GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
1230       MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
1231   // Now create the LB policy.
1232   LoadBalancingPolicy::Args lb_policy_args;
1233   lb_policy_args.work_serializer = work_serializer_;
1234   lb_policy_args.channel_control_helper =
1235       std::make_unique<ClientChannelControlHelper>(
1236           WeakRefAsSubclass<ClientChannel>());
1237   lb_policy_args.args = args;
1238   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1239       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
1240                                          &client_channel_trace);
1241   GRPC_TRACE_LOG(client_channel, INFO)
1242       << "client_channel=" << this << ": created new LB policy "
1243       << lb_policy.get();
1244   return lb_policy;
1245 }
1246 
UpdateServiceConfigInControlPlaneLocked(RefCountedPtr<ServiceConfig> service_config,RefCountedPtr<ConfigSelector> config_selector,std::string lb_policy_name)1247 void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
1248     RefCountedPtr<ServiceConfig> service_config,
1249     RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
1250   std::string service_config_json(service_config->json_string());
1251   // Update service config.
1252   GRPC_TRACE_LOG(client_channel, INFO)
1253       << "client_channel=" << this << ": using service config: \""
1254       << service_config_json << "\"";
1255   saved_service_config_ = std::move(service_config);
1256   // Update config selector.
1257   GRPC_TRACE_LOG(client_channel, INFO)
1258       << "client_channel=" << this << ": using ConfigSelector "
1259       << config_selector.get();
1260   saved_config_selector_ = std::move(config_selector);
1261   // Update the data used by GetChannelInfo().
1262   {
1263     MutexLock lock(&info_mu_);
1264     info_lb_policy_name_ = std::move(lb_policy_name);
1265     info_service_config_json_ = std::move(service_config_json);
1266   }
1267 }
1268 
// Switches calls over to the saved service config / ConfigSelector.
//
// Builds a new interception chain (filter stack) on top of call_destination_:
//   - the idle-timer call-count hook (when an idle timeout is configured),
//   - filters registered for GRPC_CLIENT_CHANNEL,
//   - filters added by the ConfigSelector,
//   - the retry interceptor (unless retries are disabled or a minimal stack
//     is requested).
// The result is published through resolver_data_for_calls_.  If building the
// stack fails, the (status-code-rewritten) error is published instead.
void ClientChannel::UpdateServiceConfigInDataPlaneLocked(
    const ChannelArgs& args) {
  GRPC_TRACE_LOG(client_channel, INFO)
      << "client_channel=" << this << ": switching to ConfigSelector "
      << saved_config_selector_.get();
  // Use default config selector if resolver didn't supply one.
  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
  if (config_selector == nullptr) {
    config_selector =
        MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
  }
  // Modify channel args.
  ChannelArgs new_args = args.SetObject(this).SetObject(saved_service_config_);
  // Construct filter stack.
  auto new_blackboard = MakeRefCounted<Blackboard>();
  InterceptionChainBuilder builder(new_args, blackboard_.get(),
                                   new_blackboard.get());
  if (idle_timeout_ != Duration::Zero()) {
    // Counterpart of the IncreaseCallCount() in StartCall(): runs when a
    // call that went through this stack receives trailing metadata.  Captures
    // raw `this`; presumably safe because calls hold a ref to the channel.
    builder.AddOnServerTrailingMetadata([this](ServerMetadata&) {
      if (idle_state_.DecreaseCallCount()) StartIdleTimer();
    });
  }
  CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
      GRPC_CLIENT_CHANNEL, builder);
  // Add filters returned by the config selector (e.g., xDS HTTP filters).
  config_selector->AddFilters(builder);
  const bool enable_retries =
      !channel_args_.WantMinimalStack() &&
      channel_args_.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
  if (enable_retries) {
    builder.Add<RetryInterceptor>();
  }
  // Create call destination.
  auto top_of_stack_call_destination = builder.Build(call_destination_);
  // Note: the new blackboard is installed even if Build() failed.
  blackboard_ = std::move(new_blackboard);
  // Send result to data plane.
  if (!top_of_stack_call_destination.ok()) {
    resolver_data_for_calls_.Set(MaybeRewriteIllegalStatusCode(
        top_of_stack_call_destination.status(), "channel construction"));
  } else {
    resolver_data_for_calls_.Set(ResolverDataForCalls{
        std::move(config_selector), std::move(*top_of_stack_call_destination)});
  }
}
1313 
UpdateStateLocked(grpc_connectivity_state state,const absl::Status & status,const char * reason)1314 void ClientChannel::UpdateStateLocked(grpc_connectivity_state state,
1315                                       const absl::Status& status,
1316                                       const char* reason) {
1317   if (state != GRPC_CHANNEL_SHUTDOWN &&
1318       state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) {
1319     Crash("Illegal transition SHUTDOWN -> anything");
1320   }
1321   state_tracker_.SetState(state, status, reason);
1322   if (channelz_node_ != nullptr) {
1323     channelz_node_->SetConnectivityState(state);
1324     channelz_node_->AddTraceEvent(
1325         channelz::ChannelTrace::Severity::Info,
1326         grpc_slice_from_static_string(
1327             channelz::ChannelNode::GetChannelConnectivityStateChangeString(
1328                 state)));
1329   }
1330 }
1331 
UpdateStateAndPickerLocked(grpc_connectivity_state state,const absl::Status & status,const char * reason,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)1332 void ClientChannel::UpdateStateAndPickerLocked(
1333     grpc_connectivity_state state, const absl::Status& status,
1334     const char* reason,
1335     RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
1336   UpdateStateLocked(state, status, reason);
1337   picker_.Set(std::move(picker));
1338 }
1339 
StartIdleTimer()1340 void ClientChannel::StartIdleTimer() {
1341   GRPC_TRACE_LOG(client_channel, INFO)
1342       << "client_channel=" << this << ": idle timer started";
1343   auto self = WeakRefAsSubclass<ClientChannel>();
1344   auto promise = Loop([self]() {
1345     return TrySeq(Sleep(Timestamp::Now() + self->idle_timeout_),
1346                   [self]() -> Poll<LoopCtl<absl::Status>> {
1347                     if (self->idle_state_.CheckTimer()) {
1348                       return Continue{};
1349                     } else {
1350                       return absl::OkStatus();
1351                     }
1352                   });
1353   });
1354   auto arena = SimpleArenaAllocator(0)->MakeArena();
1355   arena->SetContext<grpc_event_engine::experimental::EventEngine>(
1356       event_engine());
1357   idle_activity_.Set(MakeActivity(
1358       std::move(promise), ExecCtxWakeupScheduler{},
1359       [self = std::move(self)](absl::Status status) mutable {
1360         if (status.ok()) {
1361           self->work_serializer_->Run(
1362               [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
1363                 self->DestroyResolverAndLbPolicyLocked();
1364                 self->UpdateStateAndPickerLocked(
1365                     GRPC_CHANNEL_IDLE, absl::OkStatus(),
1366                     "channel entering IDLE", nullptr);
1367                 // TODO(roth): In case there's a race condition, we
1368                 // might need to check for any calls that are
1369                 // queued waiting for a resolver result or an LB
1370                 // pick.
1371               },
1372               DEBUG_LOCATION);
1373         }
1374       },
1375       std::move(arena)));
1376 }
1377 
ApplyServiceConfigToCall(ConfigSelector & config_selector,ClientMetadata & client_initial_metadata) const1378 absl::Status ClientChannel::ApplyServiceConfigToCall(
1379     ConfigSelector& config_selector,
1380     ClientMetadata& client_initial_metadata) const {
1381   GRPC_TRACE_LOG(client_channel_call, INFO)
1382       << "client_channel=" << this << ": " << GetContext<Activity>()->DebugTag()
1383       << " service config to call";
1384   // Create a ClientChannelServiceConfigCallData for the call.  This stores
1385   // a ref to the ServiceConfig and caches the right set of parsed configs
1386   // to use for the call.  The ClientChannelServiceConfigCallData will store
1387   // itself in the call context, so that it can be accessed by filters
1388   // below us in the stack, and it will be cleaned up when the call ends.
1389   auto* service_config_call_data =
1390       GetContext<Arena>()->New<ClientChannelServiceConfigCallData>(
1391           GetContext<Arena>());
1392   // Use the ConfigSelector to determine the config for the call.
1393   absl::Status call_config_status = config_selector.GetCallConfig(
1394       {&client_initial_metadata, GetContext<Arena>(),
1395        service_config_call_data});
1396   if (!call_config_status.ok()) {
1397     return MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector");
1398   }
1399   // Apply our own method params to the call.
1400   auto* method_params = DownCast<ClientChannelMethodParsedConfig*>(
1401       service_config_call_data->GetMethodParsedConfig(
1402           service_config_parser_index_));
1403   if (method_params != nullptr) {
1404     // If the service config specifies a deadline, update the call's
1405     // deadline timer.
1406     if (method_params->timeout() != Duration::Zero()) {
1407       Call* call = GetContext<Call>();
1408       const Timestamp per_method_deadline =
1409           Timestamp::FromCycleCounterRoundUp(call->start_time()) +
1410           method_params->timeout();
1411       call->UpdateDeadline(per_method_deadline);
1412     }
1413     // If the service config set wait_for_ready and the application
1414     // did not explicitly set it, use the value from the service config.
1415     auto* wait_for_ready =
1416         client_initial_metadata.GetOrCreatePointer(WaitForReady());
1417     if (method_params->wait_for_ready().has_value() &&
1418         !wait_for_ready->explicitly_set) {
1419       wait_for_ready->value = method_params->wait_for_ready().value();
1420     }
1421   }
1422   return absl::OkStatus();
1423 }
1424 
1425 }  // namespace grpc_core
1426