//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "src/core/client_channel/client_channel_filter.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/channel_arg_names.h>
#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/json.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <inttypes.h>
#include <limits.h>

#include <algorithm>
#include <functional>
#include <new>
#include <set>
#include <type_traits>
#include <utility>
#include <vector>

#include "absl/cleanup/cleanup.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/cord.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_join.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "src/core/channelz/channel_trace.h"
#include "src/core/client_channel/backup_poller.h"
#include "src/core/client_channel/client_channel_internal.h"
#include "src/core/client_channel/client_channel_service_config.h"
#include "src/core/client_channel/config_selector.h"
#include "src/core/client_channel/dynamic_filters.h"
#include "src/core/client_channel/global_subchannel_pool.h"
#include "src/core/client_channel/lb_metadata.h"
#include "src/core/client_channel/local_subchannel_pool.h"
#include "src/core/client_channel/retry_filter.h"
#include "src/core/client_channel/subchannel.h"
#include "src/core/client_channel/subchannel_interface_internal.h"
#include "src/core/config/core_configuration.h"
#include "src/core/handshaker/proxy_mapper_registry.h"
#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/latch.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/pipe.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/transport/connectivity_state.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/load_balancing/backend_metric_parser.h"
#include "src/core/load_balancing/child_policy_handler.h"
#include "src/core/load_balancing/lb_policy_registry.h"
#include "src/core/load_balancing/subchannel_interface.h"
#include "src/core/resolver/endpoint_addresses.h"
#include "src/core/resolver/resolver_registry.h"
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/service_config/service_config_impl.h"
#include "src/core/util/crash.h"
#include "src/core/util/debug_location.h"
#include "src/core/util/json/json.h"
#include "src/core/util/manual_constructor.h"
#include "src/core/util/status_helper.h"
#include "src/core/util/sync.h"
#include "src/core/util/unique_type_name.h"
#include "src/core/util/useful.h"
#include "src/core/util/work_serializer.h"
106
//
// Client channel filter
//

namespace grpc_core {

using internal::ClientChannelMethodParsedConfig;

//
// ClientChannelFilter::CallData definition
//

// Abstract base class for per-call state in the client channel filter.
// Subclasses hold the actual call attributes (arena, polling entity,
// initial metadata) and expose them via the private accessors below.
class ClientChannelFilter::CallData {
 public:
  // Removes the call from the channel's list of calls queued
  // for name resolution.
  void RemoveCallFromResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Called by the channel for each queued call when a new resolution
  // result becomes available.
  virtual void RetryCheckResolutionLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) = 0;

  // The dynamic filter stack selected for this call; set by
  // ApplyServiceConfigToCallLocked().
  RefCountedPtr<DynamicFilters> dynamic_filters() const {
    return dynamic_filters_;
  }

 protected:
  CallData() = default;
  virtual ~CallData() = default;

  // Checks whether a resolver result is available. The following
  // outcomes are possible:
  // - No resolver result is available yet. The call will be queued and
  //   absl::nullopt will be returned. Later, when a resolver result
  //   becomes available, RetryCheckResolutionLocked() will be called.
  // - The resolver has returned a transient failure. If the call is
  //   not wait_for_ready, a non-OK status will be returned. (If the
  //   call *is* wait_for_ready, it will be queued instead.)
  // - There is a valid resolver result. The service config will be
  //   stored in the call context and an OK status will be returned.
  absl::optional<absl::Status> CheckResolution(bool was_queued);

 private:
  // Accessors for data stored in the subclass.
  virtual ClientChannelFilter* chand() const = 0;
  virtual Arena* arena() const = 0;
  virtual grpc_polling_entity* pollent() = 0;
  virtual grpc_metadata_batch* send_initial_metadata() = 0;

  // Helper function for CheckResolution(). Returns true if the call
  // can continue (i.e., there is a valid resolution result, or there is
  // an invalid resolution result but the call is not wait_for_ready).
  bool CheckResolutionLocked(
      absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Adds the call to the channel's list of calls queued for name resolution.
  void AddCallToResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Called when adding the call to the resolver queue.
  // Default implementation is a no-op; subclasses may override
  // (e.g., to install a cancellation hook).
  virtual void OnAddToQueueLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) {}

  // Applies service config to the call. Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error_handle ApplyServiceConfigToCallLocked(
      const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector);

  // Called to reset the deadline based on the service config obtained
  // from the resolver.
  virtual void ResetDeadline(Duration timeout) = 0;

  RefCountedPtr<DynamicFilters> dynamic_filters_;
};
186
// Filter-stack implementation of CallData. Buffers batches received
// from the surface in pending_batches_ until a resolver result is
// available, then creates a call on the dynamic filter stack and
// replays the batches onto it.
class ClientChannelFilter::FilterBasedCallData final
    : public ClientChannelFilter::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class ResolverQueuedCallCanceller;

  FilterBasedCallData(grpc_call_element* elem,
                      const grpc_call_element_args& args);
  ~FilterBasedCallData() override;

  grpc_call_element* elem() const { return elem_; }
  grpc_call_stack* owning_call() const { return owning_call_; }
  CallCombiner* call_combiner() const { return call_combiner_; }

  ClientChannelFilter* chand() const override {
    return static_cast<ClientChannelFilter*>(elem()->channel_data);
  }
  Arena* arena() const override { return arena_; }
  grpc_polling_entity* pollent() override { return pollent_; }
  grpc_metadata_batch* send_initial_metadata() override {
    // Pending batch slot 0 holds the batch carrying send_initial_metadata
    // (see GetBatchIndex()).
    return pending_batches_[0]
        ->payload->send_initial_metadata.send_initial_metadata;
  }

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error_handle error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg,
                                               grpc_error_handle ignored);
  // Resumes all pending batches on dynamic_call_.
  void PendingBatchesResume();

  // Called to check for a resolution result, both when the call is
  // initially started and when it is queued and the channel gets a new
  // resolution result.
  void TryCheckResolution(bool was_queued);

  void OnAddToQueueLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  void RetryCheckResolutionLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Recomputes the call's deadline as (call start time + per-method
  // timeout from the service config) and applies it to the call.
  void ResetDeadline(Duration timeout) override {
    const Timestamp per_method_deadline =
        Timestamp::FromCycleCounterRoundUp(call_start_time_) + timeout;
    arena_->GetContext<Call>()->UpdateDeadline(per_method_deadline);
  }

  void CreateDynamicCall();

  static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error_handle error);

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  Timestamp deadline_;

  Arena* const arena_;
  grpc_call_element* const elem_;
  grpc_call_stack* const owning_call_;
  CallCombiner* const call_combiner_;

  grpc_polling_entity* pollent_ = nullptr;

  // Accessed while holding ClientChannelFilter::resolution_mu_.
  ResolverQueuedCallCanceller* resolver_call_canceller_
      ABSL_GUARDED_BY(&ClientChannelFilter::resolution_mu_) = nullptr;

  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;

  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error_handle cancel_error_;
};
301
//
// Filter vtable
//

// Channel filter vtable for the client channel. Entries are positional
// (see grpc_channel_filter); comments label the non-obvious ones.
const grpc_channel_filter ClientChannelFilter::kFilter = {
    ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch,
    ClientChannelFilter::StartTransportOp,
    sizeof(ClientChannelFilter::FilterBasedCallData),  // per-call data size
    ClientChannelFilter::FilterBasedCallData::Init,
    ClientChannelFilter::FilterBasedCallData::SetPollent,
    ClientChannelFilter::FilterBasedCallData::Destroy,
    sizeof(ClientChannelFilter),  // per-channel data size
    ClientChannelFilter::Init,
    grpc_channel_stack_no_post_init,
    ClientChannelFilter::Destroy,
    ClientChannelFilter::GetChannelInfo,
    GRPC_UNIQUE_TYPE_NAME_HERE("client-channel"),
};
320
321 //
322 // dynamic termination filter
323 //
324
325 namespace {
326
GetServiceConfigCallData(Arena * arena)327 ClientChannelServiceConfigCallData* GetServiceConfigCallData(Arena* arena) {
328 return DownCast<ClientChannelServiceConfigCallData*>(
329 arena->GetContext<ServiceConfigCallData>());
330 }
331
// Terminal filter of the dynamic filter stack. It hands each call off
// to the client channel's load-balanced call machinery (see CallData
// below); it never sees transport ops itself.
class DynamicTerminationFilter final {
 public:
  class CallData;

  static const grpc_channel_filter kFilterVtable;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    // This filter must terminate the stack.
    CHECK(args->is_last);
    CHECK(elem->filter == &kFilterVtable);
    new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
    return absl::OkStatus();
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    chand->~DynamicTerminationFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  explicit DynamicTerminationFilter(const ChannelArgs& args)
      : chand_(args.GetObject<ClientChannelFilter>()) {}

  // Not owned; the client channel outlives the dynamic filter stack.
  ClientChannelFilter* chand_;
};
363
364 class DynamicTerminationFilter::CallData final {
365 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)366 static grpc_error_handle Init(grpc_call_element* elem,
367 const grpc_call_element_args* args) {
368 new (elem->call_data) CallData(*args);
369 return absl::OkStatus();
370 }
371
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)372 static void Destroy(grpc_call_element* elem,
373 const grpc_call_final_info* /*final_info*/,
374 grpc_closure* then_schedule_closure) {
375 auto* calld = static_cast<CallData*>(elem->call_data);
376 RefCountedPtr<SubchannelCall> subchannel_call;
377 if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
378 subchannel_call = calld->lb_call_->subchannel_call();
379 }
380 calld->~CallData();
381 if (GPR_LIKELY(subchannel_call != nullptr)) {
382 subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
383 } else {
384 // TODO(yashkt) : This can potentially be a Closure::Run
385 ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
386 }
387 }
388
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)389 static void StartTransportStreamOpBatch(
390 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
391 auto* calld = static_cast<CallData*>(elem->call_data);
392 calld->lb_call_->StartTransportStreamOpBatch(batch);
393 }
394
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)395 static void SetPollent(grpc_call_element* elem,
396 grpc_polling_entity* pollent) {
397 auto* calld = static_cast<CallData*>(elem->call_data);
398 auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
399 ClientChannelFilter* client_channel = chand->chand_;
400 grpc_call_element_args args = {calld->owning_call_, nullptr,
401 calld->path_,
402 /*start_time=*/0, calld->deadline_,
403 calld->arena_, calld->call_combiner_};
404 auto* service_config_call_data = GetServiceConfigCallData(calld->arena_);
405 calld->lb_call_ = client_channel->CreateLoadBalancedCall(
406 args, pollent, nullptr,
407 [service_config_call_data]() { service_config_call_data->Commit(); },
408 /*is_transparent_retry=*/false);
409 GRPC_TRACE_LOG(client_channel_call, INFO)
410 << "chand=" << chand << " dynamic_termination_calld=" << client_channel
411 << ": create lb_call=" << calld->lb_call_.get();
412 }
413
414 private:
CallData(const grpc_call_element_args & args)415 explicit CallData(const grpc_call_element_args& args)
416 : path_(CSliceRef(args.path)),
417 deadline_(args.deadline),
418 arena_(args.arena),
419 owning_call_(args.call_stack),
420 call_combiner_(args.call_combiner) {}
421
~CallData()422 ~CallData() { CSliceUnref(path_); }
423
424 grpc_slice path_; // Request path.
425 Timestamp deadline_;
426 Arena* arena_;
427 grpc_call_stack* owning_call_;
428 CallCombiner* call_combiner_;
429
430 OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall> lb_call_;
431 };
432
// Channel filter vtable for the dynamic termination filter; entries are
// positional (see grpc_channel_filter).
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
    DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
    DynamicTerminationFilter::StartTransportOp,
    sizeof(DynamicTerminationFilter::CallData),  // per-call data size
    DynamicTerminationFilter::CallData::Init,
    DynamicTerminationFilter::CallData::SetPollent,
    DynamicTerminationFilter::CallData::Destroy,
    sizeof(DynamicTerminationFilter),  // per-channel data size
    DynamicTerminationFilter::Init,
    grpc_channel_stack_no_post_init,
    DynamicTerminationFilter::Destroy,
    DynamicTerminationFilter::GetChannelInfo,
    GRPC_UNIQUE_TYPE_NAME_HERE("dynamic_filter_termination"),
};

}  // namespace
449
//
// ClientChannelFilter::ResolverResultHandler
//

