1 // Copyright 2015 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/client_channel/client_channel.h"
16
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/impl/channel_arg_names.h>
19 #include <grpc/slice.h>
20 #include <grpc/status.h>
21 #include <grpc/support/json.h>
22 #include <grpc/support/metrics.h>
23 #include <grpc/support/port_platform.h>
24 #include <grpc/support/string_util.h>
25 #include <grpc/support/time.h>
26 #include <inttypes.h>
27 #include <limits.h>
28
29 #include <algorithm>
30 #include <functional>
31 #include <new>
32 #include <set>
33 #include <type_traits>
34 #include <utility>
35 #include <vector>
36
37 #include "absl/cleanup/cleanup.h"
38 #include "absl/log/log.h"
39 #include "absl/status/status.h"
40 #include "absl/status/statusor.h"
41 #include "absl/strings/cord.h"
42 #include "absl/strings/numbers.h"
43 #include "absl/strings/str_cat.h"
44 #include "absl/strings/str_join.h"
45 #include "absl/strings/string_view.h"
46 #include "absl/types/optional.h"
47 #include "absl/types/variant.h"
48 #include "src/core/client_channel/client_channel_internal.h"
49 #include "src/core/client_channel/client_channel_service_config.h"
50 #include "src/core/client_channel/config_selector.h"
51 #include "src/core/client_channel/dynamic_filters.h"
52 #include "src/core/client_channel/global_subchannel_pool.h"
53 #include "src/core/client_channel/local_subchannel_pool.h"
54 #include "src/core/client_channel/retry_interceptor.h"
55 #include "src/core/client_channel/subchannel.h"
56 #include "src/core/client_channel/subchannel_interface_internal.h"
57 #include "src/core/config/core_configuration.h"
58 #include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h"
59 #include "src/core/lib/address_utils/sockaddr_utils.h"
60 #include "src/core/lib/channel/channel_args.h"
61 #include "src/core/lib/channel/status_util.h"
62 #include "src/core/lib/debug/trace.h"
63 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
64 #include "src/core/lib/iomgr/resolved_address.h"
65 #include "src/core/lib/promise/context.h"
66 #include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
67 #include "src/core/lib/promise/loop.h"
68 #include "src/core/lib/promise/map.h"
69 #include "src/core/lib/promise/poll.h"
70 #include "src/core/lib/promise/sleep.h"
71 #include "src/core/lib/promise/try_seq.h"
72 #include "src/core/lib/resource_quota/arena.h"
73 #include "src/core/lib/security/credentials/credentials.h"
74 #include "src/core/lib/slice/slice.h"
75 #include "src/core/lib/slice/slice_internal.h"
76 #include "src/core/lib/surface/call.h"
77 #include "src/core/lib/surface/channel.h"
78 #include "src/core/lib/surface/client_call.h"
79 #include "src/core/lib/surface/completion_queue.h"
80 #include "src/core/lib/transport/call_spine.h"
81 #include "src/core/lib/transport/connectivity_state.h"
82 #include "src/core/lib/transport/metadata_batch.h"
83 #include "src/core/load_balancing/child_policy_handler.h"
84 #include "src/core/load_balancing/lb_policy.h"
85 #include "src/core/load_balancing/lb_policy_registry.h"
86 #include "src/core/load_balancing/subchannel_interface.h"
87 #include "src/core/resolver/endpoint_addresses.h"
88 #include "src/core/resolver/resolver_registry.h"
89 #include "src/core/service_config/service_config_impl.h"
90 #include "src/core/telemetry/metrics.h"
91 #include "src/core/util/crash.h"
92 #include "src/core/util/debug_location.h"
93 #include "src/core/util/json/json.h"
94 #include "src/core/util/sync.h"
95 #include "src/core/util/useful.h"
96 #include "src/core/util/work_serializer.h"
97
98 namespace grpc_core {
99
100 using grpc_event_engine::experimental::EventEngine;
101
102 using internal::ClientChannelMethodParsedConfig;
103
104 //
105 // ClientChannel::ResolverResultHandler
106 //
107
108 class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
109 public:
ResolverResultHandler(WeakRefCountedPtr<ClientChannel> client_channel)110 explicit ResolverResultHandler(
111 WeakRefCountedPtr<ClientChannel> client_channel)
112 : client_channel_(std::move(client_channel)) {}
113
~ResolverResultHandler()114 ~ResolverResultHandler() override {
115 GRPC_TRACE_LOG(client_channel, INFO)
116 << "client_channel=" << client_channel_.get()
117 << ": resolver shutdown complete";
118 }
119
ReportResult(Resolver::Result result)120 void ReportResult(Resolver::Result result) override
121 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
122 client_channel_->OnResolverResultChangedLocked(std::move(result));
123 }
124
125 private:
126 WeakRefCountedPtr<ClientChannel> client_channel_;
127 };
128
129 //
130 // ClientChannel::SubchannelWrapper
131 //
132
133 // This class is a wrapper for Subchannel that hides details of the
134 // channel's implementation (such as the connected subchannel) from the
135 // LB policy API.
136 //
137 // Note that no synchronization is needed here, because even if the
138 // underlying subchannel is shared between channels, this wrapper will only
139 // be used within one channel, so it will always be synchronized by the
140 // control plane work_serializer.
141 class ClientChannel::SubchannelWrapper
142 : public SubchannelInterfaceWithCallDestination {
143 public:
144 SubchannelWrapper(WeakRefCountedPtr<ClientChannel> client_channel,
145 RefCountedPtr<Subchannel> subchannel);
146 ~SubchannelWrapper() override;
147
148 void Orphaned() override;
149 void WatchConnectivityState(
150 std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
151 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
152 void CancelConnectivityStateWatch(
153 ConnectivityStateWatcherInterface* watcher) override
154 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
155
call_destination()156 RefCountedPtr<UnstartedCallDestination> call_destination() override {
157 return subchannel_->call_destination();
158 }
159
RequestConnection()160 void RequestConnection() override { subchannel_->RequestConnection(); }
161
ResetBackoff()162 void ResetBackoff() override { subchannel_->ResetBackoff(); }
163
164 void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
165 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
166 void CancelDataWatcher(DataWatcherInterface* watcher) override
167 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_);
168 void ThrottleKeepaliveTime(int new_keepalive_time);
address() const169 std::string address() const override { return subchannel_->address(); }
170
171 private:
172 class WatcherWrapper;
173
174 // A heterogenous lookup comparator for data watchers that allows
175 // unique_ptr keys to be looked up as raw pointers.
176 struct DataWatcherLessThan {
177 using is_transparent = void;
operator ()grpc_core::ClientChannel::SubchannelWrapper::DataWatcherLessThan178 bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
179 const std::unique_ptr<DataWatcherInterface>& p2) const {
180 return p1 < p2;
181 }
operator ()grpc_core::ClientChannel::SubchannelWrapper::DataWatcherLessThan182 bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
183 const DataWatcherInterface* p2) const {
184 return p1.get() < p2;
185 }
operator ()grpc_core::ClientChannel::SubchannelWrapper::DataWatcherLessThan186 bool operator()(const DataWatcherInterface* p1,
187 const std::unique_ptr<DataWatcherInterface>& p2) const {
188 return p1 < p2.get();
189 }
190 };
191
192 WeakRefCountedPtr<ClientChannel> client_channel_;
193 RefCountedPtr<Subchannel> subchannel_;
194 // Maps from the address of the watcher passed to us by the LB policy
195 // to the address of the WrapperWatcher that we passed to the underlying
196 // subchannel. This is needed so that when the LB policy calls
197 // CancelConnectivityStateWatch() with its watcher, we know the
198 // corresponding WrapperWatcher to cancel on the underlying subchannel.
199 std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
200 ABSL_GUARDED_BY(*client_channel_->work_serializer_);
201 std::set<std::unique_ptr<DataWatcherInterface>, DataWatcherLessThan>
202 data_watchers_ ABSL_GUARDED_BY(*client_channel_->work_serializer_);
203 };
204
205 // This wrapper provides a bridge between the internal Subchannel API
206 // and the SubchannelInterface API that we expose to LB policies.
207 // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
208 // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
209 // that was passed in by the LB policy. We pass an instance of this
210 // class to the underlying Subchannel, and when we get updates from
211 // the subchannel, we pass those on to the wrapped watcher to return
212 // the update to the LB policy.
213 //
214 // This class handles things like hopping into the WorkSerializer
215 // before passing notifications to the LB policy and propagating
216 // keepalive information between subchannels.
217 class ClientChannel::SubchannelWrapper::WatcherWrapper
218 : public Subchannel::ConnectivityStateWatcherInterface {
219 public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> subchannel_wrapper)220 WatcherWrapper(
221 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
222 watcher,
223 RefCountedPtr<SubchannelWrapper> subchannel_wrapper)
224 : watcher_(std::move(watcher)),
225 subchannel_wrapper_(std::move(subchannel_wrapper)) {}
226
~WatcherWrapper()227 ~WatcherWrapper() override {
228 subchannel_wrapper_.reset(DEBUG_LOCATION, "WatcherWrapper");
229 }
230
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)231 void OnConnectivityStateChange(
232 RefCountedPtr<ConnectivityStateWatcherInterface> self,
233 grpc_connectivity_state state, const absl::Status& status) override {
234 GRPC_TRACE_LOG(client_channel, INFO)
235 << "client_channel=" << subchannel_wrapper_->client_channel_.get()
236 << ": connectivity change for subchannel wrapper "
237 << subchannel_wrapper_.get() << " subchannel "
238 << subchannel_wrapper_->subchannel_.get()
239 << "; hopping into work_serializer";
240 self.release(); // Held by callback.
241 subchannel_wrapper_->client_channel_->work_serializer_->Run(
242 [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
243 *subchannel_wrapper_->client_channel_->work_serializer_) {
244 ApplyUpdateInControlPlaneWorkSerializer(state, status);
245 Unref();
246 },
247 DEBUG_LOCATION);
248 }
249
interested_parties()250 grpc_pollset_set* interested_parties() override { return nullptr; }
251
252 private:
ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,const absl::Status & status)253 void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
254 const absl::Status& status)
255 ABSL_EXCLUSIVE_LOCKS_REQUIRED(
256 *subchannel_wrapper_->client_channel_->work_serializer_) {
257 GRPC_TRACE_LOG(client_channel, INFO)
258 << "client_channel=" << subchannel_wrapper_->client_channel_.get()
259 << ": processing connectivity change in work serializer for subchannel "
260 "wrapper "
261 << subchannel_wrapper_.get() << " subchannel "
262 << subchannel_wrapper_->subchannel_.get()
263 << " watcher=" << watcher_.get()
264 << " state=" << ConnectivityStateName(state) << " status=" << status;
265 absl::optional<absl::Cord> keepalive_throttling =
266 status.GetPayload(kKeepaliveThrottlingKey);
267 if (keepalive_throttling.has_value()) {
268 int new_keepalive_time = -1;
269 if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
270 &new_keepalive_time)) {
271 if (new_keepalive_time >
272 subchannel_wrapper_->client_channel_->keepalive_time_) {
273 subchannel_wrapper_->client_channel_->keepalive_time_ =
274 new_keepalive_time;
275 GRPC_TRACE_LOG(client_channel, INFO)
276 << "client_channel=" << subchannel_wrapper_->client_channel_.get()
277 << ": throttling keepalive time to "
278 << subchannel_wrapper_->client_channel_->keepalive_time_;
279 // Propagate the new keepalive time to all subchannels. This is so
280 // that new transports created by any subchannel (and not just the
281 // subchannel that received the GOAWAY), use the new keepalive time.
282 for (auto* subchannel_wrapper :
283 subchannel_wrapper_->client_channel_->subchannel_wrappers_) {
284 subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
285 }
286 }
287 } else {
288 LOG(ERROR) << "client_channel="
289 << subchannel_wrapper_->client_channel_.get()
290 << ": Illegal keepalive throttling value "
291 << std::string(keepalive_throttling.value());
292 }
293 }
294 // Propagate status only in state TF.
295 // We specifically want to avoid propagating the status for
296 // state IDLE that the real subchannel gave us only for the
297 // purpose of keepalive propagation.
298 watcher_->OnConnectivityStateChange(
299 state,
300 state == GRPC_CHANNEL_TRANSIENT_FAILURE ? status : absl::OkStatus());
301 }
302
303 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
304 watcher_;
305 RefCountedPtr<SubchannelWrapper> subchannel_wrapper_;
306 };
307
SubchannelWrapper(WeakRefCountedPtr<ClientChannel> client_channel,RefCountedPtr<Subchannel> subchannel)308 ClientChannel::SubchannelWrapper::SubchannelWrapper(
309 WeakRefCountedPtr<ClientChannel> client_channel,
310 RefCountedPtr<Subchannel> subchannel)
311 : SubchannelInterfaceWithCallDestination(
312 GRPC_TRACE_FLAG_ENABLED(client_channel) ? "SubchannelWrapper"
313 : nullptr),
314 client_channel_(std::move(client_channel)),
315 subchannel_(std::move(subchannel)) {
316 GRPC_TRACE_LOG(client_channel, INFO)
317 << "client_channel=" << client_channel_.get()
318 << ": creating subchannel wrapper " << this << " for subchannel "
319 << subchannel_.get();
320 #ifndef NDEBUG
321 DCHECK(client_channel_->work_serializer_->RunningInWorkSerializer());
322 #endif
323 if (client_channel_->channelz_node_ != nullptr) {
324 auto* subchannel_node = subchannel_->channelz_node();
325 if (subchannel_node != nullptr) {
326 auto it =
327 client_channel_->subchannel_refcount_map_.find(subchannel_.get());
328 if (it == client_channel_->subchannel_refcount_map_.end()) {
329 client_channel_->channelz_node_->AddChildSubchannel(
330 subchannel_node->uuid());
331 it = client_channel_->subchannel_refcount_map_
332 .emplace(subchannel_.get(), 0)
333 .first;
334 }
335 ++it->second;
336 }
337 }
338 client_channel_->subchannel_wrappers_.insert(this);
339 }
340
~SubchannelWrapper()341 ClientChannel::SubchannelWrapper::~SubchannelWrapper() {
342 GRPC_TRACE_LOG(client_channel, INFO)
343 << "client_channel=" << client_channel_.get()
344 << ": destroying subchannel wrapper " << this << " for subchannel "
345 << subchannel_.get();
346 }
347
Orphaned()348 void ClientChannel::SubchannelWrapper::Orphaned() {
349 // Make sure we clean up the channel's subchannel maps inside the
350 // WorkSerializer.
351 auto self = WeakRefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION,
352 "subchannel map cleanup");
353 client_channel_->work_serializer_->Run(
354 [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
355 *self->client_channel_->work_serializer_) {
356 self->client_channel_->subchannel_wrappers_.erase(self.get());
357 if (self->client_channel_->channelz_node_ != nullptr) {
358 auto* subchannel_node = self->subchannel_->channelz_node();
359 if (subchannel_node != nullptr) {
360 auto it = self->client_channel_->subchannel_refcount_map_.find(
361 self->subchannel_.get());
362 CHECK(it != self->client_channel_->subchannel_refcount_map_.end());
363 --it->second;
364 if (it->second == 0) {
365 self->client_channel_->channelz_node_->RemoveChildSubchannel(
366 subchannel_node->uuid());
367 self->client_channel_->subchannel_refcount_map_.erase(it);
368 }
369 }
370 }
371 },
372 DEBUG_LOCATION);
373 }
374
WatchConnectivityState(std::unique_ptr<ConnectivityStateWatcherInterface> watcher)375 void ClientChannel::SubchannelWrapper::WatchConnectivityState(
376 std::unique_ptr<ConnectivityStateWatcherInterface> watcher) {
377 auto& watcher_wrapper = watcher_map_[watcher.get()];
378 CHECK(watcher_wrapper == nullptr);
379 watcher_wrapper = new WatcherWrapper(
380 std::move(watcher),
381 RefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION, "WatcherWrapper"));
382 subchannel_->WatchConnectivityState(
383 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
384 watcher_wrapper));
385 }
386
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)387 void ClientChannel::SubchannelWrapper::CancelConnectivityStateWatch(
388 ConnectivityStateWatcherInterface* watcher) {
389 auto it = watcher_map_.find(watcher);
390 CHECK(it != watcher_map_.end());
391 subchannel_->CancelConnectivityStateWatch(it->second);
392 watcher_map_.erase(it);
393 }
394
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)395 void ClientChannel::SubchannelWrapper::AddDataWatcher(
396 std::unique_ptr<DataWatcherInterface> watcher) {
397 static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get())
398 ->SetSubchannel(subchannel_.get());
399 CHECK(data_watchers_.insert(std::move(watcher)).second);
400 }
401
CancelDataWatcher(DataWatcherInterface * watcher)402 void ClientChannel::SubchannelWrapper::CancelDataWatcher(
403 DataWatcherInterface* watcher) {
404 auto it = data_watchers_.find(watcher);
405 if (it != data_watchers_.end()) data_watchers_.erase(it);
406 }
407
ThrottleKeepaliveTime(int new_keepalive_time)408 void ClientChannel::SubchannelWrapper::ThrottleKeepaliveTime(
409 int new_keepalive_time) {
410 subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
411 }
412
413 //
414 // ClientChannel::ClientChannelControlHelper
415 //
416
417 class ClientChannel::ClientChannelControlHelper
418 : public LoadBalancingPolicy::ChannelControlHelper {
419 public:
ClientChannelControlHelper(WeakRefCountedPtr<ClientChannel> client_channel)420 explicit ClientChannelControlHelper(
421 WeakRefCountedPtr<ClientChannel> client_channel)
422 : client_channel_(std::move(client_channel)) {}
423
~ClientChannelControlHelper()424 ~ClientChannelControlHelper() override {
425 client_channel_.reset(DEBUG_LOCATION, "ClientChannelControlHelper");
426 }
427
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)428 RefCountedPtr<SubchannelInterface> CreateSubchannel(
429 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
430 const ChannelArgs& args) override
431 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
432 // If shutting down, do nothing.
433 if (client_channel_->resolver_ == nullptr) return nullptr;
434 ChannelArgs subchannel_args = Subchannel::MakeSubchannelArgs(
435 args, per_address_args, client_channel_->subchannel_pool_,
436 client_channel_->default_authority_);
437 // Create subchannel.
438 RefCountedPtr<Subchannel> subchannel =
439 client_channel_->client_channel_factory_->CreateSubchannel(
440 address, subchannel_args);
441 if (subchannel == nullptr) return nullptr;
442 // Make sure the subchannel has updated keepalive time.
443 subchannel->ThrottleKeepaliveTime(client_channel_->keepalive_time_);
444 // Create and return wrapper for the subchannel.
445 return MakeRefCounted<SubchannelWrapper>(client_channel_,
446 std::move(subchannel));
447 }
448
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)449 void UpdateState(
450 grpc_connectivity_state state, const absl::Status& status,
451 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) override
452 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
453 if (client_channel_->resolver_ == nullptr) return; // Shutting down.
454 if (GRPC_TRACE_FLAG_ENABLED(client_channel)) {
455 const char* extra = client_channel_->disconnect_error_.ok()
456 ? ""
457 : " (ignoring -- channel shutting down)";
458 LOG(INFO) << "client_channel=" << client_channel_.get()
459 << ": update: state=" << ConnectivityStateName(state)
460 << " status=(" << status << ") picker=" << picker.get()
461 << extra;
462 }
463 // Do update only if not shutting down.
464 if (client_channel_->disconnect_error_.ok()) {
465 client_channel_->UpdateStateAndPickerLocked(state, status, "helper",
466 std::move(picker));
467 }
468 }
469
RequestReresolution()470 void RequestReresolution() override
471 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
472 if (client_channel_->resolver_ == nullptr) return; // Shutting down.
473 GRPC_TRACE_LOG(client_channel, INFO)
474 << "client_channel=" << client_channel_.get()
475 << ": started name re-resolving";
476 client_channel_->resolver_->RequestReresolutionLocked();
477 }
478
GetTarget()479 absl::string_view GetTarget() override { return client_channel_->target(); }
480
GetAuthority()481 absl::string_view GetAuthority() override {
482 return client_channel_->default_authority_;
483 }
484
GetChannelCredentials()485 RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
486 return client_channel_->channel_args_.GetObject<grpc_channel_credentials>()
487 ->duplicate_without_call_credentials();
488 }
489
GetUnsafeChannelCredentials()490 RefCountedPtr<grpc_channel_credentials> GetUnsafeChannelCredentials()
491 override {
492 return client_channel_->channel_args_.GetObject<grpc_channel_credentials>()
493 ->Ref();
494 }
495
GetEventEngine()496 EventEngine* GetEventEngine() override {
497 return client_channel_->event_engine();
498 }
499
GetStatsPluginGroup()500 GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override {
501 return client_channel_->stats_plugin_group_;
502 }
503
AddTraceEvent(TraceSeverity severity,absl::string_view message)504 void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
505 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*client_channel_->work_serializer_) {
506 if (client_channel_->resolver_ == nullptr) return; // Shutting down.
507 if (client_channel_->channelz_node_ != nullptr) {
508 client_channel_->channelz_node_->AddTraceEvent(
509 ConvertSeverityEnum(severity),
510 grpc_slice_from_copied_buffer(message.data(), message.size()));
511 }
512 }
513
514 private:
ConvertSeverityEnum(TraceSeverity severity)515 static channelz::ChannelTrace::Severity ConvertSeverityEnum(
516 TraceSeverity severity) {
517 if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
518 if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
519 return channelz::ChannelTrace::Error;
520 }
521
522 WeakRefCountedPtr<ClientChannel> client_channel_;
523 };
524
525 //
526 // ClientChannel implementation
527 //
528
529 namespace {
530
GetSubchannelPool(const ChannelArgs & args)531 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
532 const ChannelArgs& args) {
533 if (args.GetBool(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL).value_or(false)) {
534 return MakeRefCounted<LocalSubchannelPool>();
535 }
536 return GlobalSubchannelPool::instance();
537 }
538
539 } // namespace
540
Create(std::string target,ChannelArgs channel_args)541 absl::StatusOr<RefCountedPtr<Channel>> ClientChannel::Create(
542 std::string target, ChannelArgs channel_args) {
543 // Get URI to resolve, using proxy mapper if needed.
544 if (target.empty()) {
545 return absl::InternalError("target URI is empty in client channel");
546 }
547 std::string uri_to_resolve = CoreConfiguration::Get()
548 .proxy_mapper_registry()
549 .MapName(target, &channel_args)
550 .value_or(target);
551 // Make sure the URI to resolve is valid, so that we know that
552 // resolver creation will succeed later.
553 if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
554 uri_to_resolve)) {
555 return absl::InvalidArgumentError(
556 absl::StrCat("invalid target URI: ", uri_to_resolve));
557 }
558 // Get default service config. If none is specified via the client API,
559 // we use an empty config.
560 absl::optional<absl::string_view> service_config_json =
561 channel_args.GetString(GRPC_ARG_SERVICE_CONFIG);
562 if (!service_config_json.has_value()) service_config_json = "{}";
563 auto default_service_config =
564 ServiceConfigImpl::Create(channel_args, *service_config_json);
565 if (!default_service_config.ok()) return default_service_config.status();
566 // Strip out service config channel arg, so that it doesn't affect
567 // subchannel uniqueness when the args flow down to that layer.
568 channel_args = channel_args.Remove(GRPC_ARG_SERVICE_CONFIG);
569 // Check client channel factory.
570 auto* client_channel_factory = channel_args.GetObject<ClientChannelFactory>();
571 if (client_channel_factory == nullptr) {
572 return absl::InternalError(
573 "Missing client channel factory in args for client channel");
574 }
575 auto* call_destination_factory =
576 channel_args.GetObject<CallDestinationFactory>();
577 if (call_destination_factory == nullptr) {
578 return absl::InternalError(
579 "Missing call destination factory in args for client channel");
580 }
581 if (channel_args.GetObject<EventEngine>() == nullptr) {
582 return absl::InternalError(
583 "Missing event engine in args for client channel");
584 }
585 // Success. Construct channel.
586 return MakeRefCounted<ClientChannel>(
587 std::move(target), std::move(channel_args), std::move(uri_to_resolve),
588 std::move(*default_service_config), client_channel_factory,
589 call_destination_factory);
590 }
591
592 namespace {
GetDefaultAuthorityFromChannelArgs(const ChannelArgs & channel_args,absl::string_view target)593 std::string GetDefaultAuthorityFromChannelArgs(const ChannelArgs& channel_args,
594 absl::string_view target) {
595 absl::optional<std::string> default_authority =
596 channel_args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
597 if (!default_authority.has_value()) {
598 return CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
599 target);
600 } else {
601 return std::move(*default_authority);
602 }
603 }
604 } // namespace
605
ClientChannel(std::string target,ChannelArgs channel_args,std::string uri_to_resolve,RefCountedPtr<ServiceConfig> default_service_config,ClientChannelFactory * client_channel_factory,CallDestinationFactory * call_destination_factory)606 ClientChannel::ClientChannel(
607 std::string target, ChannelArgs channel_args, std::string uri_to_resolve,
608 RefCountedPtr<ServiceConfig> default_service_config,
609 ClientChannelFactory* client_channel_factory,
610 CallDestinationFactory* call_destination_factory)
611 : Channel(std::move(target), channel_args),
612 channel_args_(std::move(channel_args)),
613 event_engine_(channel_args_.GetObjectRef<EventEngine>()),
614 uri_to_resolve_(std::move(uri_to_resolve)),
615 service_config_parser_index_(
616 internal::ClientChannelServiceConfigParser::ParserIndex()),
617 default_service_config_(std::move(default_service_config)),
618 client_channel_factory_(client_channel_factory),
619 default_authority_(
620 GetDefaultAuthorityFromChannelArgs(channel_args_, this->target())),
621 channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
622 idle_timeout_(GetClientIdleTimeout(channel_args_)),
623 resolver_data_for_calls_(ResolverDataForCalls{}),
624 picker_(nullptr),
625 call_destination_(
626 call_destination_factory->CreateCallDestination(picker_)),
627 work_serializer_(std::make_shared<WorkSerializer>(event_engine_)),
628 state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
629 subchannel_pool_(GetSubchannelPool(channel_args_)) {
630 CHECK(event_engine_.get() != nullptr);
631 GRPC_TRACE_LOG(client_channel, INFO)
632 << "client_channel=" << this << ": creating client_channel";
633 // Set initial keepalive time.
634 auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
635 if (keepalive_arg.has_value()) {
636 keepalive_time_ = Clamp(*keepalive_arg, 1, INT_MAX);
637 } else {
638 keepalive_time_ = -1; // unset
639 }
640 // Get stats plugins for channel.
641 grpc_event_engine::experimental::ChannelArgsEndpointConfig endpoint_config(
642 channel_args_);
643 experimental::StatsPluginChannelScope scope(
644 this->target(), default_authority_, endpoint_config);
645 stats_plugin_group_ =
646 GlobalStatsPluginRegistry::GetStatsPluginsForChannel(scope);
647 }
648
~ClientChannel()649 ClientChannel::~ClientChannel() {
650 GRPC_TRACE_LOG(client_channel, INFO)
651 << "client_channel=" << this << ": destroying";
652 }
653
Orphaned()654 void ClientChannel::Orphaned() {
655 GRPC_TRACE_LOG(client_channel, INFO)
656 << "client_channel=" << this << ": shutting down";
657 // Weird capture then copy needed to satisfy thread safety analysis,
658 // otherwise it seems to fail to recognize the correct lock is taken in the
659 // lambda.
660 auto self = WeakRefAsSubclass<ClientChannel>();
661 work_serializer_->Run(
662 [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
663 self->DestroyResolverAndLbPolicyLocked();
664 },
665 DEBUG_LOCATION);
666 // IncreaseCallCount() introduces a phony call and prevents the idle
667 // timer from being reset by other threads.
668 idle_state_.IncreaseCallCount();
669 idle_activity_.Reset();
670 }
671
CheckConnectivityState(bool try_to_connect)672 grpc_connectivity_state ClientChannel::CheckConnectivityState(
673 bool try_to_connect) {
674 // state_tracker_ is guarded by work_serializer_, which we're not
675 // holding here. But the one method of state_tracker_ that *is*
676 // thread-safe to call without external synchronization is the state()
677 // method, so we can disable thread-safety analysis for this one read.
678 grpc_connectivity_state state =
679 ABSL_TS_UNCHECKED_READ(state_tracker_).state();
680 if (state == GRPC_CHANNEL_IDLE && try_to_connect) {
681 auto self = WeakRefAsSubclass<ClientChannel>(); // Held by callback.
682 work_serializer_->Run(
683 [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
684 self->TryToConnectLocked();
685 },
686 DEBUG_LOCATION);
687 }
688 return state;
689 }
690
691 namespace {
692
// A fire-and-forget object to handle external connectivity state watches.
//
// The constructor starts both a connectivity watch on the channel and a
// deadline timer.  Whichever of the two fires first triggers exactly one
// CQ completion for (cq, tag); the later event is ignored.
//
// NOTE(review): no grpc_cq_begin_op() call for (cq, tag) is visible in
// this class, even though MaybeStartCompletion() calls grpc_cq_end_op().
// Confirm that the operation is registered with the CQ elsewhere.
class ExternalStateWatcher : public RefCounted<ExternalStateWatcher> {
 public:
  // Starts watching `channel` for a change away from `last_observed_state`
  // and arms a timer that fires at `deadline`.  The result is reported
  // through `cq` using `tag`.
  ExternalStateWatcher(WeakRefCountedPtr<ClientChannel> channel,
                       grpc_completion_queue* cq, void* tag,
                       grpc_connectivity_state last_observed_state,
                       Timestamp deadline)
      : channel_(std::move(channel)), cq_(cq), tag_(tag) {
    // Held for the whole constructor body, which writes the mu_-guarded
    // members watcher_ and timer_handle_.
    MutexLock lock(&mu_);
    // Start watch.  This inherits the ref from creation.
    auto watcher =
        MakeOrphanable<Watcher>(RefCountedPtr<ExternalStateWatcher>(this));
    // Keep a raw pointer so the watch can be removed later; ownership of
    // the Watcher itself is handed to the channel below.
    watcher_ = watcher.get();
    channel_->AddConnectivityWatcher(last_observed_state, std::move(watcher));
    // Start timer.  This takes a second ref.
    const Duration timeout = deadline - Timestamp::Now();
    timer_handle_ =
        channel_->event_engine()->RunAfter(timeout, [self = Ref()]() mutable {
          ApplicationCallbackExecCtx callback_exec_ctx;
          ExecCtx exec_ctx;
          self->MaybeStartCompletion(absl::DeadlineExceededError(
              "Timed out waiting for connection state change"));
          // ExternalStateWatcher deletion might require an active ExecCtx.
          self.reset();
        });
  }

 private:
  // Adapter registered with the channel.  It holds a ref to the owning
  // ExternalStateWatcher and reports any connectivity state change to it
  // as an OK completion.
  class Watcher : public AsyncConnectivityStateWatcherInterface {
   public:
    explicit Watcher(RefCountedPtr<ExternalStateWatcher> external_state_watcher)
        : external_state_watcher_(std::move(external_state_watcher)) {}

    // The new state and status are ignored; only the fact that a change
    // occurred is reported.
    void OnConnectivityStateChange(grpc_connectivity_state /*new_state*/,
                                   const absl::Status& /*status*/) override {
      external_state_watcher_->MaybeStartCompletion(absl::OkStatus());
    }

   private:
    RefCountedPtr<ExternalStateWatcher> external_state_watcher_;
  };

  // This is called both when the watch reports a new connectivity state
  // and when the timer fires.  It will trigger a CQ notification only
  // on the first call.  Subsequent calls will be ignored, because
  // events can come in asynchronously.
  //
  // `status` is the result passed to grpc_cq_end_op(): OK when called from
  // the watch, DEADLINE_EXCEEDED when called from the timer.
  void MaybeStartCompletion(absl::Status status) {
    MutexLock lock(&mu_);
    // watcher_ doubles as the "completion already started" flag.
    if (watcher_ == nullptr) return;  // Ignore subsequent notifications.
    // Cancel watch.
    channel_->RemoveConnectivityWatcher(watcher_);
    watcher_ = nullptr;
    // Cancel timer.
    channel_->event_engine()->Cancel(timer_handle_);
    // Send CQ completion.
    Ref().release();  // Released in FinishedCompletion().
    grpc_cq_end_op(cq_, tag_, status, FinishedCompletion, this,
                   &completion_storage_);
  }

  // Called when the completion is returned to the CQ.
  // `arg` is the ExternalStateWatcher passed to grpc_cq_end_op(); this drops
  // the ref taken in MaybeStartCompletion().
  static void FinishedCompletion(void* arg, grpc_cq_completion* /*ignored*/) {
    auto* self = static_cast<ExternalStateWatcher*>(arg);
    self->Unref();
  }

  // Channel being watched.
  WeakRefCountedPtr<ClientChannel> channel_;

  Mutex mu_;
  // Completion queue and tag used to report the result.
  grpc_completion_queue* cq_ ABSL_GUARDED_BY(&mu_);
  void* tag_ ABSL_GUARDED_BY(&mu_);
  // Storage handed to grpc_cq_end_op() for the completion.
  grpc_cq_completion completion_storage_ ABSL_GUARDED_BY(&mu_);
  // Non-null while the watch is pending; reset to nullptr once a completion
  // has been started.
  Watcher* watcher_ ABSL_GUARDED_BY(&mu_) = nullptr;
  // Handle for the deadline timer, used to cancel it.
  grpc_event_engine::experimental::EventEngine::TaskHandle timer_handle_
      ABSL_GUARDED_BY(&mu_);
};
769
770 } // namespace
771
WatchConnectivityState(grpc_connectivity_state state,Timestamp deadline,grpc_completion_queue * cq,void * tag)772 void ClientChannel::WatchConnectivityState(grpc_connectivity_state state,
773 Timestamp deadline,
774 grpc_completion_queue* cq,
775 void* tag) {
776 new ExternalStateWatcher(WeakRefAsSubclass<ClientChannel>(), cq, tag, state,
777 deadline);
778 }
779
// Not implemented yet: calling this unconditionally crashes the process.
// The intended implementation is kept below as commented-out code.
void ClientChannel::AddConnectivityWatcher(
    grpc_connectivity_state /*initial_state*/,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> /*watcher*/) {
  Crash("not implemented");
  // TODO(ctiller): to make this work, need to change WorkSerializer to use
  // absl::AnyInvocable<> instead of std::function<>
  // work_serializer_->Run(
  //     [self = RefAsSubclass<ClientChannel>(), initial_state,
  //      watcher = std::move(watcher)]()
  //     ABSL_EXCLUSIVE_LOCKS_REQUIRED(*work_serializer_) {
  //       self->state_tracker_.AddWatcher(initial_state, std::move(watcher));
  //     },
  //     DEBUG_LOCATION);
}
794
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)795 void ClientChannel::RemoveConnectivityWatcher(
796 AsyncConnectivityStateWatcherInterface* watcher) {
797 auto self = RefAsSubclass<ClientChannel>(); // Held by callback.
798 work_serializer_->Run(
799 [self, watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
800 self->state_tracker_.RemoveWatcher(watcher);
801 },
802 DEBUG_LOCATION);
803 }
804
GetInfo(const grpc_channel_info * info)805 void ClientChannel::GetInfo(const grpc_channel_info* info) {
806 MutexLock lock(&info_mu_);
807 if (info->lb_policy_name != nullptr) {
808 *info->lb_policy_name = gpr_strdup(info_lb_policy_name_.c_str());
809 }
810 if (info->service_config_json != nullptr) {
811 *info->service_config_json = gpr_strdup(info_service_config_json_.c_str());
812 }
813 }
814
ResetConnectionBackoff()815 void ClientChannel::ResetConnectionBackoff() {
816 auto self = RefAsSubclass<ClientChannel>();
817 work_serializer_->Run(
818 [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
819 if (self->lb_policy_ != nullptr) self->lb_policy_->ResetBackoffLocked();
820 },
821 DEBUG_LOCATION);
822 }
823
824 namespace {
825
826 // A class to handle CQ completion for a ping.
827 class PingRequest {
828 public:
PingRequest(grpc_completion_queue * cq,void * tag)829 PingRequest(grpc_completion_queue* cq, void* tag) : cq_(cq), tag_(tag) {
830 grpc_cq_begin_op(cq, tag);
831 }
832
833 // Triggers CQ completion and eventually deletes the PingRequest object.
Complete(grpc_error_handle error)834 void Complete(grpc_error_handle error) {
835 grpc_cq_end_op(cq_, tag_, error, Destroy, this, &completion_storage_);
836 }
837
838 private:
Destroy(void * arg,grpc_cq_completion *)839 static void Destroy(void* arg, grpc_cq_completion* /*storage*/) {
840 delete static_cast<PingRequest*>(arg);
841 }
842
843 grpc_completion_queue* cq_;
844 void* tag_;
845 grpc_cq_completion completion_storage_;
846 };
847
848 } // namespace
849
// Not implemented yet: calling this unconditionally crashes the process.
// Both parameters are currently unused.
void ClientChannel::Ping(grpc_completion_queue*, void*) {
  // TODO(ctiller): implement
  Crash("not implemented");
}
854
CreateCall(grpc_call * parent_call,uint32_t propagation_mask,grpc_completion_queue * cq,grpc_pollset_set *,Slice path,absl::optional<Slice> authority,Timestamp deadline,bool)855 grpc_call* ClientChannel::CreateCall(
856 grpc_call* parent_call, uint32_t propagation_mask,
857 grpc_completion_queue* cq, grpc_pollset_set* /*pollset_set_alternative*/,
858 Slice path, absl::optional<Slice> authority, Timestamp deadline, bool) {
859 auto arena = call_arena_allocator()->MakeArena();
860 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
861 event_engine());
862 return MakeClientCall(parent_call, propagation_mask, cq, std::move(path),
863 std::move(authority), false, deadline,
864 compression_options(), std::move(arena), Ref());
865 }
866
StartCall(UnstartedCallHandler unstarted_handler)867 void ClientChannel::StartCall(UnstartedCallHandler unstarted_handler) {
868 // Increment call count.
869 if (idle_timeout_ != Duration::Zero()) idle_state_.IncreaseCallCount();
870 // Exit IDLE if needed.
871 CheckConnectivityState(/*try_to_connect=*/true);
872 // Spawn a promise to wait for the resolver result.
873 // This will eventually start the call.
874 unstarted_handler.SpawnGuardedUntilCallCompletes(
875 "wait-for-name-resolution",
876 [self = RefAsSubclass<ClientChannel>(), unstarted_handler]() mutable {
877 const bool wait_for_ready =
878 unstarted_handler.UnprocessedClientInitialMetadata()
879 .GetOrCreatePointer(WaitForReady())
880 ->value;
881 return Map(
882 // Wait for the resolver result.
883 CheckDelayed(self->resolver_data_for_calls_.NextWhen(
884 [wait_for_ready](
885 const absl::StatusOr<ResolverDataForCalls> result) {
886 bool got_result = false;
887 // If the resolver reports an error but the call is
888 // wait_for_ready, keep waiting for the next result
889 // instead of failing the call.
890 if (!result.ok()) {
891 got_result = !wait_for_ready;
892 } else {
893 // Not an error. Make sure we actually have a result.
894 got_result = result->config_selector != nullptr;
895 }
896 return got_result;
897 })),
898 // Handle resolver result.
899 [self, unstarted_handler](
900 std::tuple<absl::StatusOr<ResolverDataForCalls>, bool>
901 result_and_delayed) mutable {
902 auto& resolver_data = std::get<0>(result_and_delayed);
903 const bool was_queued = std::get<1>(result_and_delayed);
904 if (!resolver_data.ok()) return resolver_data.status();
905 // Apply service config to call.
906 absl::Status status = self->ApplyServiceConfigToCall(
907 *resolver_data->config_selector,
908 unstarted_handler.UnprocessedClientInitialMetadata());
909 if (!status.ok()) return status;
910 // If the call was queued, add trace annotation.
911 if (was_queued) {
912 auto* call_tracer =
913 MaybeGetContext<CallTracerAnnotationInterface>();
914 if (call_tracer != nullptr) {
915 call_tracer->RecordAnnotation(
916 "Delayed name resolution complete.");
917 }
918 }
919 // Start the call on the destination provided by the
920 // resolver.
921 resolver_data->call_destination->StartCall(
922 std::move(unstarted_handler));
923 return absl::OkStatus();
924 });
925 });
926 }
927
CreateResolverLocked()928 void ClientChannel::CreateResolverLocked() {
929 GRPC_TRACE_LOG(client_channel, INFO)
930 << "client_channel=" << this << ": starting name resolution for "
931 << uri_to_resolve_;
932 resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
933 uri_to_resolve_, channel_args_, nullptr, work_serializer_,
934 std::make_unique<ResolverResultHandler>(
935 WeakRefAsSubclass<ClientChannel>()));
936 // Since the validity of the args was checked when the channel was created,
937 // CreateResolver() must return a non-null result.
938 CHECK(resolver_ != nullptr);
939 UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
940 "started resolving");
941 resolver_->StartLocked();
942 GRPC_TRACE_LOG(client_channel, INFO)
943 << "client_channel=" << this << ": created resolver=" << resolver_.get();
944 }
945
DestroyResolverAndLbPolicyLocked()946 void ClientChannel::DestroyResolverAndLbPolicyLocked() {
947 if (resolver_ != nullptr) {
948 GRPC_TRACE_LOG(client_channel, INFO)
949 << "client_channel=" << this
950 << ": shutting down resolver=" << resolver_.get();
951 resolver_.reset();
952 saved_service_config_.reset();
953 saved_config_selector_.reset();
954 resolver_data_for_calls_.Set(ResolverDataForCalls{nullptr, nullptr});
955 // Clear LB policy if set.
956 if (lb_policy_ != nullptr) {
957 GRPC_TRACE_LOG(client_channel, INFO)
958 << "client_channel=" << this
959 << ": shutting down lb_policy=" << lb_policy_.get();
960 lb_policy_.reset();
961 picker_.Set(MakeRefCounted<LoadBalancingPolicy::DropPicker>(
962 absl::UnavailableError("Channel shutdown")));
963 }
964 }
965 }
966
TryToConnectLocked()967 void ClientChannel::TryToConnectLocked() {
968 if (disconnect_error_.ok()) {
969 if (lb_policy_ != nullptr) {
970 lb_policy_->ExitIdleLocked();
971 } else if (resolver_ == nullptr) {
972 CreateResolverLocked();
973 }
974 }
975 }
976
977 namespace {
978
ChooseLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config)979 RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
980 const Resolver::Result& resolver_result,
981 const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
982 // Prefer the LB policy config found in the service config.
983 if (parsed_service_config->parsed_lb_config() != nullptr) {
984 return parsed_service_config->parsed_lb_config();
985 }
986 // Try the deprecated LB policy name from the service config.
987 // If not, try the setting from channel args.
988 absl::optional<absl::string_view> policy_name;
989 if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
990 policy_name = parsed_service_config->parsed_deprecated_lb_policy();
991 } else {
992 policy_name = resolver_result.args.GetString(GRPC_ARG_LB_POLICY_NAME);
993 bool requires_config = false;
994 if (policy_name.has_value() &&
995 (!CoreConfiguration::Get()
996 .lb_policy_registry()
997 .LoadBalancingPolicyExists(*policy_name, &requires_config) ||
998 requires_config)) {
999 if (requires_config) {
1000 LOG(ERROR) << "LB policy: " << *policy_name
1001 << " passed through channel_args must not "
1002 "require a config. Using pick_first instead.";
1003 } else {
1004 LOG(ERROR) << "LB policy: " << *policy_name
1005 << " passed through channel_args does not exist. "
1006 "Using pick_first instead.";
1007 }
1008 policy_name = "pick_first";
1009 }
1010 }
1011 // Use pick_first if nothing was specified and we didn't select grpclb
1012 // above.
1013 if (!policy_name.has_value()) policy_name = "pick_first";
1014 // Now that we have the policy name, construct an empty config for it.
1015 Json config_json = Json::FromArray({Json::FromObject({
1016 {std::string(*policy_name), Json::FromObject({})},
1017 })});
1018 auto lb_policy_config =
1019 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
1020 config_json);
1021 // The policy name came from one of three places:
1022 // - The deprecated loadBalancingPolicy field in the service config,
1023 // in which case the code in ClientChannelServiceConfigParser
1024 // already verified that the policy does not require a config.
1025 // - One of the hard-coded values here, all of which are known to not
1026 // require a config.
1027 // - A channel arg, in which case we check that the specified policy exists
1028 // and accepts an empty config. If not, we revert to using pick_first
1029 // lb_policy
1030 CHECK_OK(lb_policy_config);
1031 return std::move(*lb_policy_config);
1032 }
1033
1034 } // namespace
1035
OnResolverResultChangedLocked(Resolver::Result result)1036 void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
1037 // Handle race conditions.
1038 if (resolver_ == nullptr) return;
1039 GRPC_TRACE_LOG(client_channel, INFO)
1040 << "client_channel=" << this << ": got resolver result";
1041 // Grab resolver result health callback.
1042 auto resolver_callback = std::move(result.result_health_callback);
1043 absl::Status resolver_result_status;
1044 // We only want to trace the address resolution in the follow cases:
1045 // (a) Address resolution resulted in service config change.
1046 // (b) Address resolution that causes number of backends to go from
1047 // zero to non-zero.
1048 // (c) Address resolution that causes number of backends to go from
1049 // non-zero to zero.
1050 // (d) Address resolution that causes a new LB policy to be created.
1051 //
1052 // We track a list of strings to eventually be concatenated and traced.
1053 std::vector<const char*> trace_strings;
1054 const bool resolution_contains_addresses =
1055 result.addresses.ok() && !result.addresses->empty();
1056 if (!resolution_contains_addresses &&
1057 previous_resolution_contained_addresses_) {
1058 trace_strings.push_back("Address list became empty");
1059 } else if (resolution_contains_addresses &&
1060 !previous_resolution_contained_addresses_) {
1061 trace_strings.push_back("Address list became non-empty");
1062 }
1063 previous_resolution_contained_addresses_ = resolution_contains_addresses;
1064 std::string service_config_error_string_storage;
1065 if (!result.service_config.ok()) {
1066 service_config_error_string_storage =
1067 result.service_config.status().ToString();
1068 trace_strings.push_back(service_config_error_string_storage.c_str());
1069 }
1070 // Choose the service config.
1071 RefCountedPtr<ServiceConfig> service_config;
1072 RefCountedPtr<ConfigSelector> config_selector;
1073 if (!result.service_config.ok()) {
1074 GRPC_TRACE_LOG(client_channel, INFO)
1075 << "client_channel=" << this
1076 << ": resolver returned service config error: "
1077 << result.service_config.status();
1078 // If the service config was invalid, then fallback to the
1079 // previously returned service config, if any.
1080 if (saved_service_config_ != nullptr) {
1081 GRPC_TRACE_LOG(client_channel, INFO)
1082 << "client_channel=" << this
1083 << ": resolver returned invalid service config; "
1084 "continuing to use previous service config";
1085 service_config = saved_service_config_;
1086 config_selector = saved_config_selector_;
1087 } else {
1088 // We received a service config error and we don't have a
1089 // previous service config to fall back to. Put the channel into
1090 // TRANSIENT_FAILURE.
1091 OnResolverErrorLocked(result.service_config.status());
1092 trace_strings.push_back("no valid service config");
1093 resolver_result_status =
1094 absl::UnavailableError("no valid service config");
1095 }
1096 } else if (*result.service_config == nullptr) {
1097 // Resolver did not return any service config.
1098 GRPC_TRACE_LOG(client_channel, INFO)
1099 << "client_channel=" << this
1100 << ": resolver returned no service config; using default service "
1101 "config for channel";
1102 service_config = default_service_config_;
1103 } else {
1104 // Use ServiceConfig and ConfigSelector returned by resolver.
1105 service_config = std::move(*result.service_config);
1106 config_selector = result.args.GetObjectRef<ConfigSelector>();
1107 }
1108 // Remove the config selector from channel args so that we're not holding
1109 // unnecessary refs that cause it to be destroyed somewhere other than in
1110 // the WorkSerializer.
1111 result.args = result.args.Remove(GRPC_ARG_CONFIG_SELECTOR);
1112 // Note: The only case in which service_config is null here is if the
1113 // resolver returned a service config error and we don't have a previous
1114 // service config to fall back to.
1115 if (service_config != nullptr) {
1116 // Extract global config for client channel.
1117 const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
1118 static_cast<const internal::ClientChannelGlobalParsedConfig*>(
1119 service_config->GetGlobalParsedConfig(
1120 service_config_parser_index_));
1121 // Choose LB policy config.
1122 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
1123 ChooseLbPolicy(result, parsed_service_config);
1124 // Check if the ServiceConfig has changed.
1125 const bool service_config_changed =
1126 saved_service_config_ == nullptr ||
1127 service_config->json_string() != saved_service_config_->json_string();
1128 // Check if the ConfigSelector has changed.
1129 const bool config_selector_changed = !ConfigSelector::Equals(
1130 saved_config_selector_.get(), config_selector.get());
1131 // If either has changed, apply the global parameters now.
1132 if (service_config_changed || config_selector_changed) {
1133 // Update service config in control plane.
1134 UpdateServiceConfigInControlPlaneLocked(
1135 std::move(service_config), std::move(config_selector),
1136 std::string(lb_policy_config->name()));
1137 // TODO(ncteisen): might be worth somehow including a snippet of the
1138 // config in the trace, at the risk of bloating the trace logs.
1139 trace_strings.push_back("Service config changed");
1140 } else {
1141 GRPC_TRACE_LOG(client_channel, INFO)
1142 << "client_channel=" << this << ": service config not changed";
1143 }
1144 // Create or update LB policy, as needed.
1145 ChannelArgs new_args = result.args;
1146 resolver_result_status = CreateOrUpdateLbPolicyLocked(
1147 std::move(lb_policy_config),
1148 parsed_service_config->health_check_service_name(), std::move(result));
1149 // Start using new service config for calls.
1150 // This needs to happen after the LB policy has been updated, since
1151 // the ConfigSelector may need the LB policy to know about new
1152 // destinations before it can send RPCs to those destinations.
1153 if (service_config_changed || config_selector_changed) {
1154 UpdateServiceConfigInDataPlaneLocked(new_args);
1155 }
1156 }
1157 // Invoke resolver callback if needed.
1158 if (resolver_callback != nullptr) {
1159 resolver_callback(std::move(resolver_result_status));
1160 }
1161 // Add channel trace event.
1162 if (!trace_strings.empty()) {
1163 std::string message =
1164 absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
1165 if (channelz_node_ != nullptr) {
1166 channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
1167 grpc_slice_from_cpp_string(message));
1168 }
1169 }
1170 }
1171
OnResolverErrorLocked(absl::Status status)1172 void ClientChannel::OnResolverErrorLocked(absl::Status status) {
1173 if (resolver_ == nullptr) return;
1174 GRPC_TRACE_LOG(client_channel, INFO)
1175 << "client_channel=" << this
1176 << ": resolver transient failure: " << status;
1177 // If we already have an LB policy from a previous resolution
1178 // result, then we continue to let it set the connectivity state.
1179 // Otherwise, we go into TRANSIENT_FAILURE.
1180 if (lb_policy_ == nullptr) {
1181 // Update connectivity state.
1182 UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
1183 "resolver failure");
1184 // Send updated resolver result.
1185 resolver_data_for_calls_.Set(
1186 MaybeRewriteIllegalStatusCode(status, "resolver"));
1187 }
1188 }
1189
CreateOrUpdateLbPolicyLocked(RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,const absl::optional<std::string> & health_check_service_name,Resolver::Result result)1190 absl::Status ClientChannel::CreateOrUpdateLbPolicyLocked(
1191 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
1192 const absl::optional<std::string>& health_check_service_name,
1193 Resolver::Result result) {
1194 // Construct update.
1195 LoadBalancingPolicy::UpdateArgs update_args;
1196 if (!result.addresses.ok()) {
1197 update_args.addresses = result.addresses.status();
1198 } else {
1199 update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
1200 std::move(*result.addresses));
1201 }
1202 update_args.config = std::move(lb_policy_config);
1203 update_args.resolution_note = std::move(result.resolution_note);
1204 update_args.args = std::move(result.args);
1205 // Add health check service name to channel args.
1206 if (health_check_service_name.has_value()) {
1207 update_args.args = update_args.args.Set(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
1208 *health_check_service_name);
1209 }
1210 // Create policy if needed.
1211 if (lb_policy_ == nullptr) {
1212 lb_policy_ = CreateLbPolicyLocked(update_args.args);
1213 }
1214 // Update the policy.
1215 GRPC_TRACE_LOG(client_channel, INFO)
1216 << "client_channel=" << this << ": Updating child policy "
1217 << lb_policy_.get();
1218 return lb_policy_->UpdateLocked(std::move(update_args));
1219 }
1220
1221 // Creates a new LB policy.
CreateLbPolicyLocked(const ChannelArgs & args)1222 OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
1223 const ChannelArgs& args) {
1224 // The LB policy will start in state CONNECTING but will not
1225 // necessarily send us an update synchronously, so set state to
1226 // CONNECTING (in case the resolver had previously failed and put the
1227 // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
1228 UpdateStateAndPickerLocked(
1229 GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
1230 MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
1231 // Now create the LB policy.
1232 LoadBalancingPolicy::Args lb_policy_args;
1233 lb_policy_args.work_serializer = work_serializer_;
1234 lb_policy_args.channel_control_helper =
1235 std::make_unique<ClientChannelControlHelper>(
1236 WeakRefAsSubclass<ClientChannel>());
1237 lb_policy_args.args = args;
1238 OrphanablePtr<LoadBalancingPolicy> lb_policy =
1239 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
1240 &client_channel_trace);
1241 GRPC_TRACE_LOG(client_channel, INFO)
1242 << "client_channel=" << this << ": created new LB policy "
1243 << lb_policy.get();
1244 return lb_policy;
1245 }
1246
UpdateServiceConfigInControlPlaneLocked(RefCountedPtr<ServiceConfig> service_config,RefCountedPtr<ConfigSelector> config_selector,std::string lb_policy_name)1247 void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
1248 RefCountedPtr<ServiceConfig> service_config,
1249 RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
1250 std::string service_config_json(service_config->json_string());
1251 // Update service config.
1252 GRPC_TRACE_LOG(client_channel, INFO)
1253 << "client_channel=" << this << ": using service config: \""
1254 << service_config_json << "\"";
1255 saved_service_config_ = std::move(service_config);
1256 // Update config selector.
1257 GRPC_TRACE_LOG(client_channel, INFO)
1258 << "client_channel=" << this << ": using ConfigSelector "
1259 << config_selector.get();
1260 saved_config_selector_ = std::move(config_selector);
1261 // Update the data used by GetChannelInfo().
1262 {
1263 MutexLock lock(&info_mu_);
1264 info_lb_policy_name_ = std::move(lb_policy_name);
1265 info_service_config_json_ = std::move(service_config_json);
1266 }
1267 }
1268
UpdateServiceConfigInDataPlaneLocked(const ChannelArgs & args)1269 void ClientChannel::UpdateServiceConfigInDataPlaneLocked(
1270 const ChannelArgs& args) {
1271 GRPC_TRACE_LOG(client_channel, INFO)
1272 << "client_channel=" << this << ": switching to ConfigSelector "
1273 << saved_config_selector_.get();
1274 // Use default config selector if resolver didn't supply one.
1275 RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
1276 if (config_selector == nullptr) {
1277 config_selector =
1278 MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
1279 }
1280 // Modify channel args.
1281 ChannelArgs new_args = args.SetObject(this).SetObject(saved_service_config_);
1282 // Construct filter stack.
1283 auto new_blackboard = MakeRefCounted<Blackboard>();
1284 InterceptionChainBuilder builder(new_args, blackboard_.get(),
1285 new_blackboard.get());
1286 if (idle_timeout_ != Duration::Zero()) {
1287 builder.AddOnServerTrailingMetadata([this](ServerMetadata&) {
1288 if (idle_state_.DecreaseCallCount()) StartIdleTimer();
1289 });
1290 }
1291 CoreConfiguration::Get().channel_init().AddToInterceptionChainBuilder(
1292 GRPC_CLIENT_CHANNEL, builder);
1293 // Add filters returned by the config selector (e.g., xDS HTTP filters).
1294 config_selector->AddFilters(builder);
1295 const bool enable_retries =
1296 !channel_args_.WantMinimalStack() &&
1297 channel_args_.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
1298 if (enable_retries) {
1299 builder.Add<RetryInterceptor>();
1300 }
1301 // Create call destination.
1302 auto top_of_stack_call_destination = builder.Build(call_destination_);
1303 blackboard_ = std::move(new_blackboard);
1304 // Send result to data plane.
1305 if (!top_of_stack_call_destination.ok()) {
1306 resolver_data_for_calls_.Set(MaybeRewriteIllegalStatusCode(
1307 top_of_stack_call_destination.status(), "channel construction"));
1308 } else {
1309 resolver_data_for_calls_.Set(ResolverDataForCalls{
1310 std::move(config_selector), std::move(*top_of_stack_call_destination)});
1311 }
1312 }
1313
UpdateStateLocked(grpc_connectivity_state state,const absl::Status & status,const char * reason)1314 void ClientChannel::UpdateStateLocked(grpc_connectivity_state state,
1315 const absl::Status& status,
1316 const char* reason) {
1317 if (state != GRPC_CHANNEL_SHUTDOWN &&
1318 state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) {
1319 Crash("Illegal transition SHUTDOWN -> anything");
1320 }
1321 state_tracker_.SetState(state, status, reason);
1322 if (channelz_node_ != nullptr) {
1323 channelz_node_->SetConnectivityState(state);
1324 channelz_node_->AddTraceEvent(
1325 channelz::ChannelTrace::Severity::Info,
1326 grpc_slice_from_static_string(
1327 channelz::ChannelNode::GetChannelConnectivityStateChangeString(
1328 state)));
1329 }
1330 }
1331
// Updates the connectivity state via UpdateStateLocked() and then installs
// `picker` as the current picker.  `state`, `status` and `reason` are
// forwarded unchanged.
void ClientChannel::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
  UpdateStateLocked(state, status, reason);
  picker_.Set(std::move(picker));
}
1339
StartIdleTimer()1340 void ClientChannel::StartIdleTimer() {
1341 GRPC_TRACE_LOG(client_channel, INFO)
1342 << "client_channel=" << this << ": idle timer started";
1343 auto self = WeakRefAsSubclass<ClientChannel>();
1344 auto promise = Loop([self]() {
1345 return TrySeq(Sleep(Timestamp::Now() + self->idle_timeout_),
1346 [self]() -> Poll<LoopCtl<absl::Status>> {
1347 if (self->idle_state_.CheckTimer()) {
1348 return Continue{};
1349 } else {
1350 return absl::OkStatus();
1351 }
1352 });
1353 });
1354 auto arena = SimpleArenaAllocator(0)->MakeArena();
1355 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
1356 event_engine());
1357 idle_activity_.Set(MakeActivity(
1358 std::move(promise), ExecCtxWakeupScheduler{},
1359 [self = std::move(self)](absl::Status status) mutable {
1360 if (status.ok()) {
1361 self->work_serializer_->Run(
1362 [self]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*self->work_serializer_) {
1363 self->DestroyResolverAndLbPolicyLocked();
1364 self->UpdateStateAndPickerLocked(
1365 GRPC_CHANNEL_IDLE, absl::OkStatus(),
1366 "channel entering IDLE", nullptr);
1367 // TODO(roth): In case there's a race condition, we
1368 // might need to check for any calls that are
1369 // queued waiting for a resolver result or an LB
1370 // pick.
1371 },
1372 DEBUG_LOCATION);
1373 }
1374 },
1375 std::move(arena)));
1376 }
1377
ApplyServiceConfigToCall(ConfigSelector & config_selector,ClientMetadata & client_initial_metadata) const1378 absl::Status ClientChannel::ApplyServiceConfigToCall(
1379 ConfigSelector& config_selector,
1380 ClientMetadata& client_initial_metadata) const {
1381 GRPC_TRACE_LOG(client_channel_call, INFO)
1382 << "client_channel=" << this << ": " << GetContext<Activity>()->DebugTag()
1383 << " service config to call";
1384 // Create a ClientChannelServiceConfigCallData for the call. This stores
1385 // a ref to the ServiceConfig and caches the right set of parsed configs
1386 // to use for the call. The ClientChannelServiceConfigCallData will store
1387 // itself in the call context, so that it can be accessed by filters
1388 // below us in the stack, and it will be cleaned up when the call ends.
1389 auto* service_config_call_data =
1390 GetContext<Arena>()->New<ClientChannelServiceConfigCallData>(
1391 GetContext<Arena>());
1392 // Use the ConfigSelector to determine the config for the call.
1393 absl::Status call_config_status = config_selector.GetCallConfig(
1394 {&client_initial_metadata, GetContext<Arena>(),
1395 service_config_call_data});
1396 if (!call_config_status.ok()) {
1397 return MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector");
1398 }
1399 // Apply our own method params to the call.
1400 auto* method_params = DownCast<ClientChannelMethodParsedConfig*>(
1401 service_config_call_data->GetMethodParsedConfig(
1402 service_config_parser_index_));
1403 if (method_params != nullptr) {
1404 // If the service config specifies a deadline, update the call's
1405 // deadline timer.
1406 if (method_params->timeout() != Duration::Zero()) {
1407 Call* call = GetContext<Call>();
1408 const Timestamp per_method_deadline =
1409 Timestamp::FromCycleCounterRoundUp(call->start_time()) +
1410 method_params->timeout();
1411 call->UpdateDeadline(per_method_deadline);
1412 }
1413 // If the service config set wait_for_ready and the application
1414 // did not explicitly set it, use the value from the service config.
1415 auto* wait_for_ready =
1416 client_initial_metadata.GetOrCreatePointer(WaitForReady());
1417 if (method_params->wait_for_ready().has_value() &&
1418 !wait_for_ready->explicitly_set) {
1419 wait_for_ready->value = method_params->wait_for_ready().value();
1420 }
1421 }
1422 return absl::OkStatus();
1423 }
1424
1425 } // namespace grpc_core
1426