// Receives results from the resolver and forwards them to the channel
// inside the work serializer. Holds a ref to the channel stack so the
// channel cannot be destroyed while the resolver is still shutting down;
// destruction of this handler therefore marks resolver shutdown complete.
class ClientChannelFilter::ResolverResultHandler final
    : public Resolver::ResultHandler {
 public:
  explicit ResolverResultHandler(ClientChannelFilter* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
  }

  ~ResolverResultHandler() override {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << chand_ << ": resolver shutdown complete";
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
  }

  void ReportResult(Resolver::Result result) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->OnResolverResultChangedLocked(std::move(result));
  }

 private:
  ClientChannelFilter* chand_;
};
475
476 //
477 // ClientChannelFilter::SubchannelWrapper
478 //
479
480 // This class is a wrapper for Subchannel that hides details of the
481 // channel's implementation (such as the connected subchannel) from the
482 // LB policy API.
483 //
484 // Note that no synchronization is needed here, because even if the
485 // underlying subchannel is shared between channels, this wrapper will only
486 // be used within one channel, so it will always be synchronized by the
487 // control plane work_serializer.
488 class ClientChannelFilter::SubchannelWrapper final
489 : public SubchannelInterface {
490 public:
SubchannelWrapper(ClientChannelFilter * chand,RefCountedPtr<Subchannel> subchannel)491 SubchannelWrapper(ClientChannelFilter* chand,
492 RefCountedPtr<Subchannel> subchannel)
493 : SubchannelInterface(GRPC_TRACE_FLAG_ENABLED(client_channel)
494 ? "SubchannelWrapper"
495 : nullptr),
496 chand_(chand),
497 subchannel_(std::move(subchannel)) {
498 GRPC_TRACE_LOG(client_channel, INFO)
499 << "chand=" << chand << ": creating subchannel wrapper " << this
500 << " for subchannel " << subchannel_.get();
501 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
502 #ifndef NDEBUG
503 DCHECK(chand_->work_serializer_->RunningInWorkSerializer());
504 #endif
505 if (chand_->channelz_node_ != nullptr) {
506 auto* subchannel_node = subchannel_->channelz_node();
507 if (subchannel_node != nullptr) {
508 auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
509 if (it == chand_->subchannel_refcount_map_.end()) {
510 chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
511 it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
512 .first;
513 }
514 ++it->second;
515 }
516 }
517 chand_->subchannel_wrappers_.insert(this);
518 }
519
~SubchannelWrapper()520 ~SubchannelWrapper() override {
521 GRPC_TRACE_LOG(client_channel, INFO)
522 << "chand=" << chand_ << ": destroying subchannel wrapper " << this
523 << "for subchannel " << subchannel_.get();
524 if (!IsWorkSerializerDispatchEnabled()) {
525 chand_->subchannel_wrappers_.erase(this);
526 if (chand_->channelz_node_ != nullptr) {
527 auto* subchannel_node = subchannel_->channelz_node();
528 if (subchannel_node != nullptr) {
529 auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
530 CHECK(it != chand_->subchannel_refcount_map_.end());
531 --it->second;
532 if (it->second == 0) {
533 chand_->channelz_node_->RemoveChildSubchannel(
534 subchannel_node->uuid());
535 chand_->subchannel_refcount_map_.erase(it);
536 }
537 }
538 }
539 }
540 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
541 }
542
Orphaned()543 void Orphaned() override {
544 if (!IsWorkSerializerDispatchEnabled()) return;
545 // Make sure we clean up the channel's subchannel maps inside the
546 // WorkSerializer.
547 // Ref held by callback.
548 WeakRef(DEBUG_LOCATION, "subchannel map cleanup").release();
549 chand_->work_serializer_->Run(
550 [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
551 chand_->subchannel_wrappers_.erase(this);
552 if (chand_->channelz_node_ != nullptr) {
553 auto* subchannel_node = subchannel_->channelz_node();
554 if (subchannel_node != nullptr) {
555 auto it =
556 chand_->subchannel_refcount_map_.find(subchannel_.get());
557 CHECK(it != chand_->subchannel_refcount_map_.end());
558 --it->second;
559 if (it->second == 0) {
560 chand_->channelz_node_->RemoveChildSubchannel(
561 subchannel_node->uuid());
562 chand_->subchannel_refcount_map_.erase(it);
563 }
564 }
565 }
566 WeakUnref(DEBUG_LOCATION, "subchannel map cleanup");
567 },
568 DEBUG_LOCATION);
569 }
570
WatchConnectivityState(std::unique_ptr<ConnectivityStateWatcherInterface> watcher)571 void WatchConnectivityState(
572 std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
573 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
574 auto& watcher_wrapper = watcher_map_[watcher.get()];
575 CHECK_EQ(watcher_wrapper, nullptr);
576 watcher_wrapper = new WatcherWrapper(
577 std::move(watcher),
578 RefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION, "WatcherWrapper"));
579 subchannel_->WatchConnectivityState(
580 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
581 watcher_wrapper));
582 }
583
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)584 void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher)
585 override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
586 auto it = watcher_map_.find(watcher);
587 CHECK(it != watcher_map_.end());
588 subchannel_->CancelConnectivityStateWatch(it->second);
589 watcher_map_.erase(it);
590 }
591
connected_subchannel() const592 RefCountedPtr<ConnectedSubchannel> connected_subchannel() const {
593 return subchannel_->connected_subchannel();
594 }
595
RequestConnection()596 void RequestConnection() override { subchannel_->RequestConnection(); }
597
ResetBackoff()598 void ResetBackoff() override { subchannel_->ResetBackoff(); }
599
AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher)600 void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
601 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
602 static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get())
603 ->SetSubchannel(subchannel_.get());
604 CHECK(data_watchers_.insert(std::move(watcher)).second);
605 }
606
CancelDataWatcher(DataWatcherInterface * watcher)607 void CancelDataWatcher(DataWatcherInterface* watcher) override
608 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
609 auto it = data_watchers_.find(watcher);
610 if (it != data_watchers_.end()) data_watchers_.erase(it);
611 }
612
ThrottleKeepaliveTime(int new_keepalive_time)613 void ThrottleKeepaliveTime(int new_keepalive_time) {
614 subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
615 }
616
address() const617 std::string address() const override { return subchannel_->address(); }
618
619 private:
620 // This wrapper provides a bridge between the internal Subchannel API
621 // and the SubchannelInterface API that we expose to LB policies.
622 // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
623 // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
624 // that was passed in by the LB policy. We pass an instance of this
625 // class to the underlying Subchannel, and when we get updates from
626 // the subchannel, we pass those on to the wrapped watcher to return
627 // the update to the LB policy.
628 //
629 // This class handles things like hopping into the WorkSerializer
630 // before passing notifications to the LB policy and propagating
631 // keepalive information between subchannels.
632 class WatcherWrapper final
633 : public Subchannel::ConnectivityStateWatcherInterface {
634 public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> parent)635 WatcherWrapper(
636 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
637 watcher,
638 RefCountedPtr<SubchannelWrapper> parent)
639 : watcher_(std::move(watcher)), parent_(std::move(parent)) {}
640
~WatcherWrapper()641 ~WatcherWrapper() override {
642 if (!IsWorkSerializerDispatchEnabled()) {
643 auto* parent = parent_.release(); // ref owned by lambda
644 parent->chand_->work_serializer_->Run(
645 [parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
646 *parent_->chand_->work_serializer_) {
647 parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
648 },
649 DEBUG_LOCATION);
650 return;
651 }
652 parent_.reset(DEBUG_LOCATION, "WatcherWrapper");
653 }
654
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)655 void OnConnectivityStateChange(
656 RefCountedPtr<ConnectivityStateWatcherInterface> self,
657 grpc_connectivity_state state, const absl::Status& status) override {
658 GRPC_TRACE_LOG(client_channel, INFO)
659 << "chand=" << parent_->chand_
660 << ": connectivity change for subchannel wrapper " << parent_.get()
661 << " subchannel " << parent_->subchannel_.get()
662 << "hopping into work_serializer";
663 self.release(); // Held by callback.
664 parent_->chand_->work_serializer_->Run(
665 [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
666 *parent_->chand_->work_serializer_) {
667 ApplyUpdateInControlPlaneWorkSerializer(state, status);
668 Unref();
669 },
670 DEBUG_LOCATION);
671 }
672
interested_parties()673 grpc_pollset_set* interested_parties() override {
674 return watcher_->interested_parties();
675 }
676
677 private:
ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,const absl::Status & status)678 void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
679 const absl::Status& status)
680 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_->chand_->work_serializer_) {
681 GRPC_TRACE_LOG(client_channel, INFO)
682 << "chand=" << parent_->chand_
683 << ": processing connectivity change in work serializer for "
684 "subchannel wrapper "
685 << parent_.get() << " subchannel " << parent_->subchannel_.get()
686 << " watcher=" << watcher_.get()
687 << " state=" << ConnectivityStateName(state) << " status=" << status;
688 absl::optional<absl::Cord> keepalive_throttling =
689 status.GetPayload(kKeepaliveThrottlingKey);
690 if (keepalive_throttling.has_value()) {
691 int new_keepalive_time = -1;
692 if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
693 &new_keepalive_time)) {
694 if (new_keepalive_time > parent_->chand_->keepalive_time_) {
695 parent_->chand_->keepalive_time_ = new_keepalive_time;
696 GRPC_TRACE_LOG(client_channel, INFO)
697 << "chand=" << parent_->chand_
698 << ": throttling keepalive time to "
699 << parent_->chand_->keepalive_time_;
700 // Propagate the new keepalive time to all subchannels. This is so
701 // that new transports created by any subchannel (and not just the
702 // subchannel that received the GOAWAY), use the new keepalive time.
703 for (auto* subchannel_wrapper :
704 parent_->chand_->subchannel_wrappers_) {
705 subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
706 }
707 }
708 } else {
709 LOG(ERROR) << "chand=" << parent_->chand_
710 << ": Illegal keepalive throttling value "
711 << std::string(keepalive_throttling.value());
712 }
713 }
714 // Propagate status only in state TF.
715 // We specifically want to avoid propagating the status for
716 // state IDLE that the real subchannel gave us only for the
717 // purpose of keepalive propagation.
718 watcher_->OnConnectivityStateChange(
719 state,
720 state == GRPC_CHANNEL_TRANSIENT_FAILURE ? status : absl::OkStatus());
721 }
722
723 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
724 watcher_;
725 RefCountedPtr<SubchannelWrapper> parent_;
726 };
727
728 // A heterogenous lookup comparator for data watchers that allows
729 // unique_ptr keys to be looked up as raw pointers.
730 struct DataWatcherLessThan {
731 using is_transparent = void;
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan732 bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
733 const std::unique_ptr<DataWatcherInterface>& p2) const {
734 return p1 < p2;
735 }
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan736 bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
737 const DataWatcherInterface* p2) const {
738 return p1.get() < p2;
739 }
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan740 bool operator()(const DataWatcherInterface* p1,
741 const std::unique_ptr<DataWatcherInterface>& p2) const {
742 return p1 < p2.get();
743 }
744 };
745
746 ClientChannelFilter* chand_;
747 RefCountedPtr<Subchannel> subchannel_;
748 // Maps from the address of the watcher passed to us by the LB policy
749 // to the address of the WrapperWatcher that we passed to the underlying
750 // subchannel. This is needed so that when the LB policy calls
751 // CancelConnectivityStateWatch() with its watcher, we know the
752 // corresponding WrapperWatcher to cancel on the underlying subchannel.
753 std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
754 ABSL_GUARDED_BY(*chand_->work_serializer_);
755 std::set<std::unique_ptr<DataWatcherInterface>, DataWatcherLessThan>
756 data_watchers_ ABSL_GUARDED_BY(*chand_->work_serializer_);
757 };
758
759 //
760 // ClientChannelFilter::ExternalConnectivityWatcher
761 //
762
// Constructor: registers this watcher in chand->external_watchers_
// (keyed by on_complete) and then hops into the WorkSerializer to
// attach to the channel's connectivity state tracker.
ClientChannelFilter::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ClientChannelFilter* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  // Keep the channel's pollset_set polling on behalf of this watch
  // (removed again in the destructor).
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  // Hold the channel stack alive for this watcher's lifetime
  // (released in the destructor).
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    CHECK(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        RefAsSubclass<ExternalConnectivityWatcher>(
            DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object to Start().
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}
793
// Destructor: undoes the constructor's polling registration and
// releases the channel-stack ref taken there.
ClientChannelFilter::ExternalConnectivityWatcher::
    ~ExternalConnectivityWatcher() {
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}
801
802 void ClientChannelFilter::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannelFilter * chand,grpc_closure * on_complete,bool cancel)803 RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand,
804 grpc_closure* on_complete,
805 bool cancel) {
806 RefCountedPtr<ExternalConnectivityWatcher> watcher;
807 {
808 MutexLock lock(&chand->external_watchers_mu_);
809 auto it = chand->external_watchers_.find(on_complete);
810 if (it != chand->external_watchers_.end()) {
811 watcher = std::move(it->second);
812 chand->external_watchers_.erase(it);
813 }
814 }
815 // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
816 // the mutex before calling it.
817 if (watcher != nullptr && cancel) watcher->Cancel();
818 }
819
// Reports a connectivity state change to the external caller.  Runs at
// most once: done_ is flipped atomically, so whichever of Notify() or
// Cancel() gets there first wins and the other becomes a no-op.
void ClientChannelFilter::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  // Remove external watcher.
  ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
      chand_, on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::OkStatus());
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
          Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
        },
        DEBUG_LOCATION);
  }
}
848
// Cancels the external watch: completes on_complete_ with CANCELLED
// and schedules removal from the state tracker.  Shares the done_ flag
// with Notify(), so only one of the two ever runs its body.
void ClientChannelFilter::ExternalConnectivityWatcher::Cancel() {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::CancelledError());
  // Hop back into the work_serializer to clean up.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        RemoveWatcherLocked();
        Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
      },
      DEBUG_LOCATION);
}
867
// Runs under the WorkSerializer (scheduled from the constructor).
// Fires watcher_timer_init_ and registers this object with the
// channel's connectivity state tracker; the ref taken at creation is
// donated to the OrphanablePtr handed to the tracker.
void ClientChannelFilter::ExternalConnectivityWatcher::AddWatcherLocked() {
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, absl::OkStatus());
  // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}
874
// Runs under the WorkSerializer.  Removes this watcher from the
// channel's connectivity state tracker (which holds it via the
// OrphanablePtr passed in AddWatcherLocked()).
void ClientChannelFilter::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}
878
879 //
880 // ClientChannelFilter::ConnectivityWatcherAdder
881 //
882
// Fire-and-forget helper that hops into the channel's WorkSerializer
// to add a connectivity watcher to the channel's state tracker.  Holds
// a channel-stack ref while the hop is in flight and deletes itself
// once the watcher has been handed off.
class ClientChannelFilter::ConnectivityWatcherAdder final {
 public:
  ConnectivityWatcherAdder(
      ClientChannelFilter* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    // Keep the channel stack alive until AddWatcherLocked() has run.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          AddWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Runs under the WorkSerializer: hands the watcher to the state
  // tracker, drops the stack ref, and self-destructs.
  void AddWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ClientChannelFilter* chand_;
  grpc_connectivity_state initial_state_;
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};
911
912 //
913 // ClientChannelFilter::ConnectivityWatcherRemover
914 //
915
// Fire-and-forget counterpart to ConnectivityWatcherAdder: hops into
// the channel's WorkSerializer to remove a previously added watcher
// from the state tracker.  Holds a channel-stack ref while in flight
// and deletes itself when done.
class ClientChannelFilter::ConnectivityWatcherRemover final {
 public:
  ConnectivityWatcherRemover(ClientChannelFilter* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    // Keep the channel stack alive until RemoveWatcherLocked() has run.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Runs under the WorkSerializer: removes the watcher, drops the
  // stack ref, and self-destructs.
  void RemoveWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ClientChannelFilter* chand_;
  AsyncConnectivityStateWatcherInterface* watcher_;
};
941
942 //
943 // ClientChannelFilter::ClientChannelControlHelper
944 //
945
946 class ClientChannelFilter::ClientChannelControlHelper final
947 : public LoadBalancingPolicy::ChannelControlHelper {
948 public:
ClientChannelControlHelper(ClientChannelFilter * chand)949 explicit ClientChannelControlHelper(ClientChannelFilter* chand)
950 : chand_(chand) {
951 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
952 }
953
~ClientChannelControlHelper()954 ~ClientChannelControlHelper() override {
955 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
956 "ClientChannelControlHelper");
957 }
958
CreateSubchannel(const grpc_resolved_address & address,const ChannelArgs & per_address_args,const ChannelArgs & args)959 RefCountedPtr<SubchannelInterface> CreateSubchannel(
960 const grpc_resolved_address& address, const ChannelArgs& per_address_args,
961 const ChannelArgs& args) override
962 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
963 if (chand_->resolver_ == nullptr) return nullptr; // Shutting down.
964 ChannelArgs subchannel_args = Subchannel::MakeSubchannelArgs(
965 args, per_address_args, chand_->subchannel_pool_,
966 chand_->default_authority_);
967 // Create subchannel.
968 RefCountedPtr<Subchannel> subchannel =
969 chand_->client_channel_factory_->CreateSubchannel(address,
970 subchannel_args);
971 if (subchannel == nullptr) return nullptr;
972 // Make sure the subchannel has updated keepalive time.
973 subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
974 // Create and return wrapper for the subchannel.
975 return MakeRefCounted<SubchannelWrapper>(chand_, std::move(subchannel));
976 }
977
UpdateState(grpc_connectivity_state state,const absl::Status & status,RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)978 void UpdateState(grpc_connectivity_state state, const absl::Status& status,
979 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
980 override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
981 if (chand_->resolver_ == nullptr) return; // Shutting down.
982 GRPC_TRACE_LOG(client_channel, INFO)
983 << "chand=" << chand_
984 << ": update: state=" << ConnectivityStateName(state) << " status=("
985 << status << ") picker=" << picker.get()
986 << (chand_->disconnect_error_.ok()
987 ? ""
988 : " (ignoring -- channel shutting down)");
989 // Do update only if not shutting down.
990 if (chand_->disconnect_error_.ok()) {
991 chand_->UpdateStateAndPickerLocked(state, status, "helper",
992 std::move(picker));
993 }
994 }
995
RequestReresolution()996 void RequestReresolution() override
997 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
998 if (chand_->resolver_ == nullptr) return; // Shutting down.
999 GRPC_TRACE_LOG(client_channel, INFO)
1000 << "chand=" << chand_ << ": started name re-resolving";
1001 chand_->resolver_->RequestReresolutionLocked();
1002 }
1003
GetTarget()1004 absl::string_view GetTarget() override { return chand_->target_uri_; }
1005
GetAuthority()1006 absl::string_view GetAuthority() override {
1007 return chand_->default_authority_;
1008 }
1009
GetChannelCredentials()1010 RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
1011 return chand_->channel_args_.GetObject<grpc_channel_credentials>()
1012 ->duplicate_without_call_credentials();
1013 }
1014
GetUnsafeChannelCredentials()1015 RefCountedPtr<grpc_channel_credentials> GetUnsafeChannelCredentials()
1016 override {
1017 return chand_->channel_args_.GetObject<grpc_channel_credentials>()->Ref();
1018 }
1019
GetEventEngine()1020 grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
1021 return chand_->owning_stack_->EventEngine();
1022 }
1023
GetStatsPluginGroup()1024 GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override {
1025 return *chand_->owning_stack_->stats_plugin_group;
1026 }
1027
AddTraceEvent(TraceSeverity severity,absl::string_view message)1028 void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
1029 ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
1030 if (chand_->resolver_ == nullptr) return; // Shutting down.
1031 if (chand_->channelz_node_ != nullptr) {
1032 chand_->channelz_node_->AddTraceEvent(
1033 ConvertSeverityEnum(severity),
1034 grpc_slice_from_copied_buffer(message.data(), message.size()));
1035 }
1036 }
1037
1038 private:
ConvertSeverityEnum(TraceSeverity severity)1039 static channelz::ChannelTrace::Severity ConvertSeverityEnum(
1040 TraceSeverity severity) {
1041 if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
1042 if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1043 return channelz::ChannelTrace::Error;
1044 }
1045
1046 ClientChannelFilter* chand_;
1047 };
1048
1049 //
1050 // ClientChannelFilter implementation
1051 //
1052
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1053 grpc_error_handle ClientChannelFilter::Init(grpc_channel_element* elem,
1054 grpc_channel_element_args* args) {
1055 CHECK(args->is_last);
1056 CHECK(elem->filter == &kFilter);
1057 grpc_error_handle error;
1058 new (elem->channel_data) ClientChannelFilter(args, &error);
1059 return error;
1060 }
1061
Destroy(grpc_channel_element * elem)1062 void ClientChannelFilter::Destroy(grpc_channel_element* elem) {
1063 auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
1064 chand->~ClientChannelFilter();
1065 }
1066
1067 namespace {
1068
GetSubchannelPool(const ChannelArgs & args)1069 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1070 const ChannelArgs& args) {
1071 if (args.GetBool(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL).value_or(false)) {
1072 return MakeRefCounted<LocalSubchannelPool>();
1073 }
1074 return GlobalSubchannelPool::instance();
1075 }
1076
1077 } // namespace
1078
// Constructs the client channel filter.  Failures are reported through
// |*error|; each validation step below returns early with an error set,
// and the success path ends by setting *error to OK.
ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
                                         grpc_error_handle* error)
    : channel_args_(args->channel_args),
      owning_stack_(args->channel_stack),
      client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),
      channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
      interested_parties_(grpc_pollset_set_create()),
      service_config_parser_index_(
          internal::ClientChannelServiceConfigParser::ParserIndex()),
      work_serializer_(
          std::make_shared<WorkSerializer>(*args->channel_stack->event_engine)),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(channel_args_)) {
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": creating client_channel for channel stack "
      << owning_stack_;
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get default service config. If none is specified via the client API,
  // we use an empty config.
  absl::optional<absl::string_view> service_config_json =
      channel_args_.GetString(GRPC_ARG_SERVICE_CONFIG);
  if (!service_config_json.has_value()) service_config_json = "{}";
  *error = absl::OkStatus();  // Redundant: also set on the success path below.
  auto service_config =
      ServiceConfigImpl::Create(channel_args_, *service_config_json);
  if (!service_config.ok()) {
    *error = absl_status_to_grpc_error(service_config.status());
    return;
  }
  default_service_config_ = std::move(*service_config);
  // Get URI to resolve, using proxy mapper if needed.
  absl::optional<std::string> target_uri =
      channel_args_.GetOwnedString(GRPC_ARG_SERVER_URI);
  if (!target_uri.has_value()) {
    *error = GRPC_ERROR_CREATE(
        "target URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  target_uri_ = std::move(*target_uri);
  // If the proxy mapper does not rewrite the name, resolve the original.
  uri_to_resolve_ = CoreConfiguration::Get()
                        .proxy_mapper_registry()
                        .MapName(target_uri_, &channel_args_)
                        .value_or(target_uri_);
  // Make sure the URI to resolve is valid, so that we know that
  // resolver creation will succeed later.
  if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
          uri_to_resolve_)) {
    *error = GRPC_ERROR_CREATE(
        absl::StrCat("the target uri is not valid: ", uri_to_resolve_));
    return;
  }
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  channel_args_ = channel_args_.Remove(GRPC_ARG_SERVICE_CONFIG);
  // Set initial keepalive time.
  auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
  if (keepalive_arg.has_value()) {
    keepalive_time_ = Clamp(*keepalive_arg, 1, INT_MAX);
  } else {
    keepalive_time_ = -1;  // unset
  }
  // Set default authority.
  absl::optional<std::string> default_authority =
      channel_args_.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
  if (!default_authority.has_value()) {
    // No explicit authority: derive one from the target via the resolver
    // registry.
    default_authority_ =
        CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
            target_uri_);
  } else {
    default_authority_ = std::move(*default_authority);
  }
  // Success.
  *error = absl::OkStatus();
}
1161
// Destructor: tears down the resolver and LB policy first (the LB
// policy's pollset_set is attached to interested_parties_), then stops
// backup polling and destroys the pollset_set.
ClientChannelFilter::~ClientChannelFilter() {
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": destroying channel";
  DestroyResolverAndLbPolicyLocked();
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
}
1170
// Creates a new filter-based LB call on the call's arena.  The object's
// storage is released with the arena, not individually; the returned
// OrphanablePtr controls its logical lifetime.
OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
ClientChannelFilter::CreateLoadBalancedCall(
    const grpc_call_element_args& args, grpc_polling_entity* pollent,
    grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
  // Make the call's arena available via promise context during
  // construction.
  promise_detail::Context<Arena> arena_ctx(args.arena);
  return OrphanablePtr<FilterBasedLoadBalancedCall>(
      args.arena->New<FilterBasedLoadBalancedCall>(
          this, args, pollent, on_call_destruction_complete,
          std::move(on_commit), is_transparent_retry));
}
1182
// Re-runs resolution handling for every call that was queued waiting
// for resolver data.  Invoked under resolution_mu_ (see
// OnResolverErrorLocked).
// NOTE(review): assumes RetryCheckResolutionLocked() does not re-insert
// the call into resolver_queued_calls_ during this loop -- confirm.
void ClientChannelFilter::ReprocessQueuedResolverCalls() {
  for (CallData* calld : resolver_queued_calls_) {
    calld->RemoveCallFromResolverQueuedCallsLocked();
    calld->RetryCheckResolutionLocked();
  }
  resolver_queued_calls_.clear();
}
1190
1191 namespace {
1192
ChooseLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config)1193 RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
1194 const Resolver::Result& resolver_result,
1195 const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
1196 // Prefer the LB policy config found in the service config.
1197 if (parsed_service_config->parsed_lb_config() != nullptr) {
1198 return parsed_service_config->parsed_lb_config();
1199 }
1200 // Try the deprecated LB policy name from the service config.
1201 // If not, try the setting from channel args.
1202 absl::optional<absl::string_view> policy_name;
1203 if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
1204 policy_name = parsed_service_config->parsed_deprecated_lb_policy();
1205 } else {
1206 policy_name = resolver_result.args.GetString(GRPC_ARG_LB_POLICY_NAME);
1207 bool requires_config = false;
1208 if (policy_name.has_value() &&
1209 (!CoreConfiguration::Get()
1210 .lb_policy_registry()
1211 .LoadBalancingPolicyExists(*policy_name, &requires_config) ||
1212 requires_config)) {
1213 if (requires_config) {
1214 LOG(ERROR) << "LB policy: " << *policy_name
1215 << " passed through channel_args must not "
1216 "require a config. Using pick_first instead.";
1217 } else {
1218 LOG(ERROR) << "LB policy: " << *policy_name
1219 << " passed through channel_args does not exist. "
1220 "Using pick_first instead.";
1221 }
1222 policy_name = "pick_first";
1223 }
1224 }
1225 // Use pick_first if nothing was specified and we didn't select grpclb
1226 // above.
1227 if (!policy_name.has_value()) policy_name = "pick_first";
1228 // Now that we have the policy name, construct an empty config for it.
1229 Json config_json = Json::FromArray({Json::FromObject({
1230 {std::string(*policy_name), Json::FromObject({})},
1231 })});
1232 auto lb_policy_config =
1233 CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
1234 config_json);
1235 // The policy name came from one of three places:
1236 // - The deprecated loadBalancingPolicy field in the service config,
1237 // in which case the code in ClientChannelServiceConfigParser
1238 // already verified that the policy does not require a config.
1239 // - One of the hard-coded values here, all of which are known to not
1240 // require a config.
1241 // - A channel arg, in which case we check that the specified policy exists
1242 // and accepts an empty config. If not, we revert to using pick_first
1243 // lb_policy
1244 CHECK(lb_policy_config.ok());
1245 return std::move(*lb_policy_config);
1246 }
1247
1248 } // namespace
1249
// Handles a new resolver result: chooses the service config (new,
// previously saved, or default), applies control-plane and data-plane
// config changes, updates the LB policy, and records channelz trace
// events for notable transitions.
void ClientChannelFilter::OnResolverResultChangedLocked(
    Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": got resolver result";
  // Grab resolver result health callback.
  auto resolver_callback = std::move(result.result_health_callback);
  absl::Status resolver_result_status;
  // We only want to trace the address resolution in the follow cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  std::vector<const char*> trace_strings;
  const bool resolution_contains_addresses =
      result.addresses.ok() && !result.addresses->empty();
  if (!resolution_contains_addresses &&
      previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (resolution_contains_addresses &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = resolution_contains_addresses;
  // This storage must outlive trace_strings, which holds a c_str()
  // pointer into it.
  std::string service_config_error_string_storage;
  if (!result.service_config.ok()) {
    service_config_error_string_storage =
        result.service_config.status().ToString();
    trace_strings.push_back(service_config_error_string_storage.c_str());
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (!result.service_config.ok()) {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << this << ": resolver returned service config error: "
        << result.service_config.status();
    // If the service config was invalid, then fallback to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "chand=" << this
          << ": resolver returned invalid service config. "
             "Continuing to use previous service config.";
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received a service config error and we don't have a
      // previous service config to fall back to. Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(result.service_config.status());
      trace_strings.push_back("no valid service config");
      resolver_result_status =
          absl::UnavailableError("no valid service config");
    }
  } else if (*result.service_config == nullptr) {
    // Resolver did not return any service config.
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << this
        << ": resolver returned no service config. Using default service "
           "config for channel.";
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = std::move(*result.service_config);
    config_selector = result.args.GetObjectRef<ConfigSelector>();
  }
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in the
  // WorkSerializer.
  result.args = result.args.Remove(GRPC_ARG_CONFIG_SELECTOR);
  // Note: The only case in which service_config is null here is if the resolver
  // returned a service config error and we don't have a previous service
  // config to fall back to.
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                service_config_parser_index_));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          std::string(lb_policy_config->name()));
    } else {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "chand=" << this << ": service config not changed";
    }
    // Create or update LB policy, as needed.
    // Copy the args first: |result| is consumed by the call below, but
    // the args are still needed for the data-plane update afterwards.
    ChannelArgs new_args = result.args;
    resolver_result_status = CreateOrUpdateLbPolicyLocked(
        std::move(lb_policy_config),
        parsed_service_config->health_check_service_name(), std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked(new_args);
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Invoke resolver callback if needed.
  if (resolver_callback != nullptr) {
    resolver_callback(std::move(resolver_result_status));
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
}
1385
// Handles a resolver failure.  If an LB policy already exists (from a
// previous successful resolution), it remains in charge of the
// channel's connectivity state; otherwise the channel goes into
// TRANSIENT_FAILURE and queued calls are re-processed so they can fail
// with the resolver error.
void ClientChannelFilter::OnResolverErrorLocked(absl::Status status) {
  if (resolver_ == nullptr) return;  // Shutting down.
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": resolver transient failure: " << status;
  // If we already have an LB policy from a previous resolution
  // result, then we continue to let it set the connectivity state.
  // Otherwise, we go into TRANSIENT_FAILURE.
  if (lb_policy_ == nullptr) {
    // Update connectivity state.
    UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
                      "resolver failure");
    {
      MutexLock lock(&resolution_mu_);
      // Update resolver transient failure.
      resolver_transient_failure_error_ =
          MaybeRewriteIllegalStatusCode(status, "resolver");
      ReprocessQueuedResolverCalls();
    }
  }
}
1406
// Builds a LoadBalancingPolicy::UpdateArgs from the resolver result,
// creates the LB policy on first use, and pushes the update into it.
// Returns the status from the policy's UpdateLocked().
absl::Status ClientChannelFilter::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    const absl::optional<std::string>& health_check_service_name,
    Resolver::Result result) {
  // Construct update.
  LoadBalancingPolicy::UpdateArgs update_args;
  if (!result.addresses.ok()) {
    // Pass the address-resolution failure through to the policy.
    update_args.addresses = result.addresses.status();
  } else {
    update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
        std::move(*result.addresses));
  }
  update_args.config = std::move(lb_policy_config);
  update_args.resolution_note = std::move(result.resolution_note);
  update_args.args = std::move(result.args);
  // Add health check service name to channel args.
  if (health_check_service_name.has_value()) {
    update_args.args = update_args.args.Set(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
                                            *health_check_service_name);
  }
  // Create policy if needed.
  if (lb_policy_ == nullptr) {
    lb_policy_ = CreateLbPolicyLocked(update_args.args);
  }
  // Update the policy.
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": Updating child policy " << lb_policy_.get();
  return lb_policy_->UpdateLocked(std::move(update_args));
}
1436
// Creates a new LB policy (wrapped in a ChildPolicyHandler) and hooks
// its pollset_set up to the channel's interested_parties_.
OrphanablePtr<LoadBalancingPolicy> ClientChannelFilter::CreateLbPolicyLocked(
    const ChannelArgs& args) {
  // The LB policy will start in state CONNECTING but will not
  // necessarily send us an update synchronously, so set state to
  // CONNECTING (in case the resolver had previously failed and put the
  // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
  // Now create the LB policy.
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      std::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &client_channel_trace);
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": created new LB policy " << lb_policy.get();
  // Let the policy's I/O be driven by the channel's pollers.
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}
1462
// Records the newly chosen service config and config selector in
// control-plane state and refreshes the data served by
// GetChannelInfo().
void ClientChannelFilter::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
  // Capture the JSON before |service_config| is moved below.
  std::string service_config_json(service_config->json_string());
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": using service config: \"" << service_config_json
      << "\"";
  // Save service config.
  saved_service_config_ = std::move(service_config);
  // Swap out the data used by GetChannelInfo().
  {
    MutexLock lock(&info_mu_);
    info_lb_policy_name_ = std::move(lb_policy_name);
    info_service_config_json_ = std::move(service_config_json);
  }
  // Save config selector.
  saved_config_selector_ = std::move(config_selector);
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": using ConfigSelector "
      << saved_config_selector_.get();
}
1484
// Pushes the previously saved service config and ConfigSelector into the
// data plane: rebuilds the dynamic filter stack and swaps the new state in
// under resolution_mu_, then reprocesses any calls queued on resolution.
void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked(
    const ChannelArgs& args) {
  // Grab ref to service config.
  RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
  // Grab ref to config selector.  Use default if resolver didn't supply one.
  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": switching to ConfigSelector "
      << saved_config_selector_.get();
  if (config_selector == nullptr) {
    config_selector =
        MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
  }
  // Modify channel args: expose the channel itself and the service
  // config to the filters in the dynamic stack.
  ChannelArgs new_args = args.SetObject(this).SetObject(service_config);
  // Retries default to enabled unless a minimal stack was requested or
  // GRPC_ARG_ENABLE_RETRIES explicitly disables them.
  bool enable_retries =
      !new_args.WantMinimalStack() &&
      new_args.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
  // Construct dynamic filter stack.
  std::vector<const grpc_channel_filter*> filters =
      config_selector->GetFilters();
  // The terminating filter depends on whether retries are enabled.
  if (enable_retries) {
    filters.push_back(&RetryFilter::kVtable);
  } else {
    filters.push_back(&DynamicTerminationFilter::kFilterVtable);
  }
  // The old blackboard is passed as input and a fresh one as output;
  // DynamicFilters::Create() defines what state carries across rebuilds.
  auto new_blackboard = MakeRefCounted<Blackboard>();
  RefCountedPtr<DynamicFilters> dynamic_filters = DynamicFilters::Create(
      new_args, std::move(filters), blackboard_.get(), new_blackboard.get());
  CHECK(dynamic_filters != nullptr);
  blackboard_ = std::move(new_blackboard);
  // Grab data plane lock to update service config.
  //
  // We defer unreffing the old values (and deallocating memory) until
  // after releasing the lock to keep the critical section small.
  {
    MutexLock lock(&resolution_mu_);
    resolver_transient_failure_error_ = absl::OkStatus();
    // Update service config.
    received_service_config_data_ = true;
    // Old values will be unreffed after lock is released.
    service_config_.swap(service_config);
    config_selector_.swap(config_selector);
    dynamic_filters_.swap(dynamic_filters);
    // Re-process queued calls asynchronously.
    ReprocessQueuedResolverCalls();
  }
  // Old values will be unreffed after lock is released when they go out
  // of scope.
}
1535
// Instantiates and starts the resolver for uri_to_resolve_, reporting
// CONNECTING while resolution is in flight.  Runs in the work serializer.
void ClientChannelFilter::CreateResolverLocked() {
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": starting name resolution for "
      << uri_to_resolve_;
  resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
      uri_to_resolve_, channel_args_, interested_parties_, work_serializer_,
      std::make_unique<ResolverResultHandler>(this));
  // Since the validity of the args was checked when the channel was created,
  // CreateResolver() must return a non-null result.
  CHECK(resolver_ != nullptr);
  UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
                    "started resolving");
  resolver_->StartLocked();
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": created resolver=" << resolver_.get();
}
1552
// Tears down the resolver, the data-plane resolution state, and the LB
// policy.  No-op if resolution was never started.
void ClientChannelFilter::DestroyResolverAndLbPolicyLocked() {
  if (resolver_ != nullptr) {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << this << ": shutting down resolver=" << resolver_.get();
    resolver_.reset();
    // Clear resolution state.
    saved_service_config_.reset();
    saved_config_selector_.reset();
    // Acquire resolution lock to update config selector and associated state.
    // To minimize lock contention, we wait to unref these objects until
    // after we release the lock.
    RefCountedPtr<ServiceConfig> service_config_to_unref;
    RefCountedPtr<ConfigSelector> config_selector_to_unref;
    RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
    {
      MutexLock lock(&resolution_mu_);
      received_service_config_data_ = false;
      // Moving into the locals above defers the actual unrefs until the
      // critical section ends.
      service_config_to_unref = std::move(service_config_);
      config_selector_to_unref = std::move(config_selector_);
      dynamic_filters_to_unref = std::move(dynamic_filters_);
    }
    // Clear LB policy if set.
    if (lb_policy_ != nullptr) {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "chand=" << this
          << ": shutting down lb_policy=" << lb_policy_.get();
      // Stop polling for the LB policy's I/O before destroying it.
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
}
1585
// Sets the channel's connectivity state and mirrors the transition into
// channelz.  Runs in the work serializer.
void ClientChannelFilter::UpdateStateLocked(grpc_connectivity_state state,
                                            const absl::Status& status,
                                            const char* reason) {
  // SHUTDOWN is terminal: once entered, no other transition is legal.
  if (state != GRPC_CHANNEL_SHUTDOWN &&
      state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) {
    Crash("Illegal transition SHUTDOWN -> anything");
  }
  state_tracker_.SetState(state, status, reason);
  // Surface the transition via channelz, if enabled for this channel.
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
}
1603
// Updates connectivity state and atomically (w.r.t. lb_mu_) installs a
// new picker, retrying every pick that was queued on the old one.
void ClientChannelFilter::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
  UpdateStateLocked(state, status, reason);
  // Grab the LB lock to update the picker and trigger reprocessing of the
  // queued picks.
  // Old picker will be unreffed after releasing the lock.
  MutexLock lock(&lb_mu_);
  picker_.swap(picker);
  // Reprocess queued picks.
  for (auto& call : lb_queued_calls_) {
    call->RemoveCallFromLbQueuedCallsLocked();
    call->RetryPickLocked();
  }
  lb_queued_calls_.clear();
}
1621
1622 namespace {
1623
1624 // TODO(roth): Remove this in favor of src/core/util/match.h once
1625 // we can do that without breaking lock annotations.
1626 template <typename T>
HandlePickResult(LoadBalancingPolicy::PickResult * result,std::function<T (LoadBalancingPolicy::PickResult::Complete *)> complete_func,std::function<T (LoadBalancingPolicy::PickResult::Queue *)> queue_func,std::function<T (LoadBalancingPolicy::PickResult::Fail *)> fail_func,std::function<T (LoadBalancingPolicy::PickResult::Drop *)> drop_func)1627 T HandlePickResult(
1628 LoadBalancingPolicy::PickResult* result,
1629 std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
1630 std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
1631 std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
1632 std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
1633 auto* complete_pick =
1634 absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
1635 if (complete_pick != nullptr) {
1636 return complete_func(complete_pick);
1637 }
1638 auto* queue_pick =
1639 absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
1640 if (queue_pick != nullptr) {
1641 return queue_func(queue_pick);
1642 }
1643 auto* fail_pick =
1644 absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
1645 if (fail_pick != nullptr) {
1646 return fail_func(fail_pick);
1647 }
1648 auto* drop_pick =
1649 absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
1650 CHECK_NE(drop_pick, nullptr);
1651 return drop_func(drop_pick);
1652 }
1653
1654 } // namespace
1655
// Sends a ping on a subchannel chosen by the current picker.  Returns a
// non-OK error if the channel is not READY or no connected subchannel
// can be picked; on success the transport owns the ping closures.
grpc_error_handle ClientChannelFilter::DoPingLocked(grpc_transport_op* op) {
  // Pings can only be sent on a connected channel.
  if (state_tracker_.state() != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE("channel not connected");
  }
  LoadBalancingPolicy::PickResult result;
  {
    // Use the current picker to select a subchannel for the ping.
    MutexLock lock(&lb_mu_);
    result = picker_->Pick(LoadBalancingPolicy::PickArgs());
  }
  return HandlePickResult<grpc_error_handle>(
      &result,
      // Complete pick.
      [op](LoadBalancingPolicy::PickResult::Complete* complete_pick)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(
              *ClientChannelFilter::work_serializer_) {
            SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
                complete_pick->subchannel.get());
            RefCountedPtr<ConnectedSubchannel> connected_subchannel =
                subchannel->connected_subchannel();
            // The subchannel may have lost its connection since the pick.
            if (connected_subchannel == nullptr) {
              return GRPC_ERROR_CREATE("LB pick for ping not connected");
            }
            connected_subchannel->Ping(op->send_ping.on_initiate,
                                       op->send_ping.on_ack);
            return absl::OkStatus();
          },
      // Queue pick.
      [](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        return GRPC_ERROR_CREATE("LB picker queued call");
      },
      // Fail pick.
      [](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        return absl_status_to_grpc_error(fail_pick->status);
      },
      // Drop pick.
      [](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        return absl_status_to_grpc_error(drop_pick->status);
      });
}
1695
// Handles the transport op inside the work serializer: connectivity
// watches, pings, backoff reset, and disconnect/IDLE transitions.
// Releases the stack ref taken in StartTransportOp().
void ClientChannelFilter::StartTransportOpLocked(grpc_transport_op* op) {
  // Connectivity watch.
  if (op->start_connectivity_watch != nullptr) {
    state_tracker_.AddWatcher(op->start_connectivity_watch_state,
                              std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error_handle error = DoPingLocked(op);
    // On failure both ping closures are run with the error here; on
    // success DoPingLocked() has handed them to the transport.
    if (!error.ok()) {
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate, error);
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (!op->disconnect_with_error.ok()) {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << this << ": disconnect_with_error: "
        << StatusToString(op->disconnect_with_error);
    DestroyResolverAndLbPolicyLocked();
    intptr_t value;
    // An error tagged with ChannelConnectivityState == IDLE is the
    // go-idle signal rather than a real disconnect.
    if (grpc_error_get_int(op->disconnect_with_error,
                           StatusIntProperty::ChannelConnectivityState,
                           &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (disconnect_error_.ok()) {  // Ignore if we're shutting down.
        // Enter IDLE state.
        UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
                                   "channel entering IDLE", nullptr);
        // TODO(roth): Do we need to check for any queued picks here, in
        // case there's a race condition in the client_idle filter?
        // And maybe also check for calls in the resolver queue?
      }
    } else {
      // Disconnect.
      CHECK(disconnect_error_.ok());
      disconnect_error_ = op->disconnect_with_error;
      UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
          MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(
              grpc_error_to_absl_status(op->disconnect_with_error)));
      // TODO(roth): If this happens when we're still waiting for a
      // resolver result, we need to trigger failures for all calls in
      // the resolver queue here.
    }
  }
  // Pairs with the ref taken in StartTransportOp().
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
  ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
}
1757
// Channel-filter entry point for transport ops.  Handles bind_pollset
// inline, then hops into the work serializer for everything else.
void ClientChannelFilter::StartTransportOp(grpc_channel_element* elem,
                                           grpc_transport_op* op) {
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  // The client channel never accepts incoming streams.
  CHECK(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane work_serializer for remaining ops.
  // The ref is released by StartTransportOpLocked().
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  chand->work_serializer_->Run(
      [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
        chand->StartTransportOpLocked(op);
      },
      DEBUG_LOCATION);
}
1774
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1775 void ClientChannelFilter::GetChannelInfo(grpc_channel_element* elem,
1776 const grpc_channel_info* info) {
1777 auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
1778 MutexLock lock(&chand->info_mu_);
1779 if (info->lb_policy_name != nullptr) {
1780 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.c_str());
1781 }
1782 if (info->service_config_json != nullptr) {
1783 *info->service_config_json =
1784 gpr_strdup(chand->info_service_config_json_.c_str());
1785 }
1786 }
1787
// Kicks the channel out of IDLE: asks an existing LB policy to exit
// idle, or starts name resolution if none exists yet.  Always releases
// the stack ref taken by CheckConnectivityState().
void ClientChannelFilter::TryToConnectLocked() {
  // A non-OK disconnect error means the channel is shutting down, in
  // which case we must not start any new connection activity.
  if (disconnect_error_.ok()) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ExitIdleLocked();
    } else if (resolver_ == nullptr) {
      CreateResolverLocked();
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}
1798
// Returns the channel's current connectivity state.  If the channel is
// IDLE and try_to_connect is set, asynchronously kicks off a connection
// attempt via the work serializer.
grpc_connectivity_state ClientChannelFilter::CheckConnectivityState(
    bool try_to_connect) {
  // state_tracker_ is guarded by work_serializer_, which we're not
  // holding here. But the one method of state_tracker_ that *is*
  // thread-safe to call without external synchronization is the state()
  // method, so we can disable thread-safety analysis for this one read.
  grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    // Ref is released in TryToConnectLocked().
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
                              *work_serializer_) { TryToConnectLocked(); },
                          DEBUG_LOCATION);
  }
  return out;
}
1814
// Registers an external connectivity watcher.
void ClientChannelFilter::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  // NOTE(review): the adder is heap-allocated with no owner here;
  // presumably ConnectivityWatcherAdder deletes itself once it has
  // registered the watcher in the work serializer -- confirm in its
  // definition.
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}
1820
// Unregisters an external connectivity watcher.
void ClientChannelFilter::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  // NOTE(review): like the adder above, this object is expected to be
  // self-deleting after the removal completes -- confirm in
  // ConnectivityWatcherRemover.
  new ConnectivityWatcherRemover(this, watcher);
}
1825
1826 //
1827 // CallData implementation
1828 //
1829
// Undoes the pollset bookkeeping done when the call was queued waiting
// for a resolver result.  Held under resolution_mu_ by callers.
void ClientChannelFilter::CallData::RemoveCallFromResolverQueuedCallsLocked() {
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this
      << ": removing from resolver queued picks list";
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand()->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in
  // ResolverQueuedCallCanceller::CancelLocked() or
  // ClientChannelFilter::ReprocessQueuedResolverCalls().
}
1842
// Queues the call until a resolver result arrives.  Held under
// resolution_mu_ by callers.
void ClientChannelFilter::CallData::AddCallToResolverQueuedCallsLocked() {
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this
      << ": adding to resolver queued picks list; pollent="
      << grpc_polling_entity_string(pollent());
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand()->interested_parties_);
  // Add to queue.
  chand()->resolver_queued_calls_.insert(this);
  // Hook so the concrete call implementation can react to being queued
  // (e.g. install a cancellation handler).
  OnAddToQueueLocked();
}
1856
// Applies the resolved service config to this call: creates the
// per-call service config data, runs the ConfigSelector, and applies
// the client channel's own method params (timeout, wait_for_ready).
// Returns a non-OK error if the resolver failed or the ConfigSelector
// rejects the call.
grpc_error_handle ClientChannelFilter::CallData::ApplyServiceConfigToCallLocked(
    const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this
      << ": applying service config to call";
  // Propagate resolution failure, if any.
  if (!config_selector.ok()) return config_selector.status();
  // Create a ClientChannelServiceConfigCallData for the call. This stores
  // a ref to the ServiceConfig and caches the right set of parsed configs
  // to use for the call. The ClientChannelServiceConfigCallData will store
  // itself in the call context, so that it can be accessed by filters
  // below us in the stack, and it will be cleaned up when the call ends.
  auto* service_config_call_data =
      arena()->New<ClientChannelServiceConfigCallData>(arena());
  // Use the ConfigSelector to determine the config for the call.
  absl::Status call_config_status =
      (*config_selector)
          ->GetCallConfig(
              {send_initial_metadata(), arena(), service_config_call_data});
  if (!call_config_status.ok()) {
    // Make sure we never surface a status code that is illegal for a
    // server to return.
    return absl_status_to_grpc_error(
        MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector"));
  }
  // Apply our own method params to the call.
  auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
      service_config_call_data->GetMethodParsedConfig(
          chand()->service_config_parser_index_));
  if (method_params != nullptr) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (method_params->timeout() != Duration::Zero()) {
      ResetDeadline(method_params->timeout());
    }
    // If the service config set wait_for_ready and the application
    // did not explicitly set it, use the value from the service config.
    auto* wait_for_ready =
        send_initial_metadata()->GetOrCreatePointer(WaitForReady());
    if (method_params->wait_for_ready().has_value() &&
        !wait_for_ready->explicitly_set) {
      wait_for_ready->value = method_params->wait_for_ready().value();
    }
  }
  return absl::OkStatus();
}
1900
// Checks whether a resolver result is available for this call.  Returns
// nullopt if the call was queued waiting for resolution, a non-OK
// status if applying the config failed, or OK on success.
absl::optional<absl::Status> ClientChannelFilter::CallData::CheckResolution(
    bool was_queued) {
  // Check if we have a resolver result to use.
  absl::StatusOr<RefCountedPtr<ConfigSelector>> config_selector;
  {
    MutexLock lock(&chand()->resolution_mu_);
    bool result_ready = CheckResolutionLocked(&config_selector);
    // If no result is available, queue the call.
    if (!result_ready) {
      AddCallToResolverQueuedCallsLocked();
      return absl::nullopt;
    }
  }
  // We have a result. Apply service config to call.
  grpc_error_handle error = ApplyServiceConfigToCallLocked(config_selector);
  // ConfigSelector must be unreffed inside the WorkSerializer.
  if (!IsWorkSerializerDispatchEnabled() && config_selector.ok()) {
    chand()->work_serializer_->Run(
        [config_selector = std::move(*config_selector)]() mutable {
          config_selector.reset();
        },
        DEBUG_LOCATION);
  }
  // Handle errors.
  if (!error.ok()) {
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand() << " calld=" << this
        << ": error applying config to call: error=" << StatusToString(error);
    return error;
  }
  // If the call was queued, add trace annotation.
  if (was_queued) {
    auto* call_tracer = arena()->GetContext<CallTracerAnnotationInterface>();
    if (call_tracer != nullptr) {
      call_tracer->RecordAnnotation("Delayed name resolution complete.");
    }
  }
  return absl::OkStatus();
}
1940
// Returns true if a resolution "result" is available for this call --
// either real config data or a terminal resolver error (delivered via
// *config_selector).  Returns false if the call must be queued.
// Held under resolution_mu_ by the caller.
bool ClientChannelFilter::CallData::CheckResolutionLocked(
    absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {
  // If we don't yet have a resolver result, we need to queue the call
  // until we get one.
  if (GPR_UNLIKELY(!chand()->received_service_config_data_)) {
    // If the resolver returned transient failure before returning the
    // first service config, fail any non-wait_for_ready calls.
    absl::Status resolver_error = chand()->resolver_transient_failure_error_;
    if (!resolver_error.ok() &&
        !send_initial_metadata()->GetOrCreatePointer(WaitForReady())->value) {
      GRPC_TRACE_LOG(client_channel_call, INFO)
          << "chand=" << chand() << " calld=" << this
          << ": resolution failed, failing call";
      // Deliver the error through the StatusOr; the caller treats this
      // as a ready result and fails the call.
      *config_selector = absl_status_to_grpc_error(resolver_error);
      return true;
    }
    // Either the resolver has not yet returned a result, or it has
    // returned transient failure but the call is wait_for_ready. In
    // either case, queue the call.
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand() << " calld=" << this
        << ": no resolver result yet";
    return false;
  }
  // Result found.
  *config_selector = chand()->config_selector_;
  dynamic_filters_ = chand()->dynamic_filters_;
  return true;
}
1970
1971 //
1972 // FilterBasedCallData implementation
1973 //
1974
// Constructs the per-call data for the filter-based (legacy) call path.
// Takes a ref to the path slice, released in the destructor.
ClientChannelFilter::FilterBasedCallData::FilterBasedCallData(
    grpc_call_element* elem, const grpc_call_element_args& args)
    : path_(CSliceRef(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      elem_(elem),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner) {
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this << ": created call";
}
1987
ClientChannelFilter::FilterBasedCallData::~FilterBasedCallData() {
  // Release the ref to the path slice taken in the constructor.
  CSliceUnref(path_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    CHECK_EQ(pending_batches_[i], nullptr);
  }
}
1995
grpc_error_handle ClientChannelFilter::FilterBasedCallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  // Placement-new into the filter-provided call_data storage; the
  // matching explicit destructor call happens in Destroy().
  new (elem->call_data) FilterBasedCallData(elem, *args);
  return absl::OkStatus();
}
2001
void ClientChannelFilter::FilterBasedCallData::Destroy(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  // Keep the dynamic call alive past calld's destruction so it can take
  // responsibility for running then_schedule_closure.
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~FilterBasedCallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // No dynamic call was ever created; run the closure directly.
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
  }
}
2016
// Filter entry point for stream op batches.  Routes batches to the
// dynamic call if one exists; otherwise handles cancellation, queues the
// batch, and (for send_initial_metadata) kicks off resolution so a
// dynamic call can be created.
void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  // Skip this log line when the channel tracer is on, since it would
  // log the batch anyway.
  if (GRPC_TRACE_FLAG_ENABLED(client_channel_call) &&
      !GRPC_TRACE_FLAG_ENABLED(channel)) {
    LOG(INFO) << "chand=" << chand << " calld=" << calld
              << ": batch started from above: "
              << grpc_transport_stream_op_batch_string(batch, false);
  }
  // Intercept recv_trailing_metadata to commit the call, in case we wind up
  // failing the call before we get down to the retry or LB call layer.
  if (batch->recv_trailing_metadata) {
    calld->original_recv_trailing_metadata_ready_ =
        batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
                      RecvTrailingMetadataReadyForConfigSelectorCommitCallback,
                      calld, nullptr);
    batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
        &calld->recv_trailing_metadata_ready_;
  }
  // If we already have a dynamic call, pass the batch down to it.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand << " calld=" << calld
        << ": starting batch on dynamic_call=" << calld->dynamic_call_.get();
    calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // We do not yet have a dynamic call.
  //
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(!calld->cancel_error_.ok())) {
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand << " calld=" << calld
        << ": failing batch with error: "
        << StatusToString(calld->cancel_error_);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    calld->cancel_error_ = batch->payload->cancel_stream.cancel_error;
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand << " calld=" << calld
        << ": recording cancel_error=" << StatusToString(calld->cancel_error_);
    // Fail all pending batches.
    calld->PendingBatchesFail(calld->cancel_error_, NoYieldCallCombiner);
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, calld->cancel_error_, calld->call_combiner());
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(batch);
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand << " calld=" << calld
        << ": grabbing resolution mutex to apply service ";
    // If we're still in IDLE, we need to start resolving.
    if (GPR_UNLIKELY(chand->CheckConnectivityState(false) ==
                     GRPC_CHANNEL_IDLE)) {
      GRPC_TRACE_LOG(client_channel_call, INFO)
          << "chand=" << chand << " calld=" << calld
          << ": triggering exit idle";
      // Bounce into the control plane work serializer to start resolving.
      GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle");
      chand->work_serializer_->Run(
          [chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
            chand->CheckConnectivityState(/*try_to_connect=*/true);
            GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
          },
          DEBUG_LOCATION);
    }
    calld->TryCheckResolution(/*was_queued=*/false);
  } else {
    // For all other batches, release the call combiner.
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand << " calld=" << calld
        << ": saved batch, yielding call combiner";
    GRPC_CALL_COMBINER_STOP(calld->call_combiner(),
                            "batch does not include send_initial_metadata");
  }
}
2113
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2114 void ClientChannelFilter::FilterBasedCallData::SetPollent(
2115 grpc_call_element* elem, grpc_polling_entity* pollent) {
2116 auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
2117 calld->pollent_ = pollent;
2118 }
2119
GetBatchIndex(grpc_transport_stream_op_batch * batch)2120 size_t ClientChannelFilter::FilterBasedCallData::GetBatchIndex(
2121 grpc_transport_stream_op_batch* batch) {
2122 // Note: It is important the send_initial_metadata be the first entry
2123 // here, since the code in CheckResolution() assumes it will be.
2124 if (batch->send_initial_metadata) return 0;
2125 if (batch->send_message) return 1;
2126 if (batch->send_trailing_metadata) return 2;
2127 if (batch->recv_initial_metadata) return 3;
2128 if (batch->recv_message) return 4;
2129 if (batch->recv_trailing_metadata) return 5;
2130 GPR_UNREACHABLE_CODE(return (size_t)-1);
2131 }
2132
// This is called via the call combiner, so access to calld is synchronized.
// Saves a batch in its per-op-type slot until a dynamic call exists to
// receive it.
void ClientChannelFilter::FilterBasedCallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this
      << ": adding pending batch at index " << idx;
  grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
  // Each slot holds at most one batch at a time.
  CHECK_EQ(pending, nullptr);
  pending = batch;
}
2144
// This is called via the call combiner, so access to calld is synchronized.
// Closure callback that fails a single pending batch with the given error.
void ClientChannelFilter::FilterBasedCallData::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  // calld was stashed in the batch by PendingBatchesFail().
  auto* calld =
      static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(batch, error,
                                                     calld->call_combiner());
}
2156
// This is called via the call combiner, so access to calld is synchronized.
// Fails every pending batch with the given error, clearing all slots.
// The predicate decides whether running the closures also yields the
// call combiner.
void ClientChannelFilter::FilterBasedCallData::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  CHECK(!error.ok());
  // Count the batches being failed, for tracing only.
  if (GRPC_TRACE_FLAG_ENABLED(client_channel_call)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    LOG(INFO) << "chand=" << chand() << " calld=" << this << ": failing "
              << num_batches << " pending batches: " << StatusToString(error);
  }
  // Schedule a failure closure for each pending batch, clearing its slot.
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner());
  } else {
    closures.RunClosuresWithoutYielding(call_combiner());
  }
}
2189
2190 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2191 void ClientChannelFilter::FilterBasedCallData::ResumePendingBatchInCallCombiner(
2192 void* arg, grpc_error_handle /*ignored*/) {
2193 grpc_transport_stream_op_batch* batch =
2194 static_cast<grpc_transport_stream_op_batch*>(arg);
2195 auto* calld =
2196 static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2197 // Note: This will release the call combiner.
2198 calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2199 }
2200
2201 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()2202 void ClientChannelFilter::FilterBasedCallData::PendingBatchesResume() {
2203 // Retries not enabled; send down batches as-is.
2204 if (GRPC_TRACE_FLAG_ENABLED(client_channel_call)) {
2205 size_t num_batches = 0;
2206 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2207 if (pending_batches_[i] != nullptr) ++num_batches;
2208 }
2209 LOG(INFO) << "chand=" << chand() << " calld=" << this << ": starting "
2210 << num_batches
2211 << " pending batches on dynamic_call=" << dynamic_call_.get();
2212 }
2213 CallCombinerClosureList closures;
2214 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2215 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2216 if (batch != nullptr) {
2217 batch->handler_private.extra_arg = this;
2218 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2219 ResumePendingBatchInCallCombiner, batch, nullptr);
2220 closures.Add(&batch->handler_private.closure, absl::OkStatus(),
2221 "resuming pending batch from client channel call");
2222 batch = nullptr;
2223 }
2224 }
2225 // Note: This will release the call combiner.
2226 closures.RunClosures(call_combiner());
2227 }
2228
2229 // A class to handle the call combiner cancellation callback for a
2230 // queued pick.
2231 class ClientChannelFilter::FilterBasedCallData::ResolverQueuedCallCanceller
2232 final {
2233 public:
ResolverQueuedCallCanceller(FilterBasedCallData * calld)2234 explicit ResolverQueuedCallCanceller(FilterBasedCallData* calld)
2235 : calld_(calld) {
2236 GRPC_CALL_STACK_REF(calld->owning_call(), "ResolverQueuedCallCanceller");
2237 GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2238 grpc_schedule_on_exec_ctx);
2239 calld->call_combiner()->SetNotifyOnCancel(&closure_);
2240 }
2241
2242 private:
CancelLocked(void * arg,grpc_error_handle error)2243 static void CancelLocked(void* arg, grpc_error_handle error) {
2244 auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2245 auto* calld = self->calld_;
2246 auto* chand = calld->chand();
2247 {
2248 MutexLock lock(&chand->resolution_mu_);
2249 GRPC_TRACE_LOG(client_channel_call, INFO)
2250 << "chand=" << chand << " calld=" << calld
2251 << ": cancelling resolver queued pick: "
2252 "error="
2253 << StatusToString(error) << " self=" << self
2254 << " calld->resolver_pick_canceller="
2255 << calld->resolver_call_canceller_;
2256 if (calld->resolver_call_canceller_ == self && !error.ok()) {
2257 // Remove pick from list of queued picks.
2258 calld->RemoveCallFromResolverQueuedCallsLocked();
2259 chand->resolver_queued_calls_.erase(calld);
2260 // Fail pending batches on the call.
2261 calld->PendingBatchesFail(error,
2262 YieldCallCombinerIfPendingBatchesFound);
2263 }
2264 }
2265 GRPC_CALL_STACK_UNREF(calld->owning_call(), "ResolvingQueuedCallCanceller");
2266 delete self;
2267 }
2268
2269 FilterBasedCallData* calld_;
2270 grpc_closure closure_;
2271 };
2272
TryCheckResolution(bool was_queued)2273 void ClientChannelFilter::FilterBasedCallData::TryCheckResolution(
2274 bool was_queued) {
2275 auto result = CheckResolution(was_queued);
2276 if (result.has_value()) {
2277 if (!result->ok()) {
2278 PendingBatchesFail(*result, YieldCallCombiner);
2279 return;
2280 }
2281 CreateDynamicCall();
2282 }
2283 }
2284
// Called (with the channel's resolution mutex held) when this call is
// added to the resolver queue.  Installs a canceller so that a call
// cancellation can dequeue the call and fail its pending batches.
void ClientChannelFilter::FilterBasedCallData::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.
  resolver_call_canceller_ = new ResolverQueuedCallCanceller(this);
}
2289
// Called (with the channel's resolution mutex held) when a queued call
// should re-check resolution (e.g., a new resolver result arrived).
void ClientChannelFilter::FilterBasedCallData::RetryCheckResolutionLocked() {
  // Lame the call combiner canceller.
  resolver_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's resolution mutex.
  chand()->owning_stack_->EventEngine()->Run([this]() {
    // EventEngine callbacks run without an ExecCtx, so set one up here.
    ApplicationCallbackExecCtx application_exec_ctx;
    ExecCtx exec_ctx;
    TryCheckResolution(/*was_queued=*/true);
  });
}
2301
CreateDynamicCall()2302 void ClientChannelFilter::FilterBasedCallData::CreateDynamicCall() {
2303 DynamicFilters::Call::Args args = {dynamic_filters(), pollent_, path_,
2304 call_start_time_, deadline_, arena(),
2305 call_combiner()};
2306 grpc_error_handle error;
2307 DynamicFilters* channel_stack = args.channel_stack.get();
2308 GRPC_TRACE_LOG(client_channel_call, INFO)
2309 << "chand=" << chand() << " calld=" << this
2310 << ": creating dynamic call stack on channel_stack=" << channel_stack;
2311 dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
2312 if (!error.ok()) {
2313 GRPC_TRACE_LOG(client_channel_call, INFO)
2314 << "chand=" << chand() << " calld=" << this
2315 << ": failed to create dynamic call: error=" << StatusToString(error);
2316 PendingBatchesFail(error, YieldCallCombiner);
2317 return;
2318 }
2319 PendingBatchesResume();
2320 }
2321
2322 void ClientChannelFilter::FilterBasedCallData::
RecvTrailingMetadataReadyForConfigSelectorCommitCallback(void * arg,grpc_error_handle error)2323 RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
2324 void* arg, grpc_error_handle error) {
2325 auto* calld = static_cast<FilterBasedCallData*>(arg);
2326 auto* chand = calld->chand();
2327 auto* service_config_call_data = GetServiceConfigCallData(calld->arena());
2328 GRPC_TRACE_LOG(client_channel_call, INFO)
2329 << "chand=" << chand << " calld=" << calld
2330 << ": got recv_trailing_metadata_ready: error=" << StatusToString(error)
2331 << " service_config_call_data=" << service_config_call_data;
2332 if (service_config_call_data != nullptr) {
2333 service_config_call_data->Commit();
2334 }
2335 // Chain to original callback.
2336 Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
2337 error);
2338 }
2339
2340 //
2341 // ClientChannelFilter::LoadBalancedCall::LbCallState
2342 //
2343
// Per-pick state exposed to LB policies for a single load-balanced call.
// Allocations are served from the call's arena, so they live as long as
// the call does.
class ClientChannelFilter::LoadBalancedCall::LbCallState final
    : public ClientChannelLbCallState {
 public:
  explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}

  // Arena-backed allocation; freed when the call's arena is destroyed.
  void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }

  // Internal API to allow first-party LB policies to access per-call
  // attributes set by the ConfigSelector.
  ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
      UniqueTypeName type) const override;

  ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override;

 private:
  LoadBalancedCall* lb_call_;  // Not owned.
};
2361
2362 //
2363 // ClientChannelFilter::LoadBalancedCall::LbCallState
2364 //
2365
2366 ServiceConfigCallData::CallAttributeInterface*
GetCallAttribute(UniqueTypeName type) const2367 ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttribute(
2368 UniqueTypeName type) const {
2369 auto* service_config_call_data = GetServiceConfigCallData(lb_call_->arena_);
2370 return service_config_call_data->GetCallAttribute(type);
2371 }
2372
// Returns the call attempt tracer for this LB call (may be null if no
// call tracer is installed).
ClientCallTracer::CallAttemptTracer*
ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttemptTracer()
    const {
  return lb_call_->call_attempt_tracer();
}
2378
2379 //
2380 // ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor
2381 //
2382
2383 class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final
2384 : public LoadBalancingPolicy::BackendMetricAccessor {
2385 public:
BackendMetricAccessor(LoadBalancedCall * lb_call,grpc_metadata_batch * recv_trailing_metadata)2386 BackendMetricAccessor(LoadBalancedCall* lb_call,
2387 grpc_metadata_batch* recv_trailing_metadata)
2388 : lb_call_(lb_call), recv_trailing_metadata_(recv_trailing_metadata) {}
2389
GetBackendMetricData()2390 const BackendMetricData* GetBackendMetricData() override {
2391 if (lb_call_->backend_metric_data_ == nullptr &&
2392 recv_trailing_metadata_ != nullptr) {
2393 if (const auto* md = recv_trailing_metadata_->get_pointer(
2394 EndpointLoadMetricsBinMetadata())) {
2395 BackendMetricAllocator allocator(lb_call_->arena_);
2396 lb_call_->backend_metric_data_ =
2397 ParseBackendMetricData(md->as_string_view(), &allocator);
2398 }
2399 }
2400 return lb_call_->backend_metric_data_;
2401 }
2402
2403 private:
2404 class BackendMetricAllocator final : public BackendMetricAllocatorInterface {
2405 public:
BackendMetricAllocator(Arena * arena)2406 explicit BackendMetricAllocator(Arena* arena) : arena_(arena) {}
2407
AllocateBackendMetricData()2408 BackendMetricData* AllocateBackendMetricData() override {
2409 return arena_->New<BackendMetricData>();
2410 }
2411
AllocateString(size_t size)2412 char* AllocateString(size_t size) override {
2413 return static_cast<char*>(arena_->Alloc(size));
2414 }
2415
2416 private:
2417 Arena* arena_;
2418 };
2419
2420 LoadBalancedCall* lb_call_;
2421 grpc_metadata_batch* recv_trailing_metadata_;
2422 };
2423
2424 //
2425 // ClientChannelFilter::LoadBalancedCall
2426 //
2427
2428 namespace {
2429
CreateCallAttemptTracer(Arena * arena,bool is_transparent_retry)2430 void CreateCallAttemptTracer(Arena* arena, bool is_transparent_retry) {
2431 auto* call_tracer = DownCast<ClientCallTracer*>(
2432 arena->GetContext<CallTracerAnnotationInterface>());
2433 if (call_tracer == nullptr) return;
2434 auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
2435 arena->SetContext<CallTracerInterface>(tracer);
2436 }
2437
2438 } // namespace
2439
// Constructs an LB call.  \a on_commit is invoked when the LB pick is
// committed; \a is_transparent_retry is forwarded to the call tracer so
// the new attempt is labeled correctly.
ClientChannelFilter::LoadBalancedCall::LoadBalancedCall(
    ClientChannelFilter* chand, Arena* arena,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    // Pass a refcount trace tag only when LB call tracing is enabled.
    : InternallyRefCounted(GRPC_TRACE_FLAG_ENABLED(client_channel_lb_call)
                               ? "LoadBalancedCall"
                               : nullptr),
      chand_(chand),
      on_commit_(std::move(on_commit)),
      arena_(arena) {
  CreateCallAttemptTracer(arena, is_transparent_retry);
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << chand_ << " lb_call=" << this << ": created";
}
2453
ClientChannelFilter::LoadBalancedCall::~LoadBalancedCall() {
  // backend_metric_data_ is arena-allocated (see BackendMetricAllocator),
  // so we run its destructor explicitly here; the storage itself is
  // reclaimed when the arena is destroyed.
  if (backend_metric_data_ != nullptr) {
    backend_metric_data_->BackendMetricData::~BackendMetricData();
  }
}
2459
RecordCallCompletion(absl::Status status,grpc_metadata_batch * recv_trailing_metadata,grpc_transport_stream_stats * transport_stream_stats,absl::string_view peer_address)2460 void ClientChannelFilter::LoadBalancedCall::RecordCallCompletion(
2461 absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
2462 grpc_transport_stream_stats* transport_stream_stats,
2463 absl::string_view peer_address) {
2464 // If we have a tracer, notify it.
2465 if (call_attempt_tracer() != nullptr) {
2466 call_attempt_tracer()->RecordReceivedTrailingMetadata(
2467 status, recv_trailing_metadata, transport_stream_stats);
2468 }
2469 // If the LB policy requested a callback for trailing metadata, invoke
2470 // the callback.
2471 if (lb_subchannel_call_tracker_ != nullptr) {
2472 LbMetadata trailing_metadata(recv_trailing_metadata);
2473 BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata);
2474 LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
2475 peer_address, status, &trailing_metadata, &backend_metric_accessor};
2476 lb_subchannel_call_tracker_->Finish(args);
2477 lb_subchannel_call_tracker_.reset();
2478 }
2479 }
2480
RecordLatency()2481 void ClientChannelFilter::LoadBalancedCall::RecordLatency() {
2482 // Compute latency and report it to the tracer.
2483 if (call_attempt_tracer() != nullptr) {
2484 gpr_timespec latency =
2485 gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
2486 call_attempt_tracer()->RecordEnd(latency);
2487 }
2488 }
2489
// Called (with the channel's LB mutex held) when this call leaves the
// queued-picks state.  Only tears down polling linkage; queue membership
// is handled by the callers noted below.
void ClientChannelFilter::LoadBalancedCall::
    RemoveCallFromLbQueuedCallsLocked() {
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << chand_ << " lb_call=" << this
      << ": removing from queued picks list";
  // Remove pollset_set linkage.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand_->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in either
  // LbQueuedCallCanceller::CancelLocked() or
  // in ClientChannelFilter::UpdateStateAndPickerLocked().
}
2503
// Called (with the channel's LB mutex held) when a pick must wait for a
// new picker: links the call's polling entity to the channel and adds
// the call to the channel's queued-picks set.
void ClientChannelFilter::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << chand_ << " lb_call=" << this
      << ": adding to queued picks list";
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand_->interested_parties_);
  // Add to queue.  The set holds a ref to this call.
  chand_->lb_queued_calls_.insert(Ref());
  OnAddToQueueLocked();
}
2516
2517 absl::optional<absl::Status>
PickSubchannel(bool was_queued)2518 ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) {
2519 // We may accumulate multiple pickers here, because if a picker says
2520 // to queue the call, we check again to see if the picker has been
2521 // updated before we queue it.
2522 // We need to unref pickers in the WorkSerializer.
2523 std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
2524 auto cleanup = absl::MakeCleanup(
2525 [work_serializer = chand_->work_serializer_, &pickers]() {
2526 if (IsWorkSerializerDispatchEnabled()) return;
2527 work_serializer->Run(
2528 [pickers = std::move(pickers)]() mutable {
2529 for (auto& picker : pickers) {
2530 picker.reset(DEBUG_LOCATION, "PickSubchannel");
2531 }
2532 },
2533 DEBUG_LOCATION);
2534 });
2535 absl::AnyInvocable<void(RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>)>
2536 set_picker;
2537 if (!IsWorkSerializerDispatchEnabled()) {
2538 set_picker =
2539 [&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
2540 pickers.emplace_back(std::move(picker));
2541 };
2542 } else {
2543 pickers.emplace_back();
2544 set_picker =
2545 [&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
2546 pickers[0] = std::move(picker);
2547 };
2548 }
2549 // Grab mutex and take a ref to the picker.
2550 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2551 << "chand=" << chand_ << " lb_call=" << this
2552 << ": grabbing LB mutex to get picker";
2553 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
2554 {
2555 MutexLock lock(&chand_->lb_mu_);
2556 set_picker(chand_->picker_);
2557 }
2558 while (true) {
2559 // TODO(roth): Fix race condition in channel_idle filter and any
2560 // other possible causes of this.
2561 if (pickers.back() == nullptr) {
2562 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2563 << "chand=" << chand_ << " lb_call=" << this
2564 << ": picker is null, failing call";
2565 return absl::InternalError("picker is null -- shouldn't happen");
2566 }
2567 // Do pick.
2568 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2569 << "chand=" << chand_ << " lb_call=" << this
2570 << ": performing pick with picker=" << pickers.back().get();
2571 grpc_error_handle error;
2572 bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error);
2573 if (!pick_complete) {
2574 RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> old_picker;
2575 MutexLock lock(&chand_->lb_mu_);
2576 // If picker has been swapped out since we grabbed it, try again.
2577 if (pickers.back() != chand_->picker_) {
2578 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2579 << "chand=" << chand_ << " lb_call=" << this
2580 << ": pick not complete, but picker changed";
2581 if (IsWorkSerializerDispatchEnabled()) {
2582 // Don't unref until after we release the mutex.
2583 old_picker = std::move(pickers.back());
2584 }
2585 set_picker(chand_->picker_);
2586 continue;
2587 }
2588 // Otherwise queue the pick to try again later when we get a new picker.
2589 AddCallToLbQueuedCallsLocked();
2590 return absl::nullopt;
2591 }
2592 // Pick is complete.
2593 // If it was queued, add a trace annotation.
2594 if (was_queued && call_attempt_tracer() != nullptr) {
2595 call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete.");
2596 }
2597 // If the pick failed, fail the call.
2598 if (!error.ok()) {
2599 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2600 << "chand=" << chand_ << " lb_call=" << this
2601 << ": failed to pick subchannel: error=" << StatusToString(error);
2602 return error;
2603 }
2604 // Pick succeeded.
2605 Commit();
2606 return absl::OkStatus();
2607 }
2608 }
2609
// Performs a single pick against \a picker.  Returns true if the pick
// is resolved (success, fail, or drop; *error set for fail/drop) and
// false if the call should be queued to wait for a new picker.
bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
    LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {
  CHECK(connected_subchannel_ == nullptr);
  // Perform LB pick.
  LoadBalancingPolicy::PickArgs pick_args;
  // The path is guaranteed present in send_initial_metadata (see
  // GetBatchIndex(), which keeps that batch at index 0).
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  CHECK_NE(path, nullptr);
  pick_args.path = path->as_string_view();
  LbCallState lb_call_state(this);
  pick_args.call_state = &lb_call_state;
  LbMetadata initial_metadata(send_initial_metadata());
  pick_args.initial_metadata = &initial_metadata;
  auto result = picker->Pick(pick_args);
  return HandlePickResult<bool>(
      &result,
      // CompletePick
      [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) {
        GRPC_TRACE_LOG(client_channel_lb_call, INFO)
            << "chand=" << chand_ << " lb_call=" << this
            << ": LB pick succeeded: subchannel="
            << complete_pick->subchannel.get();
        CHECK(complete_pick->subchannel != nullptr);
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        SubchannelWrapper* subchannel =
            static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
        connected_subchannel_ = subchannel->connected_subchannel();
        // If the subchannel has no connected subchannel (e.g., if the
        // subchannel has moved out of state READY but the LB policy hasn't
        // yet seen that change and given us a new picker), then just
        // queue the pick.  We'll try again as soon as we get a new picker.
        if (connected_subchannel_ == nullptr) {
          GRPC_TRACE_LOG(client_channel_lb_call, INFO)
              << "chand=" << chand_ << " lb_call=" << this
              << ": subchannel returned by LB picker "
                 "has no connected subchannel; queueing pick";
          return false;
        }
        lb_subchannel_call_tracker_ =
            std::move(complete_pick->subchannel_call_tracker);
        if (lb_subchannel_call_tracker_ != nullptr) {
          lb_subchannel_call_tracker_->Start();
        }
        // Handle metadata mutations.
        MetadataMutationHandler::Apply(complete_pick->metadata_mutations,
                                       send_initial_metadata());
        MaybeOverrideAuthority(std::move(complete_pick->authority_override),
                               send_initial_metadata());
        return true;
      },
      // QueuePick
      [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        GRPC_TRACE_LOG(client_channel_lb_call, INFO)
            << "chand=" << chand_ << " lb_call=" << this << ": LB pick queued";
        return false;
      },
      // FailPick
      [this, &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        GRPC_TRACE_LOG(client_channel_lb_call, INFO)
            << "chand=" << chand_ << " lb_call=" << this
            << ": LB pick failed: " << fail_pick->status;
        // If wait_for_ready is false, then the error indicates the RPC
        // attempt's final status.
        if (!send_initial_metadata()
                 ->GetOrCreatePointer(WaitForReady())
                 ->value) {
          *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
              std::move(fail_pick->status), "LB pick"));
          return true;
        }
        // If wait_for_ready is true, then queue to retry when we get a new
        // picker.
        return false;
      },
      // DropPick
      [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        GRPC_TRACE_LOG(client_channel_lb_call, INFO)
            << "chand=" << chand_ << " lb_call=" << this
            << ": LB pick dropped: " << drop_pick->status;
        // Mark the error as an LB drop so retry/fault-injection layers
        // can distinguish it from a normal failure.
        *error = grpc_error_set_int(
            absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
                std::move(drop_pick->status), "LB drop")),
            StatusIntProperty::kLbPolicyDrop, 1);
        return true;
      });
}
2696
2697 //
2698 // ClientChannelFilter::FilterBasedLoadBalancedCall
2699 //
2700
// Constructs a filter-based LB call.  \a on_call_destruction_complete,
// if non-null, is scheduled when this object is destroyed.
ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
    ClientChannelFilter* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    : LoadBalancedCall(chand, args.arena, std::move(on_commit),
                       is_transparent_retry),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      pollent_(pollent),
      on_call_destruction_complete_(on_call_destruction_complete) {}
2711
2712 ClientChannelFilter::FilterBasedLoadBalancedCall::
~FilterBasedLoadBalancedCall()2713 ~FilterBasedLoadBalancedCall() {
2714 // Make sure there are no remaining pending batches.
2715 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2716 CHECK_EQ(pending_batches_[i], nullptr);
2717 }
2718 if (on_call_destruction_complete_ != nullptr) {
2719 ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
2720 absl::OkStatus());
2721 }
2722 }
2723
// Called when the last external ref is dropped.  Ensures the tracers see
// a completion and latency record before delegating to the base class.
void ClientChannelFilter::FilterBasedLoadBalancedCall::Orphan() {
  // If the recv_trailing_metadata op was never started, then notify
  // about call completion here, as best we can.  We assume status
  // CANCELLED in this case.
  if (recv_trailing_metadata_ == nullptr) {
    RecordCallCompletion(absl::CancelledError("call cancelled"), nullptr,
                         nullptr, "");
  }
  RecordLatency();
  // Delegate to parent.
  LoadBalancedCall::Orphan();
}
2736
// Maps a batch to its slot in pending_batches_, keyed by the first op
// the batch contains.
size_t ClientChannelFilter::FilterBasedLoadBalancedCall::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {
  // Note: It is important the send_initial_metadata be the first entry
  // here, since the code in PickSubchannelImpl() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}
2749
2750 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2751 void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesAdd(
2752 grpc_transport_stream_op_batch* batch) {
2753 const size_t idx = GetBatchIndex(batch);
2754 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2755 << "chand=" << chand() << " lb_call=" << this
2756 << ": adding pending batch at index " << idx;
2757 CHECK_EQ(pending_batches_[idx], nullptr);
2758 pending_batches_[idx] = batch;
2759 }
2760
2761 // This is called via the call combiner, so access to calld is synchronized.
2762 void ClientChannelFilter::FilterBasedLoadBalancedCall::
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2763 FailPendingBatchInCallCombiner(void* arg, grpc_error_handle error) {
2764 grpc_transport_stream_op_batch* batch =
2765 static_cast<grpc_transport_stream_op_batch*>(arg);
2766 auto* self = static_cast<FilterBasedLoadBalancedCall*>(
2767 batch->handler_private.extra_arg);
2768 // Note: This will release the call combiner.
2769 grpc_transport_stream_op_batch_finish_with_failure(batch, error,
2770 self->call_combiner_);
2771 }
2772
2773 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error_handle error,YieldCallCombinerPredicate yield_call_combiner_predicate)2774 void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesFail(
2775 grpc_error_handle error,
2776 YieldCallCombinerPredicate yield_call_combiner_predicate) {
2777 CHECK(!error.ok());
2778 failure_error_ = error;
2779 if (GRPC_TRACE_FLAG_ENABLED(client_channel_lb_call)) {
2780 size_t num_batches = 0;
2781 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2782 if (pending_batches_[i] != nullptr) ++num_batches;
2783 }
2784 LOG(INFO) << "chand=" << chand() << " lb_call=" << this << ": failing "
2785 << num_batches << " pending batches: " << StatusToString(error);
2786 }
2787 CallCombinerClosureList closures;
2788 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2789 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2790 if (batch != nullptr) {
2791 batch->handler_private.extra_arg = this;
2792 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2793 FailPendingBatchInCallCombiner, batch,
2794 grpc_schedule_on_exec_ctx);
2795 closures.Add(&batch->handler_private.closure, error,
2796 "PendingBatchesFail");
2797 batch = nullptr;
2798 }
2799 }
2800 if (yield_call_combiner_predicate(closures)) {
2801 closures.RunClosures(call_combiner_);
2802 } else {
2803 closures.RunClosuresWithoutYielding(call_combiner_);
2804 }
2805 }
2806
2807 // This is called via the call combiner, so access to calld is synchronized.
2808 void ClientChannelFilter::FilterBasedLoadBalancedCall::
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2809 ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
2810 grpc_transport_stream_op_batch* batch =
2811 static_cast<grpc_transport_stream_op_batch*>(arg);
2812 SubchannelCall* subchannel_call =
2813 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2814 // Note: This will release the call combiner.
2815 subchannel_call->StartTransportStreamOpBatch(batch);
2816 }
2817
2818 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()2819 void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesResume() {
2820 if (GRPC_TRACE_FLAG_ENABLED(client_channel_lb_call)) {
2821 size_t num_batches = 0;
2822 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2823 if (pending_batches_[i] != nullptr) ++num_batches;
2824 }
2825 LOG(INFO) << "chand=" << chand() << " lb_call=" << this << ": starting "
2826 << num_batches << " pending batches on subchannel_call="
2827 << subchannel_call_.get();
2828 }
2829 CallCombinerClosureList closures;
2830 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2831 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2832 if (batch != nullptr) {
2833 batch->handler_private.extra_arg = subchannel_call_.get();
2834 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2835 ResumePendingBatchInCallCombiner, batch,
2836 grpc_schedule_on_exec_ctx);
2837 closures.Add(&batch->handler_private.closure, absl::OkStatus(),
2838 "resuming pending batch from LB call");
2839 batch = nullptr;
2840 }
2841 }
2842 // Note: This will release the call combiner.
2843 closures.RunClosures(call_combiner_);
2844 }
2845
2846 void ClientChannelFilter::FilterBasedLoadBalancedCall::
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)2847 StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch) {
2848 if (GRPC_TRACE_FLAG_ENABLED(client_channel_lb_call) ||
2849 GRPC_TRACE_FLAG_ENABLED(channel)) {
2850 LOG(INFO) << "chand=" << chand() << " lb_call=" << this
2851 << ": batch started from above: "
2852 << grpc_transport_stream_op_batch_string(batch, false)
2853 << ", call_attempt_tracer()=" << call_attempt_tracer();
2854 }
2855 // Handle call tracing.
2856 if (call_attempt_tracer() != nullptr) {
2857 // Record send ops in tracer.
2858 if (batch->cancel_stream) {
2859 call_attempt_tracer()->RecordCancel(
2860 batch->payload->cancel_stream.cancel_error);
2861 }
2862 if (batch->send_initial_metadata) {
2863 call_attempt_tracer()->RecordSendInitialMetadata(
2864 batch->payload->send_initial_metadata.send_initial_metadata);
2865 }
2866 if (batch->send_trailing_metadata) {
2867 call_attempt_tracer()->RecordSendTrailingMetadata(
2868 batch->payload->send_trailing_metadata.send_trailing_metadata);
2869 }
2870 // Intercept recv ops.
2871 if (batch->recv_initial_metadata) {
2872 recv_initial_metadata_ =
2873 batch->payload->recv_initial_metadata.recv_initial_metadata;
2874 original_recv_initial_metadata_ready_ =
2875 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
2876 GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
2877 this, nullptr);
2878 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2879 &recv_initial_metadata_ready_;
2880 }
2881 }
2882 // Intercept recv_trailing_metadata even if there is no call tracer,
2883 // since we may need to notify the LB policy about trailing metadata.
2884 if (batch->recv_trailing_metadata) {
2885 recv_trailing_metadata_ =
2886 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
2887 transport_stream_stats_ =
2888 batch->payload->recv_trailing_metadata.collect_stats;
2889 original_recv_trailing_metadata_ready_ =
2890 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
2891 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
2892 this, nullptr);
2893 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2894 &recv_trailing_metadata_ready_;
2895 }
2896 // If we've already gotten a subchannel call, pass the batch down to it.
2897 // Note that once we have picked a subchannel, we do not need to acquire
2898 // the channel's data plane mutex, which is more efficient (especially for
2899 // streaming calls).
2900 if (subchannel_call_ != nullptr) {
2901 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2902 << "chand=" << chand() << " lb_call=" << this
2903 << ": starting batch on subchannel_call=" << subchannel_call_.get();
2904 subchannel_call_->StartTransportStreamOpBatch(batch);
2905 return;
2906 }
2907 // We do not yet have a subchannel call.
2908 //
2909 // If we've previously been cancelled, immediately fail any new batches.
2910 if (GPR_UNLIKELY(!cancel_error_.ok())) {
2911 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2912 << "chand=" << chand() << " lb_call=" << this
2913 << ": failing batch with error: " << StatusToString(cancel_error_);
2914 // Note: This will release the call combiner.
2915 grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
2916 call_combiner_);
2917 return;
2918 }
2919 // Handle cancellation.
2920 if (GPR_UNLIKELY(batch->cancel_stream)) {
2921 // Stash a copy of cancel_error in our call data, so that we can use
2922 // it for subsequent operations. This ensures that if the call is
2923 // cancelled before any batches are passed down (e.g., if the deadline
2924 // is in the past when the call starts), we can return the right
2925 // error to the caller when the first batch does get passed down.
2926 cancel_error_ = batch->payload->cancel_stream.cancel_error;
2927 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2928 << "chand=" << chand() << " lb_call=" << this
2929 << ": recording cancel_error=" << StatusToString(cancel_error_).c_str();
2930 // Fail all pending batches.
2931 PendingBatchesFail(cancel_error_, NoYieldCallCombiner);
2932 // Note: This will release the call combiner.
2933 grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
2934 call_combiner_);
2935 return;
2936 }
2937 // Add the batch to the pending list.
2938 PendingBatchesAdd(batch);
2939 // For batches containing a send_initial_metadata op, acquire the
2940 // channel's LB mutex to pick a subchannel.
2941 if (GPR_LIKELY(batch->send_initial_metadata)) {
2942 TryPick(/*was_queued=*/false);
2943 } else {
2944 // For all other batches, release the call combiner.
2945 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2946 << "chand=" << chand() << " lb_call=" << this
2947 << ": saved batch, yielding call combiner";
2948 GRPC_CALL_COMBINER_STOP(call_combiner_,
2949 "batch does not include send_initial_metadata");
2950 }
2951 }
2952
RecvInitialMetadataReady(void * arg,grpc_error_handle error)2953 void ClientChannelFilter::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
2954 void* arg, grpc_error_handle error) {
2955 auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
2956 GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2957 << "chand=" << self->chand() << " lb_call=" << self
2958 << ": got recv_initial_metadata_ready: error=" << StatusToString(error);
2959 if (error.ok()) {
2960 // recv_initial_metadata_flags is not populated for clients
2961 self->call_attempt_tracer()->RecordReceivedInitialMetadata(
2962 self->recv_initial_metadata_);
2963 auto* peer_string = self->recv_initial_metadata_->get_pointer(PeerString());
2964 if (peer_string != nullptr) self->peer_string_ = peer_string->Ref();
2965 }
2966 Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
2967 error);
2968 }
2969
// Intercepted recv_trailing_metadata callback.  If a call attempt tracer
// or LB subchannel call tracker is present, computes the call's final
// status (from the transport error if there is one, otherwise from the
// trailing metadata) and records call completion.  Then chains to the
// original recv_trailing_metadata callback, substituting a previously
// stashed failure_error_ for the transport error if one was recorded.
void ClientChannelFilter::FilterBasedLoadBalancedCall::
    RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << self->chand() << " lb_call=" << self
      << ": got recv_trailing_metadata_ready: error=" << StatusToString(error)
      << " call_attempt_tracer()=" << self->call_attempt_tracer()
      << " lb_subchannel_call_tracker_=" << self->lb_subchannel_call_tracker()
      << " failure_error_=" << StatusToString(self->failure_error_);
  // Check if we have a tracer or an LB callback to invoke.
  if (self->call_attempt_tracer() != nullptr ||
      self->lb_subchannel_call_tracker() != nullptr) {
    // Get the call's status.
    absl::Status status;
    if (!error.ok()) {
      // Get status from error.
      grpc_status_code code;
      std::string message;
      grpc_error_get_status(
          error, self->arena()->GetContext<Call>()->deadline(), &code, &message,
          /*http_error=*/nullptr, /*error_string=*/nullptr);
      status = absl::Status(static_cast<absl::StatusCode>(code), message);
    } else {
      // Get status from headers.
      const auto& md = *self->recv_trailing_metadata_;
      grpc_status_code code =
          md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
      // If the status is OK, leave `status` as the default OkStatus().
      if (code != GRPC_STATUS_OK) {
        absl::string_view message;
        if (const auto* grpc_message = md.get_pointer(GrpcMessageMetadata())) {
          message = grpc_message->as_string_view();
        }
        status = absl::Status(static_cast<absl::StatusCode>(code), message);
      }
    }
    // Use the peer string captured in RecvInitialMetadataReady, if any.
    absl::string_view peer_string;
    if (self->peer_string_.has_value()) {
      peer_string = self->peer_string_->as_string_view();
    }
    self->RecordCallCompletion(status, self->recv_trailing_metadata_,
                               self->transport_stream_stats_, peer_string);
  }
  // Chain to original callback.  If a failure error was stashed on this
  // call earlier, surface that error in place of the transport's error
  // (and clear it so it is reported only once).
  if (!self->failure_error_.ok()) {
    error = self->failure_error_;
    self->failure_error_ = absl::OkStatus();
  }
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               error);
}
3020
3021 // A class to handle the call combiner cancellation callback for a
3022 // queued pick.
3023 // TODO(roth): When we implement hedging support, we won't be able to
3024 // register a call combiner cancellation closure for each LB pick,
3025 // because there may be multiple LB picks happening in parallel.
3026 // Instead, we will probably need to maintain a list in the CallData
3027 // object of pending LB picks to be cancelled when the closure runs.
class ClientChannelFilter::FilterBasedLoadBalancedCall::LbQueuedCallCanceller
    final {
 public:
  // Takes a ref to the LB call and a ref to its owning call stack, then
  // registers CancelLocked() as the call combiner's cancellation
  // callback.  The object owns itself: it is deleted at the end of
  // CancelLocked(), which the call combiner guarantees will eventually
  // be invoked (with an OK error if the call is never cancelled).
  explicit LbQueuedCallCanceller(
      RefCountedPtr<FilterBasedLoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  // Invoked by the call combiner when the call is cancelled (non-OK
  // error) or when a new cancellation callback replaces this one
  // (OK error).  Runs under the call combiner.
  static void CancelLocked(void* arg, grpc_error_handle error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand();
    {
      MutexLock lock(&chand->lb_mu_);
      GRPC_TRACE_LOG(client_channel_lb_call, INFO)
          << "chand=" << chand << " lb_call=" << lb_call
          << ": cancelling queued pick: error=" << StatusToString(error)
          << " self=" << self
          << " calld->pick_canceller=" << lb_call->lb_call_canceller_;
      // Act only if this canceller is still the current one for the call
      // (a retry may have lamed us) and the call is actually cancelled.
      if (lb_call->lb_call_canceller_ == self && !error.ok()) {
        lb_call->Commit();
        // Remove pick from list of queued picks.
        lb_call->RemoveCallFromLbQueuedCallsLocked();
        // Remove from queued picks list.
        chand->lb_queued_calls_.erase(self->lb_call_);
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(error,
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    // Unref lb_call before unreffing the call stack, since unreffing
    // the call stack may destroy the arena in which lb_call is allocated.
    auto* owning_call = lb_call->owning_call_;
    self->lb_call_.reset();
    GRPC_CALL_STACK_UNREF(owning_call, "LbQueuedCallCanceller");
    delete self;
  }

  RefCountedPtr<FilterBasedLoadBalancedCall> lb_call_;
  grpc_closure closure_;
};
3073
TryPick(bool was_queued)3074 void ClientChannelFilter::FilterBasedLoadBalancedCall::TryPick(
3075 bool was_queued) {
3076 auto result = PickSubchannel(was_queued);
3077 if (result.has_value()) {
3078 if (!result->ok()) {
3079 PendingBatchesFail(*result, YieldCallCombiner);
3080 return;
3081 }
3082 CreateSubchannelCall();
3083 }
3084 }
3085
OnAddToQueueLocked()3086 void ClientChannelFilter::FilterBasedLoadBalancedCall::OnAddToQueueLocked() {
3087 // Register call combiner cancellation callback.
3088 lb_call_canceller_ =
3089 new LbQueuedCallCanceller(RefAsSubclass<FilterBasedLoadBalancedCall>());
3090 }
3091
// Called (under the channel's LB mutex) when the LB policy state has
// changed and this queued call should retry its pick.  Defuses the
// pending cancellation callback and schedules the retry asynchronously.
void ClientChannelFilter::FilterBasedLoadBalancedCall::RetryPickLocked() {
  // Lame the call combiner canceller.  Clearing this pointer makes the
  // canceller's identity check fail, so a concurrently running
  // cancellation callback becomes a no-op for this call.
  lb_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's LB mutex.
  // TODO(roth): We should really be using EventEngine::Run() here
  // instead of ExecCtx::Run(). Unfortunately, doing that seems to cause
  // a flaky TSAN failure for reasons that I do not fully understand.
  // However, given that we are working toward eliminating this code as
  // part of the promise conversion, it doesn't seem worth further
  // investigation right now.
  ExecCtx::Run(DEBUG_LOCATION, NewClosure([this](grpc_error_handle) {
                 // If there are a lot of queued calls here, resuming them
                 // all may cause us to stay inside C-core for a long period
                 // of time. All of that work would be done using the same
                 // ExecCtx instance and therefore the same cached value of
                 // "now". The longer it takes to finish all of this work
                 // and exit from C-core, the more stale the cached value of
                 // "now" may become. This can cause problems whereby (e.g.)
                 // we calculate a timer deadline based on the stale value,
                 // which results in the timer firing too early. To avoid
                 // this, we invalidate the cached value for each call we
                 // process.
                 ExecCtx::Get()->InvalidateNow();
                 TryPick(/*was_queued=*/true);
               }),
               absl::OkStatus());
}
3120
// Creates the subchannel call after a successful LB pick, on the
// connected subchannel chosen by the pick.  On creation failure, fails
// all pending batches; on success, resumes them on the new call.  Either
// way, the call combiner is released.
void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  CHECK_NE(path, nullptr);
  SubchannelCall::Args call_args = {
      connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
      arena()->GetContext<Call>()->deadline(),
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call arena for each subchannel call.
      arena(), call_combiner_};
  grpc_error_handle error;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << chand() << " lb_call=" << this
      << ": create subchannel_call=" << subchannel_call_.get()
      << ": error=" << StatusToString(error);
  // Hand off the destruction-complete closure to the subchannel call so
  // it runs when that call's stack is destroyed.  Note this happens even
  // if Create() reported an error, since subchannel_call_ is set either
  // way above.
  if (on_call_destruction_complete_ != nullptr) {
    subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
    on_call_destruction_complete_ = nullptr;
  }
  if (GPR_UNLIKELY(!error.ok())) {
    // Creation failed; fail all pending batches with the error.
    PendingBatchesFail(error, YieldCallCombiner);
  } else {
    // Send batches queued while waiting for the pick down the new call.
    PendingBatchesResume();
  }
}
3146
3147 } // namespace grpc_core
3148