• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "src/core/client_channel/client_channel_filter.h"
18 
19 #include <grpc/event_engine/event_engine.h>
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/slice.h>
22 #include <grpc/status.h>
23 #include <grpc/support/json.h>
24 #include <grpc/support/port_platform.h>
25 #include <grpc/support/string_util.h>
26 #include <grpc/support/time.h>
27 #include <inttypes.h>
28 #include <limits.h>
29 
30 #include <algorithm>
31 #include <functional>
32 #include <new>
33 #include <set>
34 #include <type_traits>
35 #include <utility>
36 #include <vector>
37 
38 #include "absl/cleanup/cleanup.h"
39 #include "absl/log/check.h"
40 #include "absl/log/log.h"
41 #include "absl/status/status.h"
42 #include "absl/status/statusor.h"
43 #include "absl/strings/cord.h"
44 #include "absl/strings/numbers.h"
45 #include "absl/strings/str_cat.h"
46 #include "absl/strings/str_join.h"
47 #include "absl/strings/string_view.h"
48 #include "absl/types/optional.h"
49 #include "absl/types/variant.h"
50 #include "src/core/channelz/channel_trace.h"
51 #include "src/core/client_channel/backup_poller.h"
52 #include "src/core/client_channel/client_channel_internal.h"
53 #include "src/core/client_channel/client_channel_service_config.h"
54 #include "src/core/client_channel/config_selector.h"
55 #include "src/core/client_channel/dynamic_filters.h"
56 #include "src/core/client_channel/global_subchannel_pool.h"
57 #include "src/core/client_channel/lb_metadata.h"
58 #include "src/core/client_channel/local_subchannel_pool.h"
59 #include "src/core/client_channel/retry_filter.h"
60 #include "src/core/client_channel/subchannel.h"
61 #include "src/core/client_channel/subchannel_interface_internal.h"
62 #include "src/core/config/core_configuration.h"
63 #include "src/core/handshaker/proxy_mapper_registry.h"
64 #include "src/core/lib/address_utils/sockaddr_utils.h"
65 #include "src/core/lib/channel/channel_args.h"
66 #include "src/core/lib/channel/channel_stack.h"
67 #include "src/core/lib/channel/status_util.h"
68 #include "src/core/lib/debug/trace.h"
69 #include "src/core/lib/experiments/experiments.h"
70 #include "src/core/lib/iomgr/exec_ctx.h"
71 #include "src/core/lib/iomgr/polling_entity.h"
72 #include "src/core/lib/iomgr/pollset_set.h"
73 #include "src/core/lib/iomgr/resolved_address.h"
74 #include "src/core/lib/promise/cancel_callback.h"
75 #include "src/core/lib/promise/context.h"
76 #include "src/core/lib/promise/latch.h"
77 #include "src/core/lib/promise/map.h"
78 #include "src/core/lib/promise/pipe.h"
79 #include "src/core/lib/promise/poll.h"
80 #include "src/core/lib/promise/promise.h"
81 #include "src/core/lib/promise/try_seq.h"
82 #include "src/core/lib/security/credentials/credentials.h"
83 #include "src/core/lib/slice/slice.h"
84 #include "src/core/lib/slice/slice_internal.h"
85 #include "src/core/lib/surface/call.h"
86 #include "src/core/lib/transport/connectivity_state.h"
87 #include "src/core/lib/transport/error_utils.h"
88 #include "src/core/lib/transport/metadata_batch.h"
89 #include "src/core/load_balancing/backend_metric_parser.h"
90 #include "src/core/load_balancing/child_policy_handler.h"
91 #include "src/core/load_balancing/lb_policy_registry.h"
92 #include "src/core/load_balancing/subchannel_interface.h"
93 #include "src/core/resolver/endpoint_addresses.h"
94 #include "src/core/resolver/resolver_registry.h"
95 #include "src/core/service_config/service_config_call_data.h"
96 #include "src/core/service_config/service_config_impl.h"
97 #include "src/core/util/crash.h"
98 #include "src/core/util/debug_location.h"
99 #include "src/core/util/json/json.h"
100 #include "src/core/util/manual_constructor.h"
101 #include "src/core/util/status_helper.h"
102 #include "src/core/util/sync.h"
103 #include "src/core/util/unique_type_name.h"
104 #include "src/core/util/useful.h"
105 #include "src/core/util/work_serializer.h"
106 
107 //
108 // Client channel filter
109 //
110 
111 namespace grpc_core {
112 
113 using internal::ClientChannelMethodParsedConfig;
114 
115 //
116 // ClientChannelFilter::CallData definition
117 //
118 
// Abstract per-call state shared by the filter-based and promise-based
// call implementations.  Owns the logic for queueing a call until the
// resolver produces a result and for applying the resulting service
// config to the call.
class ClientChannelFilter::CallData {
 public:
  // Removes the call from the channel's list of calls queued
  // for name resolution.
  void RemoveCallFromResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Called by the channel for each queued call when a new resolution
  // result becomes available.
  virtual void RetryCheckResolutionLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) = 0;

  // The dynamic filter stack captured for this call once a resolver
  // result was applied (null until then).
  RefCountedPtr<DynamicFilters> dynamic_filters() const {
    return dynamic_filters_;
  }

 protected:
  CallData() = default;
  virtual ~CallData() = default;

  // Checks whether a resolver result is available.  The following
  // outcomes are possible:
  // - No resolver result is available yet.  The call will be queued and
  //   absl::nullopt will be returned.  Later, when a resolver result
  //   becomes available, RetryCheckResolutionLocked() will be called.
  // - The resolver has returned a transient failure.  If the call is
  //   not wait_for_ready, a non-OK status will be returned.  (If the
  //   call *is* wait_for_ready, it will be queued instead.)
  // - There is a valid resolver result.  The service config will be
  //   stored in the call context and an OK status will be returned.
  absl::optional<absl::Status> CheckResolution(bool was_queued);

 private:
  // Accessors for data stored in the subclass.
  virtual ClientChannelFilter* chand() const = 0;
  virtual Arena* arena() const = 0;
  virtual grpc_polling_entity* pollent() = 0;
  virtual grpc_metadata_batch* send_initial_metadata() = 0;

  // Helper function for CheckResolution().  Returns true if the call
  // can continue (i.e., there is a valid resolution result, or there is
  // an invalid resolution result but the call is not wait_for_ready).
  bool CheckResolutionLocked(
      absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Adds the call to the channel's list of calls queued for name resolution.
  void AddCallToResolverQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Called when adding the call to the resolver queue.
  // Default implementation is a no-op; subclasses may install a
  // canceller or similar bookkeeping here.
  virtual void OnAddToQueueLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_) {}

  // Applies service config to the call.  Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error_handle ApplyServiceConfigToCallLocked(
      const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector);

  // Called to reset the deadline based on the service config obtained
  // from the resolver.
  virtual void ResetDeadline(Duration timeout) = 0;

  // Dynamic filter stack selected for this call by the service config.
  RefCountedPtr<DynamicFilters> dynamic_filters_;
};
186 
// Call data implementation for the legacy filter-stack API.  Bridges
// grpc_call_element vtable callbacks (Init/Destroy/StartTransportStreamOpBatch/
// SetPollent) onto the shared CallData resolution machinery, buffering
// batches in pending_batches_ until a dynamic call can be created.
class ClientChannelFilter::FilterBasedCallData final
    : public ClientChannelFilter::CallData {
 public:
  // grpc_call_element vtable entry points.
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class ResolverQueuedCallCanceller;

  FilterBasedCallData(grpc_call_element* elem,
                      const grpc_call_element_args& args);
  ~FilterBasedCallData() override;

  // Simple accessors for state captured at Init() time.
  grpc_call_element* elem() const { return elem_; }
  grpc_call_stack* owning_call() const { return owning_call_; }
  CallCombiner* call_combiner() const { return call_combiner_; }

  // CallData accessor overrides.
  ClientChannelFilter* chand() const override {
    return static_cast<ClientChannelFilter*>(elem()->channel_data);
  }
  Arena* arena() const override { return arena_; }
  grpc_polling_entity* pollent() override { return pollent_; }
  // The initial metadata always arrives in batch slot 0 (see
  // GetBatchIndex()), so it is read from pending_batches_[0].
  grpc_metadata_batch* send_initial_metadata() override {
    return pending_batches_[0]
        ->payload->send_initial_metadata.send_initial_metadata;
  }

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error_handle error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg,
                                               grpc_error_handle ignored);
  // Resumes all pending batches on dynamic_call_.
  void PendingBatchesResume();

  // Called to check for a resolution result, both when the call is
  // initially started and when it is queued and the channel gets a new
  // resolution result.
  void TryCheckResolution(bool was_queued);

  void OnAddToQueueLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  void RetryCheckResolutionLocked() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannelFilter::resolution_mu_);

  // Recomputes the call deadline from the per-method timeout in the
  // service config, measured from the call's start time.
  void ResetDeadline(Duration timeout) override {
    const Timestamp per_method_deadline =
        Timestamp::FromCycleCounterRoundUp(call_start_time_) + timeout;
    arena_->GetContext<Call>()->UpdateDeadline(per_method_deadline);
  }

  void CreateDynamicCall();

  // Closure invoked on recv_trailing_metadata so that the config
  // selector's commit callback runs before trailing metadata is
  // surfaced to the application.
  static void RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error_handle error);

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  Timestamp deadline_;

  Arena* const arena_;
  grpc_call_element* const elem_;
  grpc_call_stack* const owning_call_;
  CallCombiner* const call_combiner_;

  grpc_polling_entity* pollent_ = nullptr;

  // Accessed while holding ClientChannelFilter::resolution_mu_.
  ResolverQueuedCallCanceller* resolver_call_canceller_
      ABSL_GUARDED_BY(&ClientChannelFilter::resolution_mu_) = nullptr;

  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;

  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error_handle cancel_error_;
};
301 
302 //
303 // Filter vtable
304 //
305 
// Vtable registering the client channel filter with the legacy
// filter-stack channel API.  Per-call hooks (batch handling, call-data
// size/init/pollent/destroy) come first, followed by per-channel hooks
// (channel-data size/init/post-init/destroy/info) and the filter name.
const grpc_channel_filter ClientChannelFilter::kFilter = {
    ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch,
    ClientChannelFilter::StartTransportOp,
    sizeof(ClientChannelFilter::FilterBasedCallData),
    ClientChannelFilter::FilterBasedCallData::Init,
    ClientChannelFilter::FilterBasedCallData::SetPollent,
    ClientChannelFilter::FilterBasedCallData::Destroy,
    sizeof(ClientChannelFilter),
    ClientChannelFilter::Init,
    grpc_channel_stack_no_post_init,
    ClientChannelFilter::Destroy,
    ClientChannelFilter::GetChannelInfo,
    GRPC_UNIQUE_TYPE_NAME_HERE("client-channel"),
};
320 
321 //
322 // dynamic termination filter
323 //
324 
325 namespace {
326 
// Retrieves the client-channel-specific service config call data stored
// in the call's arena context.  DownCast is safe here because the client
// channel is what populated the ServiceConfigCallData context.
ClientChannelServiceConfigCallData* GetServiceConfigCallData(Arena* arena) {
  return DownCast<ClientChannelServiceConfigCallData*>(
      arena->GetContext<ServiceConfigCallData>());
}
331 
// Channel-data for the terminating filter of the dynamic filter stack.
// It sits at the bottom of the stack (Init CHECKs is_last) and hands
// calls off to the client channel's load-balanced call machinery.
class DynamicTerminationFilter final {
 public:
  class CallData;

  static const grpc_channel_filter kFilterVtable;

  // Constructs the filter in place in elem->channel_data.  Must be the
  // last filter in the stack and registered under kFilterVtable.
  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    CHECK(args->is_last);
    CHECK(elem->filter == &kFilterVtable);
    new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
    return absl::OkStatus();
  }

  // Destroys the placement-new'd filter instance.
  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    chand->~DynamicTerminationFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  // Grabs the owning ClientChannelFilter out of the channel args.
  explicit DynamicTerminationFilter(const ChannelArgs& args)
      : chand_(args.GetObject<ClientChannelFilter>()) {}

  // Not owned; the client channel outlives the dynamic filter stack.
  ClientChannelFilter* chand_;
};
363 
// Call-data for the dynamic termination filter.  Creates the
// load-balanced call lazily in SetPollent() and forwards all batches
// to it.
class DynamicTerminationFilter::CallData final {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args) {
    new (elem->call_data) CallData(*args);
    return absl::OkStatus();
  }

  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* /*final_info*/,
                      grpc_closure* then_schedule_closure) {
    auto* calld = static_cast<CallData*>(elem->call_data);
    // Grab a ref to the subchannel call before tearing down calld, so
    // that then_schedule_closure can be deferred until the subchannel
    // call's stack is destroyed.
    RefCountedPtr<SubchannelCall> subchannel_call;
    if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
      subchannel_call = calld->lb_call_->subchannel_call();
    }
    calld->~CallData();
    if (GPR_LIKELY(subchannel_call != nullptr)) {
      subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
    } else {
      // TODO(yashkt) : This can potentially be a Closure::Run
      ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
    }
  }

  // Forwards every batch to the LB call created in SetPollent().
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
    auto* calld = static_cast<CallData*>(elem->call_data);
    calld->lb_call_->StartTransportStreamOpBatch(batch);
  }

  // Creates the load-balanced call.  SetPollent() is used as the
  // creation hook because it runs before any batches are started.
  static void SetPollent(grpc_call_element* elem,
                         grpc_polling_entity* pollent) {
    auto* calld = static_cast<CallData*>(elem->call_data);
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    ClientChannelFilter* client_channel = chand->chand_;
    grpc_call_element_args args = {calld->owning_call_, nullptr,
                                   calld->path_,
                                   /*start_time=*/0,    calld->deadline_,
                                   calld->arena_,       calld->call_combiner_};
    auto* service_config_call_data = GetServiceConfigCallData(calld->arena_);
    // The on-commit callback commits the config selector's choice once
    // the LB pick is committed.
    calld->lb_call_ = client_channel->CreateLoadBalancedCall(
        args, pollent, nullptr,
        [service_config_call_data]() { service_config_call_data->Commit(); },
        /*is_transparent_retry=*/false);
    // NOTE(review): this logs the client_channel pointer under the
    // "dynamic_termination_calld=" label — presumably calld was intended;
    // confirm before relying on this trace output.
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand << " dynamic_termination_calld=" << client_channel
        << ": create lb_call=" << calld->lb_call_.get();
  }

 private:
  explicit CallData(const grpc_call_element_args& args)
      : path_(CSliceRef(args.path)),
        deadline_(args.deadline),
        arena_(args.arena),
        owning_call_(args.call_stack),
        call_combiner_(args.call_combiner) {}

  ~CallData() { CSliceUnref(path_); }

  grpc_slice path_;  // Request path.
  Timestamp deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;

  OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall> lb_call_;
};
432 
// Vtable for the dynamic termination filter; same entry layout as
// ClientChannelFilter::kFilter above (per-call hooks, then per-channel
// hooks, then the filter name).
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
    DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
    DynamicTerminationFilter::StartTransportOp,
    sizeof(DynamicTerminationFilter::CallData),
    DynamicTerminationFilter::CallData::Init,
    DynamicTerminationFilter::CallData::SetPollent,
    DynamicTerminationFilter::CallData::Destroy,
    sizeof(DynamicTerminationFilter),
    DynamicTerminationFilter::Init,
    grpc_channel_stack_no_post_init,
    DynamicTerminationFilter::Destroy,
    DynamicTerminationFilter::GetChannelInfo,
    GRPC_UNIQUE_TYPE_NAME_HERE("dynamic_filter_termination"),
};
447 
448 }  // namespace
449 
450 //
451 // ClientChannelFilter::ResolverResultHandler
452 //
453 
// Receives results from the resolver and hands them to the channel
// inside the work serializer.  Holds a ref on the channel stack for its
// whole lifetime, so the channel cannot be destroyed while the resolver
// can still report results; its destruction signals resolver shutdown.
class ClientChannelFilter::ResolverResultHandler final
    : public Resolver::ResultHandler {
 public:
  explicit ResolverResultHandler(ClientChannelFilter* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
  }

  ~ResolverResultHandler() override {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << chand_ << ": resolver shutdown complete";
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
  }

  // Invoked by the resolver; forwards the result to the channel.
  void ReportResult(Resolver::Result result) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->OnResolverResultChangedLocked(std::move(result));
  }

 private:
  ClientChannelFilter* chand_;
};
475 
476 //
477 // ClientChannelFilter::SubchannelWrapper
478 //
479 
480 // This class is a wrapper for Subchannel that hides details of the
481 // channel's implementation (such as the connected subchannel) from the
482 // LB policy API.
483 //
484 // Note that no synchronization is needed here, because even if the
485 // underlying subchannel is shared between channels, this wrapper will only
486 // be used within one channel, so it will always be synchronized by the
487 // control plane work_serializer.
488 class ClientChannelFilter::SubchannelWrapper final
489     : public SubchannelInterface {
490  public:
  // Must be invoked from within the channel's work serializer (checked
  // in debug builds).  Takes a ref on the channel stack, registers the
  // subchannel with channelz (ref-counted per underlying subchannel so
  // a child node is added only on the first wrapper), and records this
  // wrapper in the channel's wrapper set for keepalive propagation.
  SubchannelWrapper(ClientChannelFilter* chand,
                    RefCountedPtr<Subchannel> subchannel)
      : SubchannelInterface(GRPC_TRACE_FLAG_ENABLED(client_channel)
                                ? "SubchannelWrapper"
                                : nullptr),
        chand_(chand),
        subchannel_(std::move(subchannel)) {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << chand << ": creating subchannel wrapper " << this
        << " for subchannel " << subchannel_.get();
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
#ifndef NDEBUG
    DCHECK(chand_->work_serializer_->RunningInWorkSerializer());
#endif
    if (chand_->channelz_node_ != nullptr) {
      auto* subchannel_node = subchannel_->channelz_node();
      if (subchannel_node != nullptr) {
        // One channelz child entry per underlying subchannel; count the
        // wrappers sharing it.
        auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
        if (it == chand_->subchannel_refcount_map_.end()) {
          chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
          it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
                   .first;
        }
        ++it->second;
      }
    }
    chand_->subchannel_wrappers_.insert(this);
  }
519 
~SubchannelWrapper()520   ~SubchannelWrapper() override {
521     GRPC_TRACE_LOG(client_channel, INFO)
522         << "chand=" << chand_ << ": destroying subchannel wrapper " << this
523         << "for subchannel " << subchannel_.get();
524     if (!IsWorkSerializerDispatchEnabled()) {
525       chand_->subchannel_wrappers_.erase(this);
526       if (chand_->channelz_node_ != nullptr) {
527         auto* subchannel_node = subchannel_->channelz_node();
528         if (subchannel_node != nullptr) {
529           auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
530           CHECK(it != chand_->subchannel_refcount_map_.end());
531           --it->second;
532           if (it->second == 0) {
533             chand_->channelz_node_->RemoveChildSubchannel(
534                 subchannel_node->uuid());
535             chand_->subchannel_refcount_map_.erase(it);
536           }
537         }
538       }
539     }
540     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
541   }
542 
  // Last-strong-ref hook.  Only does work when work-serializer dispatch
  // is enabled; in that mode the map/channelz cleanup must happen inside
  // the work serializer, so it is scheduled here rather than done in the
  // destructor.
  void Orphaned() override {
    if (!IsWorkSerializerDispatchEnabled()) return;
    // Make sure we clean up the channel's subchannel maps inside the
    // WorkSerializer.
    // Ref held by callback.
    WeakRef(DEBUG_LOCATION, "subchannel map cleanup").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          chand_->subchannel_wrappers_.erase(this);
          if (chand_->channelz_node_ != nullptr) {
            auto* subchannel_node = subchannel_->channelz_node();
            if (subchannel_node != nullptr) {
              // Same per-subchannel refcounting as the constructor:
              // remove the channelz child only for the last wrapper.
              auto it =
                  chand_->subchannel_refcount_map_.find(subchannel_.get());
              CHECK(it != chand_->subchannel_refcount_map_.end());
              --it->second;
              if (it->second == 0) {
                chand_->channelz_node_->RemoveChildSubchannel(
                    subchannel_node->uuid());
                chand_->subchannel_refcount_map_.erase(it);
              }
            }
          }
          WeakUnref(DEBUG_LOCATION, "subchannel map cleanup");
        },
        DEBUG_LOCATION);
  }
570 
  // Wraps the LB policy's watcher in a WatcherWrapper (keyed by the raw
  // watcher pointer so it can be cancelled later) and registers the
  // wrapper with the underlying subchannel.  At most one registration
  // per watcher (CHECK_EQ enforces the map slot was empty).
  void WatchConnectivityState(
      std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto& watcher_wrapper = watcher_map_[watcher.get()];
    CHECK_EQ(watcher_wrapper, nullptr);
    watcher_wrapper = new WatcherWrapper(
        std::move(watcher),
        RefAsSubclass<SubchannelWrapper>(DEBUG_LOCATION, "WatcherWrapper"));
    subchannel_->WatchConnectivityState(
        RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
            watcher_wrapper));
  }
583 
  // Looks up the WatcherWrapper registered for this watcher, cancels it
  // on the underlying subchannel, and drops the map entry.  The watcher
  // must have been registered via WatchConnectivityState() (CHECKed).
  void CancelConnectivityStateWatch(ConnectivityStateWatcherInterface* watcher)
      override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto it = watcher_map_.find(watcher);
    CHECK(it != watcher_map_.end());
    subchannel_->CancelConnectivityStateWatch(it->second);
    watcher_map_.erase(it);
  }
591 
  // Exposes the underlying subchannel's connected subchannel.  Not part
  // of the SubchannelInterface API (no override); used internally by
  // the channel.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel() const {
    return subchannel_->connected_subchannel();
  }
595 
  // Delegates connection requests straight to the wrapped subchannel.
  void RequestConnection() override { subchannel_->RequestConnection(); }
597 
  // Delegates backoff reset straight to the wrapped subchannel.
  void ResetBackoff() override { subchannel_->ResetBackoff(); }
599 
  // Points the data watcher at the real subchannel (via the internal
  // interface) and takes ownership of it.  The CHECK enforces that the
  // same watcher is not added twice.
  void AddDataWatcher(std::unique_ptr<DataWatcherInterface> watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    static_cast<InternalSubchannelDataWatcherInterface*>(watcher.get())
        ->SetSubchannel(subchannel_.get());
    CHECK(data_watchers_.insert(std::move(watcher)).second);
  }
606 
  // Removes (and thereby destroys) the owned data watcher, if present.
  // Unlike the connectivity watch, an unknown watcher is silently
  // ignored rather than CHECKed.
  void CancelDataWatcher(DataWatcherInterface* watcher) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    auto it = data_watchers_.find(watcher);
    if (it != data_watchers_.end()) data_watchers_.erase(it);
  }
612 
  // Forwards a throttled keepalive time to the underlying subchannel
  // (invoked for every wrapper when any subchannel reports throttling).
  void ThrottleKeepaliveTime(int new_keepalive_time) {
    subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
  }
616 
  // Returns the wrapped subchannel's address string.
  std::string address() const override { return subchannel_->address(); }
618 
619  private:
620   // This wrapper provides a bridge between the internal Subchannel API
621   // and the SubchannelInterface API that we expose to LB policies.
622   // It implements Subchannel::ConnectivityStateWatcherInterface and wraps
623   // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
624   // that was passed in by the LB policy.  We pass an instance of this
625   // class to the underlying Subchannel, and when we get updates from
626   // the subchannel, we pass those on to the wrapped watcher to return
627   // the update to the LB policy.
628   //
629   // This class handles things like hopping into the WorkSerializer
630   // before passing notifications to the LB policy and propagating
631   // keepalive information between subchannels.
632   class WatcherWrapper final
633       : public Subchannel::ConnectivityStateWatcherInterface {
634    public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> parent)635     WatcherWrapper(
636         std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
637             watcher,
638         RefCountedPtr<SubchannelWrapper> parent)
639         : watcher_(std::move(watcher)), parent_(std::move(parent)) {}
640 
~WatcherWrapper()641     ~WatcherWrapper() override {
642       if (!IsWorkSerializerDispatchEnabled()) {
643         auto* parent = parent_.release();  // ref owned by lambda
644         parent->chand_->work_serializer_->Run(
645             [parent]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
646                 *parent_->chand_->work_serializer_) {
647               parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
648             },
649             DEBUG_LOCATION);
650         return;
651       }
652       parent_.reset(DEBUG_LOCATION, "WatcherWrapper");
653     }
654 
OnConnectivityStateChange(RefCountedPtr<ConnectivityStateWatcherInterface> self,grpc_connectivity_state state,const absl::Status & status)655     void OnConnectivityStateChange(
656         RefCountedPtr<ConnectivityStateWatcherInterface> self,
657         grpc_connectivity_state state, const absl::Status& status) override {
658       GRPC_TRACE_LOG(client_channel, INFO)
659           << "chand=" << parent_->chand_
660           << ": connectivity change for subchannel wrapper " << parent_.get()
661           << " subchannel " << parent_->subchannel_.get()
662           << "hopping into work_serializer";
663       self.release();  // Held by callback.
664       parent_->chand_->work_serializer_->Run(
665           [this, state, status]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
666               *parent_->chand_->work_serializer_) {
667             ApplyUpdateInControlPlaneWorkSerializer(state, status);
668             Unref();
669           },
670           DEBUG_LOCATION);
671     }
672 
interested_parties()673     grpc_pollset_set* interested_parties() override {
674       return watcher_->interested_parties();
675     }
676 
677    private:
    // Delivers a connectivity state change to the wrapped watcher from
    // inside the channel's WorkSerializer.  Also applies any keepalive
    // throttling value piggy-backed on the status payload before
    // forwarding the state change.
    void ApplyUpdateInControlPlaneWorkSerializer(grpc_connectivity_state state,
                                                 const absl::Status& status)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(*parent_->chand_->work_serializer_) {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "chand=" << parent_->chand_
          << ": processing connectivity change in work serializer for "
             "subchannel wrapper "
          << parent_.get() << " subchannel " << parent_->subchannel_.get()
          << " watcher=" << watcher_.get()
          << " state=" << ConnectivityStateName(state) << " status=" << status;
      // The server can ask us to throttle keepalives via a payload
      // attached to the status (e.g., on a GOAWAY).
      absl::optional<absl::Cord> keepalive_throttling =
          status.GetPayload(kKeepaliveThrottlingKey);
      if (keepalive_throttling.has_value()) {
        int new_keepalive_time = -1;
        if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
                             &new_keepalive_time)) {
          // Only ever increase the keepalive time; never tighten it.
          if (new_keepalive_time > parent_->chand_->keepalive_time_) {
            parent_->chand_->keepalive_time_ = new_keepalive_time;
            GRPC_TRACE_LOG(client_channel, INFO)
                << "chand=" << parent_->chand_
                << ": throttling keepalive time to "
                << parent_->chand_->keepalive_time_;
            // Propagate the new keepalive time to all subchannels. This is so
            // that new transports created by any subchannel (and not just the
            // subchannel that received the GOAWAY), use the new keepalive time.
            for (auto* subchannel_wrapper :
                 parent_->chand_->subchannel_wrappers_) {
              subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
            }
          }
        } else {
          LOG(ERROR) << "chand=" << parent_->chand_
                     << ": Illegal keepalive throttling value "
                     << std::string(keepalive_throttling.value());
        }
      }
      // Propagate status only in state TF.
      // We specifically want to avoid propagating the status for
      // state IDLE that the real subchannel gave us only for the
      // purpose of keepalive propagation.
      watcher_->OnConnectivityStateChange(
          state,
          state == GRPC_CHANNEL_TRANSIENT_FAILURE ? status : absl::OkStatus());
    }
722 
723     std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
724         watcher_;
725     RefCountedPtr<SubchannelWrapper> parent_;
726   };
727 
728   // A heterogenous lookup comparator for data watchers that allows
729   // unique_ptr keys to be looked up as raw pointers.
730   struct DataWatcherLessThan {
731     using is_transparent = void;
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan732     bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
733                     const std::unique_ptr<DataWatcherInterface>& p2) const {
734       return p1 < p2;
735     }
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan736     bool operator()(const std::unique_ptr<DataWatcherInterface>& p1,
737                     const DataWatcherInterface* p2) const {
738       return p1.get() < p2;
739     }
operator ()grpc_core::ClientChannelFilter::SubchannelWrapper::DataWatcherLessThan740     bool operator()(const DataWatcherInterface* p1,
741                     const std::unique_ptr<DataWatcherInterface>& p2) const {
742       return p1 < p2.get();
743     }
744   };
745 
  // The channel that owns this subchannel wrapper (not owned).
  ClientChannelFilter* chand_;
  // The real subchannel being wrapped.
  RefCountedPtr<Subchannel> subchannel_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WrapperWatcher that we passed to the underlying
  // subchannel.  This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WrapperWatcher to cancel on the underlying subchannel.
  std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_
      ABSL_GUARDED_BY(*chand_->work_serializer_);
  // Data watchers registered on this subchannel; owned here and looked
  // up heterogeneously by raw pointer via DataWatcherLessThan.
  std::set<std::unique_ptr<DataWatcherInterface>, DataWatcherLessThan>
      data_watchers_ ABSL_GUARDED_BY(*chand_->work_serializer_);
};
758 
759 //
760 // ClientChannelFilter::ExternalConnectivityWatcher
761 //
762 
// Registers an external connectivity watch: records the watcher in the
// channel's external_watchers_ map (keyed by the completion closure) and
// then hops into the WorkSerializer to attach itself to the channel's
// connectivity state tracker.
ClientChannelFilter::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ClientChannelFilter* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    CHECK(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        RefAsSubclass<ExternalConnectivityWatcher>(
            DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object to Start().
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}
793 
ClientChannelFilter::ExternalConnectivityWatcher::
    ~ExternalConnectivityWatcher() {
  // Undo the pollset_set registration and the channel-stack ref taken
  // in the constructor.
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}
801 
802 void ClientChannelFilter::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannelFilter * chand,grpc_closure * on_complete,bool cancel)803     RemoveWatcherFromExternalWatchersMap(ClientChannelFilter* chand,
804                                          grpc_closure* on_complete,
805                                          bool cancel) {
806   RefCountedPtr<ExternalConnectivityWatcher> watcher;
807   {
808     MutexLock lock(&chand->external_watchers_mu_);
809     auto it = chand->external_watchers_.find(on_complete);
810     if (it != chand->external_watchers_.end()) {
811       watcher = std::move(it->second);
812       chand->external_watchers_.erase(it);
813     }
814   }
815   // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
816   // the mutex before calling it.
817   if (watcher != nullptr && cancel) watcher->Cancel();
818 }
819 
// Called by the state tracker when the channel's connectivity state
// changes.  Races with Cancel() via the done_ flag: whichever claims the
// flag first reports the result to the user's closure.
void ClientChannelFilter::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  bool done = false;
  if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
                                     std::memory_order_relaxed)) {
    return;  // Already done.
  }
  // Remove external watcher.
  ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
      chand_, on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::OkStatus());
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  // Note: The callback takes a ref in case the ref inside the state tracker
  // gets removed before the callback runs via a SHUTDOWN notification.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
          Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
        },
        DEBUG_LOCATION);
  }
}
848 
Cancel()849 void ClientChannelFilter::ExternalConnectivityWatcher::Cancel() {
850   bool done = false;
851   if (!done_.compare_exchange_strong(done, true, std::memory_order_relaxed,
852                                      std::memory_order_relaxed)) {
853     return;  // Already done.
854   }
855   ExecCtx::Run(DEBUG_LOCATION, on_complete_, absl::CancelledError());
856   // Hop back into the work_serializer to clean up.
857   // Note: The callback takes a ref in case the ref inside the state tracker
858   // gets removed before the callback runs via a SHUTDOWN notification.
859   Ref(DEBUG_LOCATION, "RemoveWatcherLocked()").release();
860   chand_->work_serializer_->Run(
861       [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
862         RemoveWatcherLocked();
863         Unref(DEBUG_LOCATION, "RemoveWatcherLocked()");
864       },
865       DEBUG_LOCATION);
866 }
867 
// Runs in the WorkSerializer: fires the watcher_timer_init closure and
// registers this object with the channel's connectivity state tracker.
void ClientChannelFilter::ExternalConnectivityWatcher::AddWatcherLocked() {
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, absl::OkStatus());
  // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}
874 
// Runs in the WorkSerializer: detaches this watcher from the channel's
// connectivity state tracker.
void ClientChannelFilter::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}
878 
879 //
880 // ClientChannelFilter::ConnectivityWatcherAdder
881 //
882 
// Fire-and-forget helper: holds a channel-stack ref, hops into the
// WorkSerializer to hand a connectivity watcher to the state tracker,
// then deletes itself.
class ClientChannelFilter::ConnectivityWatcherAdder final {
 public:
  ConnectivityWatcherAdder(
      ClientChannelFilter* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    // Keep the channel stack alive until AddWatcherLocked() has run.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          AddWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Transfers the watcher to the state tracker, releases the stack ref,
  // and self-destructs.
  void AddWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ClientChannelFilter* chand_;
  grpc_connectivity_state initial_state_;
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};
911 
912 //
913 // ClientChannelFilter::ConnectivityWatcherRemover
914 //
915 
// Fire-and-forget helper: holds a channel-stack ref, hops into the
// WorkSerializer to remove a connectivity watcher from the state tracker,
// then deletes itself.
class ClientChannelFilter::ConnectivityWatcherRemover final {
 public:
  ConnectivityWatcherRemover(ClientChannelFilter* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    // Keep the channel stack alive until RemoveWatcherLocked() has run.
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
          RemoveWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Removes the watcher, releases the stack ref, and self-destructs.
  void RemoveWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ClientChannelFilter* chand_;
  AsyncConnectivityStateWatcherInterface* watcher_;
};
941 
942 //
943 // ClientChannelFilter::ClientChannelControlHelper
944 //
945 
// Bridge through which the LB policy talks back to the channel: creating
// subchannels, pushing picker/state updates, requesting re-resolution,
// and exposing channel-level facilities (credentials, event engine, etc.).
class ClientChannelFilter::ClientChannelControlHelper final
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ClientChannelFilter* chand)
      : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  // Creates a subchannel for `address` and returns it wrapped in a
  // SubchannelWrapper.  Returns null if the channel is shutting down or
  // the factory fails.
  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      const grpc_resolved_address& address, const ChannelArgs& per_address_args,
      const ChannelArgs& args) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
    ChannelArgs subchannel_args = Subchannel::MakeSubchannelArgs(
        args, per_address_args, chand_->subchannel_pool_,
        chand_->default_authority_);
    // Create subchannel.
    RefCountedPtr<Subchannel> subchannel =
        chand_->client_channel_factory_->CreateSubchannel(address,
                                                          subchannel_args);
    if (subchannel == nullptr) return nullptr;
    // Make sure the subchannel has updated keepalive time.
    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
    // Create and return wrapper for the subchannel.
    return MakeRefCounted<SubchannelWrapper>(chand_, std::move(subchannel));
  }

  // Applies a new connectivity state + picker from the LB policy, unless
  // the channel is shutting down.
  void UpdateState(grpc_connectivity_state state, const absl::Status& status,
                   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker)
      override ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << chand_
        << ": update: state=" << ConnectivityStateName(state) << " status=("
        << status << ") picker=" << picker.get()
        << (chand_->disconnect_error_.ok()
                ? ""
                : " (ignoring -- channel shutting down)");
    // Do update only if not shutting down.
    if (chand_->disconnect_error_.ok()) {
      chand_->UpdateStateAndPickerLocked(state, status, "helper",
                                         std::move(picker));
    }
  }

  // Asks the resolver to re-resolve (e.g., after a subchannel failure).
  void RequestReresolution() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << chand_ << ": started name re-resolving";
    chand_->resolver_->RequestReresolutionLocked();
  }

  absl::string_view GetTarget() override { return chand_->target_uri_; }

  absl::string_view GetAuthority() override {
    return chand_->default_authority_;
  }

  // Returns the channel creds stripped of any attached call creds.
  RefCountedPtr<grpc_channel_credentials> GetChannelCredentials() override {
    return chand_->channel_args_.GetObject<grpc_channel_credentials>()
        ->duplicate_without_call_credentials();
  }

  // Returns the channel creds as-is, including any call creds.
  RefCountedPtr<grpc_channel_credentials> GetUnsafeChannelCredentials()
      override {
    return chand_->channel_args_.GetObject<grpc_channel_credentials>()->Ref();
  }

  grpc_event_engine::experimental::EventEngine* GetEventEngine() override {
    return chand_->owning_stack_->EventEngine();
  }

  GlobalStatsPluginRegistry::StatsPluginGroup& GetStatsPluginGroup() override {
    return *chand_->owning_stack_->stats_plugin_group;
  }

  // Records a channelz trace event for the channel, if channelz is enabled.
  void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  // Maps LB-policy trace severities onto channelz severities.
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ClientChannelFilter* chand_;
};
1048 
1049 //
1050 // ClientChannelFilter implementation
1051 //
1052 
// Channel-stack vtable entry: constructs a ClientChannelFilter in place in
// the preallocated channel_data slot.  Any constructor failure is reported
// through the returned error.
grpc_error_handle ClientChannelFilter::Init(grpc_channel_element* elem,
                                            grpc_channel_element_args* args) {
  // The client channel must be the last filter in the stack.
  CHECK(args->is_last);
  CHECK(elem->filter == &kFilter);
  grpc_error_handle error;
  new (elem->channel_data) ClientChannelFilter(args, &error);
  return error;
}
1061 
Destroy(grpc_channel_element * elem)1062 void ClientChannelFilter::Destroy(grpc_channel_element* elem) {
1063   auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
1064   chand->~ClientChannelFilter();
1065 }
1066 
1067 namespace {
1068 
GetSubchannelPool(const ChannelArgs & args)1069 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1070     const ChannelArgs& args) {
1071   if (args.GetBool(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL).value_or(false)) {
1072     return MakeRefCounted<LocalSubchannelPool>();
1073   }
1074   return GlobalSubchannelPool::instance();
1075 }
1076 
1077 }  // namespace
1078 
// Constructs the client channel filter.  On failure, sets *error and
// returns early, leaving the object in a state that is safe to destroy.
ClientChannelFilter::ClientChannelFilter(grpc_channel_element_args* args,
                                         grpc_error_handle* error)
    : channel_args_(args->channel_args),
      owning_stack_(args->channel_stack),
      client_channel_factory_(channel_args_.GetObject<ClientChannelFactory>()),
      channelz_node_(channel_args_.GetObject<channelz::ChannelNode>()),
      interested_parties_(grpc_pollset_set_create()),
      service_config_parser_index_(
          internal::ClientChannelServiceConfigParser::ParserIndex()),
      work_serializer_(
          std::make_shared<WorkSerializer>(*args->channel_stack->event_engine)),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(channel_args_)) {
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": creating client_channel for channel stack "
      << owning_stack_;
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get default service config.  If none is specified via the client API,
  // we use an empty config.
  absl::optional<absl::string_view> service_config_json =
      channel_args_.GetString(GRPC_ARG_SERVICE_CONFIG);
  if (!service_config_json.has_value()) service_config_json = "{}";
  *error = absl::OkStatus();
  auto service_config =
      ServiceConfigImpl::Create(channel_args_, *service_config_json);
  if (!service_config.ok()) {
    *error = absl_status_to_grpc_error(service_config.status());
    return;
  }
  default_service_config_ = std::move(*service_config);
  // Get URI to resolve, using proxy mapper if needed.
  absl::optional<std::string> target_uri =
      channel_args_.GetOwnedString(GRPC_ARG_SERVER_URI);
  if (!target_uri.has_value()) {
    *error = GRPC_ERROR_CREATE(
        "target URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  target_uri_ = std::move(*target_uri);
  // A proxy mapper may rewrite the URI; fall back to the original target
  // if no mapper claims it.
  uri_to_resolve_ = CoreConfiguration::Get()
                        .proxy_mapper_registry()
                        .MapName(target_uri_, &channel_args_)
                        .value_or(target_uri_);
  // Make sure the URI to resolve is valid, so that we know that
  // resolver creation will succeed later.
  if (!CoreConfiguration::Get().resolver_registry().IsValidTarget(
          uri_to_resolve_)) {
    *error = GRPC_ERROR_CREATE(
        absl::StrCat("the target uri is not valid: ", uri_to_resolve_));
    return;
  }
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  channel_args_ = channel_args_.Remove(GRPC_ARG_SERVICE_CONFIG);
  // Set initial keepalive time.
  auto keepalive_arg = channel_args_.GetInt(GRPC_ARG_KEEPALIVE_TIME_MS);
  if (keepalive_arg.has_value()) {
    keepalive_time_ = Clamp(*keepalive_arg, 1, INT_MAX);
  } else {
    keepalive_time_ = -1;  // unset
  }
  // Set default authority.
  absl::optional<std::string> default_authority =
      channel_args_.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY);
  if (!default_authority.has_value()) {
    default_authority_ =
        CoreConfiguration::Get().resolver_registry().GetDefaultAuthority(
            target_uri_);
  } else {
    default_authority_ = std::move(*default_authority);
  }
  // Success.
  *error = absl::OkStatus();
}
1161 
ClientChannelFilter::~ClientChannelFilter() {
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": destroying channel";
  // Shut down the resolver and LB policy before tearing down polling state.
  DestroyResolverAndLbPolicyLocked();
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
}
1170 
// Creates a load-balanced call object, arena-allocated so its lifetime is
// tied to the call's arena.  The arena is also installed as the current
// promise context for the duration of construction.
OrphanablePtr<ClientChannelFilter::FilterBasedLoadBalancedCall>
ClientChannelFilter::CreateLoadBalancedCall(
    const grpc_call_element_args& args, grpc_polling_entity* pollent,
    grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry) {
  promise_detail::Context<Arena> arena_ctx(args.arena);
  return OrphanablePtr<FilterBasedLoadBalancedCall>(
      args.arena->New<FilterBasedLoadBalancedCall>(
          this, args, pollent, on_call_destruction_complete,
          std::move(on_commit), is_transparent_retry));
}
1182 
ReprocessQueuedResolverCalls()1183 void ClientChannelFilter::ReprocessQueuedResolverCalls() {
1184   for (CallData* calld : resolver_queued_calls_) {
1185     calld->RemoveCallFromResolverQueuedCallsLocked();
1186     calld->RetryCheckResolutionLocked();
1187   }
1188   resolver_queued_calls_.clear();
1189 }
1190 
1191 namespace {
1192 
// Determines the LB policy config to use for this channel, in priority
// order: (1) the loadBalancingConfig field of the service config, (2) the
// deprecated loadBalancingPolicy field of the service config, (3) the
// GRPC_ARG_LB_POLICY_NAME channel arg, (4) pick_first.
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
  // Prefer the LB policy config found in the service config.
  if (parsed_service_config->parsed_lb_config() != nullptr) {
    return parsed_service_config->parsed_lb_config();
  }
  // Try the deprecated LB policy name from the service config.
  // If not, try the setting from channel args.
  absl::optional<absl::string_view> policy_name;
  if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
    policy_name = parsed_service_config->parsed_deprecated_lb_policy();
  } else {
    policy_name = resolver_result.args.GetString(GRPC_ARG_LB_POLICY_NAME);
    bool requires_config = false;
    // A policy from channel args must exist and must not require a
    // config (since we have none to give it); otherwise fall back.
    if (policy_name.has_value() &&
        (!CoreConfiguration::Get()
              .lb_policy_registry()
              .LoadBalancingPolicyExists(*policy_name, &requires_config) ||
         requires_config)) {
      if (requires_config) {
        LOG(ERROR) << "LB policy: " << *policy_name
                   << " passed through channel_args must not "
                      "require a config. Using pick_first instead.";
      } else {
        LOG(ERROR) << "LB policy: " << *policy_name
                   << " passed through channel_args does not exist. "
                      "Using pick_first instead.";
      }
      policy_name = "pick_first";
    }
  }
  // Use pick_first if no policy was specified by any of the above sources.
  if (!policy_name.has_value()) policy_name = "pick_first";
  // Now that we have the policy name, construct an empty config for it.
  Json config_json = Json::FromArray({Json::FromObject({
      {std::string(*policy_name), Json::FromObject({})},
  })});
  auto lb_policy_config =
      CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
          config_json);
  // The policy name came from one of three places:
  // - The deprecated loadBalancingPolicy field in the service config,
  //   in which case the code in ClientChannelServiceConfigParser
  //   already verified that the policy does not require a config.
  // - One of the hard-coded values here, all of which are known to not
  //   require a config.
  // - A channel arg, in which case we check that the specified policy exists
  //   and accepts an empty config. If not, we revert to using pick_first
  //   lb_policy
  CHECK(lb_policy_config.ok());
  return std::move(*lb_policy_config);
}
1247 
1248 }  // namespace
1249 
// Processes a new result from the resolver: selects the service config
// (falling back to the previous or default one as needed), updates the
// LB policy, and pushes the new config to the data plane.  Also collects
// trace strings describing notable transitions for channelz.
void ClientChannelFilter::OnResolverResultChangedLocked(
    Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": got resolver result";
  // Grab resolver result health callback.
  auto resolver_callback = std::move(result.result_health_callback);
  absl::Status resolver_result_status;
  // We only want to trace the address resolution in the follow cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  std::vector<const char*> trace_strings;
  const bool resolution_contains_addresses =
      result.addresses.ok() && !result.addresses->empty();
  if (!resolution_contains_addresses &&
      previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (resolution_contains_addresses &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = resolution_contains_addresses;
  // Storage must outlive trace_strings, which holds a raw c_str() pointer.
  std::string service_config_error_string_storage;
  if (!result.service_config.ok()) {
    service_config_error_string_storage =
        result.service_config.status().ToString();
    trace_strings.push_back(service_config_error_string_storage.c_str());
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (!result.service_config.ok()) {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << this << ": resolver returned service config error: "
        << result.service_config.status();
    // If the service config was invalid, then fallback to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "chand=" << this
          << ": resolver returned invalid service config. "
             "Continuing to use previous service config.";
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received a service config error and we don't have a
      // previous service config to fall back to.  Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(result.service_config.status());
      trace_strings.push_back("no valid service config");
      resolver_result_status =
          absl::UnavailableError("no valid service config");
    }
  } else if (*result.service_config == nullptr) {
    // Resolver did not return any service config.
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << this
        << ": resolver returned no service config. Using default service "
           "config for channel.";
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = std::move(*result.service_config);
    config_selector = result.args.GetObjectRef<ConfigSelector>();
  }
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in the
  // WorkSerializer.
  result.args = result.args.Remove(GRPC_ARG_CONFIG_SELECTOR);
  // Note: The only case in which service_config is null here is if the resolver
  // returned a service config error and we don't have a previous service
  // config to fall back to.
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                service_config_parser_index_));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          std::string(lb_policy_config->name()));
    } else {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "chand=" << this << ": service config not changed";
    }
    // Create or update LB policy, as needed.
    // Copy the args first, since `result` is consumed by the call below.
    ChannelArgs new_args = result.args;
    resolver_result_status = CreateOrUpdateLbPolicyLocked(
        std::move(lb_policy_config),
        parsed_service_config->health_check_service_name(), std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked(new_args);
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Invoke resolver callback if needed.
  if (resolver_callback != nullptr) {
    resolver_callback(std::move(resolver_result_status));
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
}
1385 
// Called in the WorkSerializer when the resolver reports an error
// without delivering a usable result.  If an LB policy already exists
// from a previous resolution, it stays in control of the channel's
// connectivity state; otherwise the channel goes TRANSIENT_FAILURE and
// queued resolver calls are re-processed so that non-wait_for_ready
// calls can fail with the resolver error.
void ClientChannelFilter::OnResolverErrorLocked(absl::Status status) {
  // Resolver may already have been shut down; nothing to do.
  if (resolver_ == nullptr) return;
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": resolver transient failure: " << status;
  // If we already have an LB policy from a previous resolution
  // result, then we continue to let it set the connectivity state.
  // Otherwise, we go into TRANSIENT_FAILURE.
  if (lb_policy_ == nullptr) {
    // Update connectivity state.
    UpdateStateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
                      "resolver failure");
    {
      MutexLock lock(&resolution_mu_);
      // Update resolver transient failure.
      // Status codes reserved for internal use are rewritten before the
      // error is exposed to calls.
      resolver_transient_failure_error_ =
          MaybeRewriteIllegalStatusCode(status, "resolver");
      ReprocessQueuedResolverCalls();
    }
  }
}
1406 
// Builds an LB policy update from the resolver result and delivers it
// to the child policy, creating the policy first if this is the initial
// resolution.  Returns the status of the LB policy update, which the
// caller reports back to the resolver.
//
// Note: \a result is consumed (its fields are moved from).
absl::Status ClientChannelFilter::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    const absl::optional<std::string>& health_check_service_name,
    Resolver::Result result) {
  // Construct update.
  LoadBalancingPolicy::UpdateArgs update_args;
  if (!result.addresses.ok()) {
    // Propagate the resolver's address error to the LB policy.
    update_args.addresses = result.addresses.status();
  } else {
    update_args.addresses = std::make_shared<EndpointAddressesListIterator>(
        std::move(*result.addresses));
  }
  update_args.config = std::move(lb_policy_config);
  update_args.resolution_note = std::move(result.resolution_note);
  update_args.args = std::move(result.args);
  // Add health check service name to channel args.
  if (health_check_service_name.has_value()) {
    update_args.args = update_args.args.Set(GRPC_ARG_HEALTH_CHECK_SERVICE_NAME,
                                            *health_check_service_name);
  }
  // Create policy if needed.
  if (lb_policy_ == nullptr) {
    lb_policy_ = CreateLbPolicyLocked(update_args.args);
  }
  // Update the policy.
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": Updating child policy " << lb_policy_.get();
  return lb_policy_->UpdateLocked(std::move(update_args));
}
1436 
1437 // Creates a new LB policy.
// Creates a new LB policy (wrapped in a ChildPolicyHandler so that
// policy changes between resolver updates are handled transparently)
// and hooks its interested_parties into the channel's pollset_set.
OrphanablePtr<LoadBalancingPolicy> ClientChannelFilter::CreateLbPolicyLocked(
    const ChannelArgs& args) {
  // The LB policy will start in state CONNECTING but will not
  // necessarily send us an update synchronously, so set state to
  // CONNECTING (in case the resolver had previously failed and put the
  // channel into TRANSIENT_FAILURE) and make sure we have a queueing picker.
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      MakeRefCounted<LoadBalancingPolicy::QueuePicker>(nullptr));
  // Now create the LB policy.
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      std::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &client_channel_trace);
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": created new LB policy " << lb_policy.get();
  // Let the policy's I/O be driven by the channel's pollers.
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}
1462 
// Records a newly selected ServiceConfig/ConfigSelector in the control
// plane (i.e., state guarded by the WorkSerializer) and refreshes the
// strings reported by GetChannelInfo().  The data plane is switched
// separately via UpdateServiceConfigInDataPlaneLocked(), after the LB
// policy has seen the update.
void ClientChannelFilter::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector, std::string lb_policy_name) {
  std::string service_config_json(service_config->json_string());
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": using service config: \"" << service_config_json
      << "\"";
  // Save service config.
  saved_service_config_ = std::move(service_config);
  // Swap out the data used by GetChannelInfo().
  {
    MutexLock lock(&info_mu_);
    info_lb_policy_name_ = std::move(lb_policy_name);
    info_service_config_json_ = std::move(service_config_json);
  }
  // Save config selector.
  saved_config_selector_ = std::move(config_selector);
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": using ConfigSelector "
      << saved_config_selector_.get();
}
1484 
// Makes the previously saved ServiceConfig/ConfigSelector visible to
// new calls: rebuilds the dynamic filter stack (retry or terminal
// filter at the bottom) and swaps the data-plane state under
// resolution_mu_, then re-processes calls queued waiting for
// resolution.
void ClientChannelFilter::UpdateServiceConfigInDataPlaneLocked(
    const ChannelArgs& args) {
  // Grab ref to service config.
  RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
  // Grab ref to config selector.  Use default if resolver didn't supply one.
  RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": switching to ConfigSelector "
      << saved_config_selector_.get();
  if (config_selector == nullptr) {
    config_selector =
        MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
  }
  // Modify channel args.
  ChannelArgs new_args = args.SetObject(this).SetObject(service_config);
  // Retries are on by default but can be disabled via channel arg or
  // minimal stack.
  bool enable_retries =
      !new_args.WantMinimalStack() &&
      new_args.GetBool(GRPC_ARG_ENABLE_RETRIES).value_or(true);
  // Construct dynamic filter stack.
  std::vector<const grpc_channel_filter*> filters =
      config_selector->GetFilters();
  if (enable_retries) {
    filters.push_back(&RetryFilter::kVtable);
  } else {
    filters.push_back(&DynamicTerminationFilter::kFilterVtable);
  }
  // The old blackboard is passed in so that filter state can be carried
  // over into the new filter stack; the new blackboard then replaces it.
  auto new_blackboard = MakeRefCounted<Blackboard>();
  RefCountedPtr<DynamicFilters> dynamic_filters = DynamicFilters::Create(
      new_args, std::move(filters), blackboard_.get(), new_blackboard.get());
  CHECK(dynamic_filters != nullptr);
  blackboard_ = std::move(new_blackboard);
  // Grab data plane lock to update service config.
  //
  // We defer unreffing the old values (and deallocating memory) until
  // after releasing the lock to keep the critical section small.
  {
    MutexLock lock(&resolution_mu_);
    resolver_transient_failure_error_ = absl::OkStatus();
    // Update service config.
    received_service_config_data_ = true;
    // Old values will be unreffed after lock is released.
    service_config_.swap(service_config);
    config_selector_.swap(config_selector);
    dynamic_filters_.swap(dynamic_filters);
    // Re-process queued calls asynchronously.
    ReprocessQueuedResolverCalls();
  }
  // Old values will be unreffed after lock is released when they go out
  // of scope.
}
1535 
// Instantiates and starts the resolver for uri_to_resolve_ and moves
// the channel into CONNECTING.  Called lazily (e.g., from
// TryToConnectLocked) rather than at channel creation.
void ClientChannelFilter::CreateResolverLocked() {
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": starting name resolution for "
      << uri_to_resolve_;
  resolver_ = CoreConfiguration::Get().resolver_registry().CreateResolver(
      uri_to_resolve_, channel_args_, interested_parties_, work_serializer_,
      std::make_unique<ResolverResultHandler>(this));
  // Since the validity of the args was checked when the channel was created,
  // CreateResolver() must return a non-null result.
  CHECK(resolver_ != nullptr);
  UpdateStateLocked(GRPC_CHANNEL_CONNECTING, absl::Status(),
                    "started resolving");
  resolver_->StartLocked();
  GRPC_TRACE_LOG(client_channel, INFO)
      << "chand=" << this << ": created resolver=" << resolver_.get();
}
1552 
// Tears down the resolver, the data-plane resolution state, and the LB
// policy.  Used on disconnect/IDLE transitions.  No-op if the resolver
// was never created (or was already destroyed).
void ClientChannelFilter::DestroyResolverAndLbPolicyLocked() {
  if (resolver_ != nullptr) {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << this << ": shutting down resolver=" << resolver_.get();
    resolver_.reset();
    // Clear resolution state.
    saved_service_config_.reset();
    saved_config_selector_.reset();
    // Acquire resolution lock to update config selector and associated state.
    // To minimize lock contention, we wait to unref these objects until
    // after we release the lock.
    RefCountedPtr<ServiceConfig> service_config_to_unref;
    RefCountedPtr<ConfigSelector> config_selector_to_unref;
    RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
    {
      MutexLock lock(&resolution_mu_);
      received_service_config_data_ = false;
      service_config_to_unref = std::move(service_config_);
      config_selector_to_unref = std::move(config_selector_);
      dynamic_filters_to_unref = std::move(dynamic_filters_);
    }
    // Clear LB policy if set.
    if (lb_policy_ != nullptr) {
      GRPC_TRACE_LOG(client_channel, INFO)
          << "chand=" << this
          << ": shutting down lb_policy=" << lb_policy_.get();
      // Detach the policy's pollers from the channel before destroying it.
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
}
1585 
// Sets the channel's connectivity state and, if channelz is enabled,
// mirrors the transition there as a trace event.  SHUTDOWN is terminal:
// attempting to leave it is a programming error and crashes.
void ClientChannelFilter::UpdateStateLocked(grpc_connectivity_state state,
                                            const absl::Status& status,
                                            const char* reason) {
  if (state != GRPC_CHANNEL_SHUTDOWN &&
      state_tracker_.state() == GRPC_CHANNEL_SHUTDOWN) {
    Crash("Illegal transition SHUTDOWN -> anything");
  }
  state_tracker_.SetState(state, status, reason);
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
}
1603 
// Updates connectivity state and atomically (under lb_mu_) installs a
// new picker, then retries all LB-queued calls against it.
void ClientChannelFilter::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
  UpdateStateLocked(state, status, reason);
  // Grab the LB lock to update the picker and trigger reprocessing of the
  // queued picks.
  // Old picker will be unreffed after releasing the lock.
  MutexLock lock(&lb_mu_);
  picker_.swap(picker);
  // Reprocess queued picks.
  for (auto& call : lb_queued_calls_) {
    call->RemoveCallFromLbQueuedCallsLocked();
    call->RetryPickLocked();
  }
  lb_queued_calls_.clear();
}
1621 
1622 namespace {
1623 
1624 // TODO(roth): Remove this in favor of src/core/util/match.h once
1625 // we can do that without breaking lock annotations.
1626 template <typename T>
HandlePickResult(LoadBalancingPolicy::PickResult * result,std::function<T (LoadBalancingPolicy::PickResult::Complete *)> complete_func,std::function<T (LoadBalancingPolicy::PickResult::Queue *)> queue_func,std::function<T (LoadBalancingPolicy::PickResult::Fail *)> fail_func,std::function<T (LoadBalancingPolicy::PickResult::Drop *)> drop_func)1627 T HandlePickResult(
1628     LoadBalancingPolicy::PickResult* result,
1629     std::function<T(LoadBalancingPolicy::PickResult::Complete*)> complete_func,
1630     std::function<T(LoadBalancingPolicy::PickResult::Queue*)> queue_func,
1631     std::function<T(LoadBalancingPolicy::PickResult::Fail*)> fail_func,
1632     std::function<T(LoadBalancingPolicy::PickResult::Drop*)> drop_func) {
1633   auto* complete_pick =
1634       absl::get_if<LoadBalancingPolicy::PickResult::Complete>(&result->result);
1635   if (complete_pick != nullptr) {
1636     return complete_func(complete_pick);
1637   }
1638   auto* queue_pick =
1639       absl::get_if<LoadBalancingPolicy::PickResult::Queue>(&result->result);
1640   if (queue_pick != nullptr) {
1641     return queue_func(queue_pick);
1642   }
1643   auto* fail_pick =
1644       absl::get_if<LoadBalancingPolicy::PickResult::Fail>(&result->result);
1645   if (fail_pick != nullptr) {
1646     return fail_func(fail_pick);
1647   }
1648   auto* drop_pick =
1649       absl::get_if<LoadBalancingPolicy::PickResult::Drop>(&result->result);
1650   CHECK_NE(drop_pick, nullptr);
1651   return drop_func(drop_pick);
1652 }
1653 
1654 }  // namespace
1655 
// Implements channel-level ping: requires the channel to be READY, asks
// the current picker for a subchannel, and sends the ping on that
// subchannel's transport.  Queue/Fail/Drop pick results are surfaced as
// errors rather than retried.
grpc_error_handle ClientChannelFilter::DoPingLocked(grpc_transport_op* op) {
  if (state_tracker_.state() != GRPC_CHANNEL_READY) {
    return GRPC_ERROR_CREATE("channel not connected");
  }
  LoadBalancingPolicy::PickResult result;
  {
    MutexLock lock(&lb_mu_);
    // Pick with default (empty) args; we only need a subchannel.
    result = picker_->Pick(LoadBalancingPolicy::PickArgs());
  }
  return HandlePickResult<grpc_error_handle>(
      &result,
      // Complete pick.
      [op](LoadBalancingPolicy::PickResult::Complete* complete_pick)
          ABSL_EXCLUSIVE_LOCKS_REQUIRED(
              *ClientChannelFilter::work_serializer_) {
            SubchannelWrapper* subchannel = static_cast<SubchannelWrapper*>(
                complete_pick->subchannel.get());
            RefCountedPtr<ConnectedSubchannel> connected_subchannel =
                subchannel->connected_subchannel();
            // The subchannel may have disconnected since the pick.
            if (connected_subchannel == nullptr) {
              return GRPC_ERROR_CREATE("LB pick for ping not connected");
            }
            connected_subchannel->Ping(op->send_ping.on_initiate,
                                       op->send_ping.on_ack);
            return absl::OkStatus();
          },
      // Queue pick.
      [](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
        return GRPC_ERROR_CREATE("LB picker queued call");
      },
      // Fail pick.
      [](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
        return absl_status_to_grpc_error(fail_pick->status);
      },
      // Drop pick.
      [](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
        return absl_status_to_grpc_error(drop_pick->status);
      });
}
1695 
// Handles a transport op in the WorkSerializer: connectivity watches,
// pings, backoff reset, and disconnect/IDLE transitions.  Releases the
// stack ref taken in StartTransportOp() and signals on_consumed when
// done.
void ClientChannelFilter::StartTransportOpLocked(grpc_transport_op* op) {
  // Connectivity watch.
  if (op->start_connectivity_watch != nullptr) {
    state_tracker_.AddWatcher(op->start_connectivity_watch_state,
                              std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error_handle error = DoPingLocked(op);
    if (!error.ok()) {
      // On failure, both ping closures are completed with the error.
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate, error);
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
    }
    // Clear the fields so no one below us acts on them again.
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (!op->disconnect_with_error.ok()) {
    GRPC_TRACE_LOG(client_channel, INFO)
        << "chand=" << this << ": disconnect_with_error: "
        << StatusToString(op->disconnect_with_error);
    DestroyResolverAndLbPolicyLocked();
    intptr_t value;
    // A disconnect error tagged with ChannelConnectivityState == IDLE
    // means "go idle" rather than "shut down".
    if (grpc_error_get_int(op->disconnect_with_error,
                           StatusIntProperty::ChannelConnectivityState,
                           &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (disconnect_error_.ok()) {  // Ignore if we're shutting down.
        // Enter IDLE state.
        UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
                                   "channel entering IDLE", nullptr);
        // TODO(roth): Do we need to check for any queued picks here, in
        // case there's a race condition in the client_idle filter?
        // And maybe also check for calls in the resolver queue?
      }
    } else {
      // Disconnect.
      CHECK(disconnect_error_.ok());
      disconnect_error_ = op->disconnect_with_error;
      UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
          MakeRefCounted<LoadBalancingPolicy::TransientFailurePicker>(
              grpc_error_to_absl_status(op->disconnect_with_error)));
      // TODO(roth): If this happens when we're still waiting for a
      // resolver result, we need to trigger failures for all calls in
      // the resolver queue here.
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
  ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, absl::OkStatus());
}
1757 
// Channel-filter vtable entry point for transport ops.  Handles
// bind_pollset inline, then hops into the control-plane WorkSerializer
// for everything else; the stack ref taken here is released in
// StartTransportOpLocked().
void ClientChannelFilter::StartTransportOp(grpc_channel_element* elem,
                                           grpc_transport_op* op) {
  auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
  // Client channels never accept incoming streams.
  CHECK(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane work_serializer for remaining ops.
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  chand->work_serializer_->Run(
      [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
        chand->StartTransportOpLocked(op);
      },
      DEBUG_LOCATION);
}
1774 
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1775 void ClientChannelFilter::GetChannelInfo(grpc_channel_element* elem,
1776                                          const grpc_channel_info* info) {
1777   auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
1778   MutexLock lock(&chand->info_mu_);
1779   if (info->lb_policy_name != nullptr) {
1780     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.c_str());
1781   }
1782   if (info->service_config_json != nullptr) {
1783     *info->service_config_json =
1784         gpr_strdup(chand->info_service_config_json_.c_str());
1785   }
1786 }
1787 
// Kicks the channel out of IDLE (unless it is being shut down): wakes
// the LB policy if one exists, otherwise starts name resolution.
// Releases the stack ref taken in CheckConnectivityState().
void ClientChannelFilter::TryToConnectLocked() {
  if (disconnect_error_.ok()) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ExitIdleLocked();
    } else if (resolver_ == nullptr) {
      CreateResolverLocked();
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
}
1798 
// Returns the channel's current connectivity state.  If the channel is
// IDLE and \a try_to_connect is set, schedules TryToConnectLocked() on
// the WorkSerializer (taking a stack ref that it releases).
grpc_connectivity_state ClientChannelFilter::CheckConnectivityState(
    bool try_to_connect) {
  // state_tracker_ is guarded by work_serializer_, which we're not
  // holding here.  But the one method of state_tracker_ that *is*
  // thread-safe to call without external synchronization is the state()
  // method, so we can disable thread-safety analysis for this one read.
  grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
                              *work_serializer_) { TryToConnectLocked(); },
                          DEBUG_LOCATION);
  }
  return out;
}
1814 
// Registers an external connectivity watcher.  The heap-allocated
// ConnectivityWatcherAdder is intentionally leaked here; it is expected
// to perform the registration in the WorkSerializer and then delete
// itself (see its definition earlier in this file — not visible in this
// chunk, so confirm there).
void ClientChannelFilter::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}
1820 
// Unregisters an external connectivity watcher.  Like the adder above,
// the ConnectivityWatcherRemover is presumed self-deleting once it has
// run in the WorkSerializer — confirm against its definition.
void ClientChannelFilter::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  new ConnectivityWatcherRemover(this, watcher);
}
1825 
1826 //
1827 // CallData implementation
1828 //
1829 
// Undoes the pollset linkage added when the call was queued for
// resolution.  Deliberately does NOT remove the call from
// resolver_queued_calls_ (see note below).
void ClientChannelFilter::CallData::RemoveCallFromResolverQueuedCallsLocked() {
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this
      << ": removing from resolver queued picks list";
  // Remove call's pollent from channel's interested_parties.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand()->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in
  // ResolverQueuedCallCanceller::CancelLocked() or
  // ClientChannelFilter::ReprocessQueuedResolverCalls().
}
1842 
// Queues the call until a resolver result arrives: links the call's
// pollent into the channel's pollset_set and adds the call to the
// resolver queue.
void ClientChannelFilter::CallData::AddCallToResolverQueuedCallsLocked() {
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this
      << ": adding to resolver queued picks list; pollent="
      << grpc_polling_entity_string(pollent());
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand()->interested_parties_);
  // Add to queue.
  chand()->resolver_queued_calls_.insert(this);
  // Subclass hook (e.g., to install a cancellation handler).
  OnAddToQueueLocked();
}
1856 
// Applies the channel's service config to this call via the
// ConfigSelector: stores parsed configs in the call's arena, and applies
// the client-channel method params (timeout, wait_for_ready) to the
// call.  Returns an error if the config selector itself is an error or
// rejects the call.
grpc_error_handle ClientChannelFilter::CallData::ApplyServiceConfigToCallLocked(
    const absl::StatusOr<RefCountedPtr<ConfigSelector>>& config_selector) {
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this
      << ": applying service config to call";
  if (!config_selector.ok()) return config_selector.status();
  // Create a ClientChannelServiceConfigCallData for the call.  This stores
  // a ref to the ServiceConfig and caches the right set of parsed configs
  // to use for the call.  The ClientChannelServiceConfigCallData will store
  // itself in the call context, so that it can be accessed by filters
  // below us in the stack, and it will be cleaned up when the call ends.
  auto* service_config_call_data =
      arena()->New<ClientChannelServiceConfigCallData>(arena());
  // Use the ConfigSelector to determine the config for the call.
  absl::Status call_config_status =
      (*config_selector)
          ->GetCallConfig(
              {send_initial_metadata(), arena(), service_config_call_data});
  if (!call_config_status.ok()) {
    // Reserved status codes are rewritten before being surfaced.
    return absl_status_to_grpc_error(
        MaybeRewriteIllegalStatusCode(call_config_status, "ConfigSelector"));
  }
  // Apply our own method params to the call.
  auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
      service_config_call_data->GetMethodParsedConfig(
          chand()->service_config_parser_index_));
  if (method_params != nullptr) {
    // If the deadline from the service config is shorter than the one
    // from the client API, reset the deadline timer.
    if (method_params->timeout() != Duration::Zero()) {
      ResetDeadline(method_params->timeout());
    }
    // If the service config set wait_for_ready and the application
    // did not explicitly set it, use the value from the service config.
    auto* wait_for_ready =
        send_initial_metadata()->GetOrCreatePointer(WaitForReady());
    if (method_params->wait_for_ready().has_value() &&
        !wait_for_ready->explicitly_set) {
      wait_for_ready->value = method_params->wait_for_ready().value();
    }
  }
  return absl::OkStatus();
}
1900 
// Checks whether a resolver result is available for this call.
// Returns nullopt if the call was queued waiting for resolution;
// otherwise returns the (possibly error) status of applying the
// service config to the call.  \a was_queued indicates this is a
// re-attempt after the call had been queued, used only for tracing.
absl::optional<absl::Status> ClientChannelFilter::CallData::CheckResolution(
    bool was_queued) {
  // Check if we have a resolver result to use.
  absl::StatusOr<RefCountedPtr<ConfigSelector>> config_selector;
  {
    MutexLock lock(&chand()->resolution_mu_);
    bool result_ready = CheckResolutionLocked(&config_selector);
    // If no result is available, queue the call.
    if (!result_ready) {
      AddCallToResolverQueuedCallsLocked();
      return absl::nullopt;
    }
  }
  // We have a result.  Apply service config to call.
  grpc_error_handle error = ApplyServiceConfigToCallLocked(config_selector);
  // ConfigSelector must be unreffed inside the WorkSerializer.
  if (!IsWorkSerializerDispatchEnabled() && config_selector.ok()) {
    chand()->work_serializer_->Run(
        [config_selector = std::move(*config_selector)]() mutable {
          config_selector.reset();
        },
        DEBUG_LOCATION);
  }
  // Handle errors.
  if (!error.ok()) {
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand() << " calld=" << this
        << ": error applying config to call: error=" << StatusToString(error);
    return error;
  }
  // If the call was queued, add trace annotation.
  if (was_queued) {
    auto* call_tracer = arena()->GetContext<CallTracerAnnotationInterface>();
    if (call_tracer != nullptr) {
      call_tracer->RecordAnnotation("Delayed name resolution complete.");
    }
  }
  return absl::OkStatus();
}
1940 
// Called under resolution_mu_.  Returns true if a resolution result is
// available for this call, in which case *config_selector holds either
// the channel's ConfigSelector or the resolver's error status; returns
// false if the call must be queued.
bool ClientChannelFilter::CallData::CheckResolutionLocked(
    absl::StatusOr<RefCountedPtr<ConfigSelector>>* config_selector) {
  // If we don't yet have a resolver result, we need to queue the call
  // until we get one.
  if (GPR_UNLIKELY(!chand()->received_service_config_data_)) {
    // If the resolver returned transient failure before returning the
    // first service config, fail any non-wait_for_ready calls.
    absl::Status resolver_error = chand()->resolver_transient_failure_error_;
    if (!resolver_error.ok() &&
        !send_initial_metadata()->GetOrCreatePointer(WaitForReady())->value) {
      GRPC_TRACE_LOG(client_channel_call, INFO)
          << "chand=" << chand() << " calld=" << this
          << ": resolution failed, failing call";
      // Report the failure via the StatusOr's error state.
      *config_selector = absl_status_to_grpc_error(resolver_error);
      return true;
    }
    // Either the resolver has not yet returned a result, or it has
    // returned transient failure but the call is wait_for_ready.  In
    // either case, queue the call.
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand() << " calld=" << this
        << ": no resolver result yet";
    return false;
  }
  // Result found.
  *config_selector = chand()->config_selector_;
  // Also grab a ref to the dynamic filter stack for this call.
  dynamic_filters_ = chand()->dynamic_filters_;
  return true;
}
1970 
1971 //
1972 // FilterBasedCallData implementation
1973 //
1974 
// Constructs per-call state for the filter-based call path, capturing
// the call's path slice (ref'd; released in the dtor), deadline, arena,
// owning call stack, and call combiner from the element args.
ClientChannelFilter::FilterBasedCallData::FilterBasedCallData(
    grpc_call_element* elem, const grpc_call_element_args& args)
    : path_(CSliceRef(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      elem_(elem),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner) {
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this << ": created call";
}
1987 
~FilterBasedCallData()1988 ClientChannelFilter::FilterBasedCallData::~FilterBasedCallData() {
1989   CSliceUnref(path_);
1990   // Make sure there are no remaining pending batches.
1991   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1992     CHECK_EQ(pending_batches_[i], nullptr);
1993   }
1994 }
1995 
// Filter vtable hook: placement-constructs a FilterBasedCallData in the
// pre-allocated call_data slot of the call element.  Never fails.
grpc_error_handle ClientChannelFilter::FilterBasedCallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  new (elem->call_data) FilterBasedCallData(elem, *args);
  return absl::OkStatus();
}
2001 
// Filter vtable hook: destroys the FilterBasedCallData constructed by Init().
// then_schedule_closure must not run until the call stack below us is gone.
void ClientChannelFilter::FilterBasedCallData::Destroy(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
  // Move the dynamic call out of calld before running its dtor, so the
  // dynamic call stack can outlive this element's call data.
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~FilterBasedCallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    // Defer then_schedule_closure until the dynamic call stack is destroyed.
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // No dynamic call was ever created; schedule the closure now.
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, absl::OkStatus());
  }
}
2016 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)2017 void ClientChannelFilter::FilterBasedCallData::StartTransportStreamOpBatch(
2018     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2019   auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
2020   auto* chand = static_cast<ClientChannelFilter*>(elem->channel_data);
2021   if (GRPC_TRACE_FLAG_ENABLED(client_channel_call) &&
2022       !GRPC_TRACE_FLAG_ENABLED(channel)) {
2023     LOG(INFO) << "chand=" << chand << " calld=" << calld
2024               << ": batch started from above: "
2025               << grpc_transport_stream_op_batch_string(batch, false);
2026   }
2027   // Intercept recv_trailing_metadata to commit the call, in case we wind up
2028   // failing the call before we get down to the retry or LB call layer.
2029   if (batch->recv_trailing_metadata) {
2030     calld->original_recv_trailing_metadata_ready_ =
2031         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
2032     GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_ready_,
2033                       RecvTrailingMetadataReadyForConfigSelectorCommitCallback,
2034                       calld, nullptr);
2035     batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2036         &calld->recv_trailing_metadata_ready_;
2037   }
2038   // If we already have a dynamic call, pass the batch down to it.
2039   // Note that once we have done so, we do not need to acquire the channel's
2040   // resolution mutex, which is more efficient (especially for streaming calls).
2041   if (calld->dynamic_call_ != nullptr) {
2042     GRPC_TRACE_LOG(client_channel_call, INFO)
2043         << "chand=" << chand << " calld=" << calld
2044         << ": starting batch on dynamic_call=" << calld->dynamic_call_.get();
2045     calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2046     return;
2047   }
2048   // We do not yet have a dynamic call.
2049   //
2050   // If we've previously been cancelled, immediately fail any new batches.
2051   if (GPR_UNLIKELY(!calld->cancel_error_.ok())) {
2052     GRPC_TRACE_LOG(client_channel_call, INFO)
2053         << "chand=" << chand << " calld=" << calld
2054         << ": failing batch with error: "
2055         << StatusToString(calld->cancel_error_);
2056     // Note: This will release the call combiner.
2057     grpc_transport_stream_op_batch_finish_with_failure(
2058         batch, calld->cancel_error_, calld->call_combiner());
2059     return;
2060   }
2061   // Handle cancellation.
2062   if (GPR_UNLIKELY(batch->cancel_stream)) {
2063     // Stash a copy of cancel_error in our call data, so that we can use
2064     // it for subsequent operations.  This ensures that if the call is
2065     // cancelled before any batches are passed down (e.g., if the deadline
2066     // is in the past when the call starts), we can return the right
2067     // error to the caller when the first batch does get passed down.
2068     calld->cancel_error_ = batch->payload->cancel_stream.cancel_error;
2069     GRPC_TRACE_LOG(client_channel_call, INFO)
2070         << "chand=" << chand << " calld=" << calld
2071         << ": recording cancel_error=" << StatusToString(calld->cancel_error_);
2072     // Fail all pending batches.
2073     calld->PendingBatchesFail(calld->cancel_error_, NoYieldCallCombiner);
2074     // Note: This will release the call combiner.
2075     grpc_transport_stream_op_batch_finish_with_failure(
2076         batch, calld->cancel_error_, calld->call_combiner());
2077     return;
2078   }
2079   // Add the batch to the pending list.
2080   calld->PendingBatchesAdd(batch);
2081   // For batches containing a send_initial_metadata op, acquire the
2082   // channel's resolution mutex to apply the service config to the call,
2083   // after which we will create a dynamic call.
2084   if (GPR_LIKELY(batch->send_initial_metadata)) {
2085     GRPC_TRACE_LOG(client_channel_call, INFO)
2086         << "chand=" << chand << " calld=" << calld
2087         << ": grabbing resolution mutex to apply service ";
2088     // If we're still in IDLE, we need to start resolving.
2089     if (GPR_UNLIKELY(chand->CheckConnectivityState(false) ==
2090                      GRPC_CHANNEL_IDLE)) {
2091       GRPC_TRACE_LOG(client_channel_call, INFO)
2092           << "chand=" << chand << " calld=" << calld
2093           << ": triggering exit idle";
2094       // Bounce into the control plane work serializer to start resolving.
2095       GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "ExitIdle");
2096       chand->work_serializer_->Run(
2097           [chand]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand->work_serializer_) {
2098             chand->CheckConnectivityState(/*try_to_connect=*/true);
2099             GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_, "ExitIdle");
2100           },
2101           DEBUG_LOCATION);
2102     }
2103     calld->TryCheckResolution(/*was_queued=*/false);
2104   } else {
2105     // For all other batches, release the call combiner.
2106     GRPC_TRACE_LOG(client_channel_call, INFO)
2107         << "chand=" << chand << " calld=" << calld
2108         << ": saved batch, yielding call combiner";
2109     GRPC_CALL_COMBINER_STOP(calld->call_combiner(),
2110                             "batch does not include send_initial_metadata");
2111   }
2112 }
2113 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2114 void ClientChannelFilter::FilterBasedCallData::SetPollent(
2115     grpc_call_element* elem, grpc_polling_entity* pollent) {
2116   auto* calld = static_cast<FilterBasedCallData*>(elem->call_data);
2117   calld->pollent_ = pollent;
2118 }
2119 
GetBatchIndex(grpc_transport_stream_op_batch * batch)2120 size_t ClientChannelFilter::FilterBasedCallData::GetBatchIndex(
2121     grpc_transport_stream_op_batch* batch) {
2122   // Note: It is important the send_initial_metadata be the first entry
2123   // here, since the code in CheckResolution() assumes it will be.
2124   if (batch->send_initial_metadata) return 0;
2125   if (batch->send_message) return 1;
2126   if (batch->send_trailing_metadata) return 2;
2127   if (batch->recv_initial_metadata) return 3;
2128   if (batch->recv_message) return 4;
2129   if (batch->recv_trailing_metadata) return 5;
2130   GPR_UNREACHABLE_CODE(return (size_t)-1);
2131 }
2132 
2133 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2134 void ClientChannelFilter::FilterBasedCallData::PendingBatchesAdd(
2135     grpc_transport_stream_op_batch* batch) {
2136   const size_t idx = GetBatchIndex(batch);
2137   GRPC_TRACE_LOG(client_channel_call, INFO)
2138       << "chand=" << chand() << " calld=" << this
2139       << ": adding pending batch at index " << idx;
2140   grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
2141   CHECK_EQ(pending, nullptr);
2142   pending = batch;
2143 }
2144 
2145 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2146 void ClientChannelFilter::FilterBasedCallData::FailPendingBatchInCallCombiner(
2147     void* arg, grpc_error_handle error) {
2148   grpc_transport_stream_op_batch* batch =
2149       static_cast<grpc_transport_stream_op_batch*>(arg);
2150   auto* calld =
2151       static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2152   // Note: This will release the call combiner.
2153   grpc_transport_stream_op_batch_finish_with_failure(batch, error,
2154                                                      calld->call_combiner());
2155 }
2156 
2157 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error_handle error,YieldCallCombinerPredicate yield_call_combiner_predicate)2158 void ClientChannelFilter::FilterBasedCallData::PendingBatchesFail(
2159     grpc_error_handle error,
2160     YieldCallCombinerPredicate yield_call_combiner_predicate) {
2161   CHECK(!error.ok());
2162   if (GRPC_TRACE_FLAG_ENABLED(client_channel_call)) {
2163     size_t num_batches = 0;
2164     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2165       if (pending_batches_[i] != nullptr) ++num_batches;
2166     }
2167     LOG(INFO) << "chand=" << chand() << " calld=" << this << ": failing "
2168               << num_batches << " pending batches: " << StatusToString(error);
2169   }
2170   CallCombinerClosureList closures;
2171   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2172     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2173     if (batch != nullptr) {
2174       batch->handler_private.extra_arg = this;
2175       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2176                         FailPendingBatchInCallCombiner, batch,
2177                         grpc_schedule_on_exec_ctx);
2178       closures.Add(&batch->handler_private.closure, error,
2179                    "PendingBatchesFail");
2180       batch = nullptr;
2181     }
2182   }
2183   if (yield_call_combiner_predicate(closures)) {
2184     closures.RunClosures(call_combiner());
2185   } else {
2186     closures.RunClosuresWithoutYielding(call_combiner());
2187   }
2188 }
2189 
2190 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2191 void ClientChannelFilter::FilterBasedCallData::ResumePendingBatchInCallCombiner(
2192     void* arg, grpc_error_handle /*ignored*/) {
2193   grpc_transport_stream_op_batch* batch =
2194       static_cast<grpc_transport_stream_op_batch*>(arg);
2195   auto* calld =
2196       static_cast<FilterBasedCallData*>(batch->handler_private.extra_arg);
2197   // Note: This will release the call combiner.
2198   calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2199 }
2200 
2201 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()2202 void ClientChannelFilter::FilterBasedCallData::PendingBatchesResume() {
2203   // Retries not enabled; send down batches as-is.
2204   if (GRPC_TRACE_FLAG_ENABLED(client_channel_call)) {
2205     size_t num_batches = 0;
2206     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2207       if (pending_batches_[i] != nullptr) ++num_batches;
2208     }
2209     LOG(INFO) << "chand=" << chand() << " calld=" << this << ": starting "
2210               << num_batches
2211               << " pending batches on dynamic_call=" << dynamic_call_.get();
2212   }
2213   CallCombinerClosureList closures;
2214   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2215     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2216     if (batch != nullptr) {
2217       batch->handler_private.extra_arg = this;
2218       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2219                         ResumePendingBatchInCallCombiner, batch, nullptr);
2220       closures.Add(&batch->handler_private.closure, absl::OkStatus(),
2221                    "resuming pending batch from client channel call");
2222       batch = nullptr;
2223     }
2224   }
2225   // Note: This will release the call combiner.
2226   closures.RunClosures(call_combiner());
2227 }
2228 
2229 // A class to handle the call combiner cancellation callback for a
2230 // queued pick.
2231 class ClientChannelFilter::FilterBasedCallData::ResolverQueuedCallCanceller
2232     final {
2233  public:
ResolverQueuedCallCanceller(FilterBasedCallData * calld)2234   explicit ResolverQueuedCallCanceller(FilterBasedCallData* calld)
2235       : calld_(calld) {
2236     GRPC_CALL_STACK_REF(calld->owning_call(), "ResolverQueuedCallCanceller");
2237     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2238                       grpc_schedule_on_exec_ctx);
2239     calld->call_combiner()->SetNotifyOnCancel(&closure_);
2240   }
2241 
2242  private:
CancelLocked(void * arg,grpc_error_handle error)2243   static void CancelLocked(void* arg, grpc_error_handle error) {
2244     auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2245     auto* calld = self->calld_;
2246     auto* chand = calld->chand();
2247     {
2248       MutexLock lock(&chand->resolution_mu_);
2249       GRPC_TRACE_LOG(client_channel_call, INFO)
2250           << "chand=" << chand << " calld=" << calld
2251           << ": cancelling resolver queued pick: "
2252              "error="
2253           << StatusToString(error) << " self=" << self
2254           << " calld->resolver_pick_canceller="
2255           << calld->resolver_call_canceller_;
2256       if (calld->resolver_call_canceller_ == self && !error.ok()) {
2257         // Remove pick from list of queued picks.
2258         calld->RemoveCallFromResolverQueuedCallsLocked();
2259         chand->resolver_queued_calls_.erase(calld);
2260         // Fail pending batches on the call.
2261         calld->PendingBatchesFail(error,
2262                                   YieldCallCombinerIfPendingBatchesFound);
2263       }
2264     }
2265     GRPC_CALL_STACK_UNREF(calld->owning_call(), "ResolvingQueuedCallCanceller");
2266     delete self;
2267   }
2268 
2269   FilterBasedCallData* calld_;
2270   grpc_closure closure_;
2271 };
2272 
TryCheckResolution(bool was_queued)2273 void ClientChannelFilter::FilterBasedCallData::TryCheckResolution(
2274     bool was_queued) {
2275   auto result = CheckResolution(was_queued);
2276   if (result.has_value()) {
2277     if (!result->ok()) {
2278       PendingBatchesFail(*result, YieldCallCombiner);
2279       return;
2280     }
2281     CreateDynamicCall();
2282   }
2283 }
2284 
// Invoked when this call is added to the resolver queue.  Installs a
// canceller so that call cancellation can remove the call from the queue
// and fail its pending batches.
void ClientChannelFilter::FilterBasedCallData::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.
  resolver_call_canceller_ = new ResolverQueuedCallCanceller(this);
}
2289 
// Re-attempts resolution for a previously queued call.
void ClientChannelFilter::FilterBasedCallData::RetryCheckResolutionLocked() {
  // Lame the call combiner canceller.
  // (Clearing the pointer makes the canceller's CancelLocked() a no-op
  // for this call; the canceller object frees itself when it runs.)
  resolver_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's resolution mutex.
  chand()->owning_stack_->EventEngine()->Run([this]() {
    // EventEngine callbacks run without an ExecCtx, so set one up here.
    ApplicationCallbackExecCtx application_exec_ctx;
    ExecCtx exec_ctx;
    TryCheckResolution(/*was_queued=*/true);
  });
}
2301 
// Creates the dynamic (service-config-dependent) call stack and resumes
// any batches queued while waiting for resolution.
void ClientChannelFilter::FilterBasedCallData::CreateDynamicCall() {
  DynamicFilters::Call::Args args = {dynamic_filters(), pollent_,  path_,
                                     call_start_time_,  deadline_, arena(),
                                     call_combiner()};
  grpc_error_handle error;
  // Grab a raw pointer for logging and the CreateCall() invocation,
  // since args (which owns the ref) is moved below.
  DynamicFilters* channel_stack = args.channel_stack.get();
  GRPC_TRACE_LOG(client_channel_call, INFO)
      << "chand=" << chand() << " calld=" << this
      << ": creating dynamic call stack on channel_stack=" << channel_stack;
  dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
  if (!error.ok()) {
    GRPC_TRACE_LOG(client_channel_call, INFO)
        << "chand=" << chand() << " calld=" << this
        << ": failed to create dynamic call: error=" << StatusToString(error);
    PendingBatchesFail(error, YieldCallCombiner);
    return;
  }
  // Note: This will release the call combiner.
  PendingBatchesResume();
}
2321 
2322 void ClientChannelFilter::FilterBasedCallData::
RecvTrailingMetadataReadyForConfigSelectorCommitCallback(void * arg,grpc_error_handle error)2323     RecvTrailingMetadataReadyForConfigSelectorCommitCallback(
2324         void* arg, grpc_error_handle error) {
2325   auto* calld = static_cast<FilterBasedCallData*>(arg);
2326   auto* chand = calld->chand();
2327   auto* service_config_call_data = GetServiceConfigCallData(calld->arena());
2328   GRPC_TRACE_LOG(client_channel_call, INFO)
2329       << "chand=" << chand << " calld=" << calld
2330       << ": got recv_trailing_metadata_ready: error=" << StatusToString(error)
2331       << " service_config_call_data=" << service_config_call_data;
2332   if (service_config_call_data != nullptr) {
2333     service_config_call_data->Commit();
2334   }
2335   // Chain to original callback.
2336   Closure::Run(DEBUG_LOCATION, calld->original_recv_trailing_metadata_ready_,
2337                error);
2338 }
2339 
2340 //
2341 // ClientChannelFilter::LoadBalancedCall::LbCallState
2342 //
2343 
// Per-pick state handed to LB pickers: exposes the call's arena,
// ConfigSelector-set call attributes, and the call attempt tracer.
class ClientChannelFilter::LoadBalancedCall::LbCallState final
    : public ClientChannelLbCallState {
 public:
  explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}

  // Allocates from the call's arena (storage freed with the call).
  void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }

  // Internal API to allow first-party LB policies to access per-call
  // attributes set by the ConfigSelector.
  ServiceConfigCallData::CallAttributeInterface* GetCallAttribute(
      UniqueTypeName type) const override;

  ClientCallTracer::CallAttemptTracer* GetCallAttemptTracer() const override;

 private:
  LoadBalancedCall* lb_call_;  // Not owned.
};
2361 
2362 //
2363 // ClientChannelFilter::LoadBalancedCall::LbCallState
2364 //
2365 
2366 ServiceConfigCallData::CallAttributeInterface*
GetCallAttribute(UniqueTypeName type) const2367 ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttribute(
2368     UniqueTypeName type) const {
2369   auto* service_config_call_data = GetServiceConfigCallData(lb_call_->arena_);
2370   return service_config_call_data->GetCallAttribute(type);
2371 }
2372 
// Returns the LB call's attempt tracer (may be null if tracing is off).
ClientCallTracer::CallAttemptTracer*
ClientChannelFilter::LoadBalancedCall::LbCallState::GetCallAttemptTracer()
    const {
  return lb_call_->call_attempt_tracer();
}
2378 
2379 //
2380 // ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor
2381 //
2382 
// Lazily parses backend metric data out of the call's trailing metadata
// (endpoint_load_metrics_bin) on behalf of the LB policy's call tracker.
class ClientChannelFilter::LoadBalancedCall::BackendMetricAccessor final
    : public LoadBalancingPolicy::BackendMetricAccessor {
 public:
  BackendMetricAccessor(LoadBalancedCall* lb_call,
                        grpc_metadata_batch* recv_trailing_metadata)
      : lb_call_(lb_call), recv_trailing_metadata_(recv_trailing_metadata) {}

  // Returns the parsed metric data, or nullptr if the server sent none.
  // The result is cached on the LB call, so repeated calls parse once.
  const BackendMetricData* GetBackendMetricData() override {
    if (lb_call_->backend_metric_data_ == nullptr &&
        recv_trailing_metadata_ != nullptr) {
      if (const auto* md = recv_trailing_metadata_->get_pointer(
              EndpointLoadMetricsBinMetadata())) {
        BackendMetricAllocator allocator(lb_call_->arena_);
        lb_call_->backend_metric_data_ =
            ParseBackendMetricData(md->as_string_view(), &allocator);
      }
    }
    return lb_call_->backend_metric_data_;
  }

 private:
  // Allocates parse results from the call's arena, so their storage is
  // reclaimed along with the call.
  class BackendMetricAllocator final : public BackendMetricAllocatorInterface {
   public:
    explicit BackendMetricAllocator(Arena* arena) : arena_(arena) {}

    BackendMetricData* AllocateBackendMetricData() override {
      return arena_->New<BackendMetricData>();
    }

    char* AllocateString(size_t size) override {
      return static_cast<char*>(arena_->Alloc(size));
    }

   private:
    Arena* arena_;  // Not owned.
  };

  LoadBalancedCall* lb_call_;                    // Not owned.
  grpc_metadata_batch* recv_trailing_metadata_;  // Not owned; may be null.
};
2423 
2424 //
2425 // ClientChannelFilter::LoadBalancedCall
2426 //
2427 
2428 namespace {
2429 
CreateCallAttemptTracer(Arena * arena,bool is_transparent_retry)2430 void CreateCallAttemptTracer(Arena* arena, bool is_transparent_retry) {
2431   auto* call_tracer = DownCast<ClientCallTracer*>(
2432       arena->GetContext<CallTracerAnnotationInterface>());
2433   if (call_tracer == nullptr) return;
2434   auto* tracer = call_tracer->StartNewAttempt(is_transparent_retry);
2435   arena->SetContext<CallTracerInterface>(tracer);
2436 }
2437 
2438 }  // namespace
2439 
// on_commit is invoked when the pick is committed (see Commit()).
ClientChannelFilter::LoadBalancedCall::LoadBalancedCall(
    ClientChannelFilter* chand, Arena* arena,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    // Only pass a refcount-trace label when LB call tracing is enabled.
    : InternallyRefCounted(GRPC_TRACE_FLAG_ENABLED(client_channel_lb_call)
                               ? "LoadBalancedCall"
                               : nullptr),
      chand_(chand),
      on_commit_(std::move(on_commit)),
      arena_(arena) {
  CreateCallAttemptTracer(arena, is_transparent_retry);
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << chand_ << " lb_call=" << this << ": created";
}
2453 
ClientChannelFilter::LoadBalancedCall::~LoadBalancedCall() {
  // backend_metric_data_ is arena-allocated (see BackendMetricAllocator),
  // so we run its destructor explicitly rather than deleting it; the
  // arena reclaims the storage.
  if (backend_metric_data_ != nullptr) {
    backend_metric_data_->BackendMetricData::~BackendMetricData();
  }
}
2459 
// Reports call completion to the call attempt tracer (if any) and to the
// LB policy's subchannel call tracker (if it requested one).
void ClientChannelFilter::LoadBalancedCall::RecordCallCompletion(
    absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
    grpc_transport_stream_stats* transport_stream_stats,
    absl::string_view peer_address) {
  // If we have a tracer, notify it.
  if (call_attempt_tracer() != nullptr) {
    call_attempt_tracer()->RecordReceivedTrailingMetadata(
        status, recv_trailing_metadata, transport_stream_stats);
  }
  // If the LB policy requested a callback for trailing metadata, invoke
  // the callback.
  if (lb_subchannel_call_tracker_ != nullptr) {
    LbMetadata trailing_metadata(recv_trailing_metadata);
    BackendMetricAccessor backend_metric_accessor(this, recv_trailing_metadata);
    LoadBalancingPolicy::SubchannelCallTrackerInterface::FinishArgs args = {
        peer_address, status, &trailing_metadata, &backend_metric_accessor};
    lb_subchannel_call_tracker_->Finish(args);
    // One-shot: the tracker is consumed by this attempt.
    lb_subchannel_call_tracker_.reset();
  }
}
2480 
RecordLatency()2481 void ClientChannelFilter::LoadBalancedCall::RecordLatency() {
2482   // Compute latency and report it to the tracer.
2483   if (call_attempt_tracer() != nullptr) {
2484     gpr_timespec latency =
2485         gpr_cycle_counter_sub(gpr_get_cycle_counter(), lb_call_start_time_);
2486     call_attempt_tracer()->RecordEnd(latency);
2487   }
2488 }
2489 
// Undoes the pollset_set linkage set up by AddCallToLbQueuedCallsLocked().
// Requires chand_->lb_mu_ to be held (per the *Locked naming convention).
void ClientChannelFilter::LoadBalancedCall::
    RemoveCallFromLbQueuedCallsLocked() {
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << chand_ << " lb_call=" << this
      << ": removing from queued picks list";
  // Remove pollset_set linkage.
  grpc_polling_entity_del_from_pollset_set(pollent(),
                                           chand_->interested_parties_);
  // Note: There's no need to actually remove the call from the queue
  // here, because that will be done in either
  // LbQueuedCallCanceller::CancelLocked() or
  // in ClientChannelFilter::UpdateStateAndPickerLocked().
}
2503 
// Queues this call to await a new picker.  Requires chand_->lb_mu_ to be
// held (per the *Locked naming convention).
void ClientChannelFilter::LoadBalancedCall::AddCallToLbQueuedCallsLocked() {
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << chand_ << " lb_call=" << this
      << ": adding to queued picks list";
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent(),
                                         chand_->interested_parties_);
  // Add to queue.  The queue holds a ref to the call.
  chand_->lb_queued_calls_.insert(Ref());
  OnAddToQueueLocked();
}
2516 
2517 absl::optional<absl::Status>
PickSubchannel(bool was_queued)2518 ClientChannelFilter::LoadBalancedCall::PickSubchannel(bool was_queued) {
2519   // We may accumulate multiple pickers here, because if a picker says
2520   // to queue the call, we check again to see if the picker has been
2521   // updated before we queue it.
2522   // We need to unref pickers in the WorkSerializer.
2523   std::vector<RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>> pickers;
2524   auto cleanup = absl::MakeCleanup(
2525       [work_serializer = chand_->work_serializer_, &pickers]() {
2526         if (IsWorkSerializerDispatchEnabled()) return;
2527         work_serializer->Run(
2528             [pickers = std::move(pickers)]() mutable {
2529               for (auto& picker : pickers) {
2530                 picker.reset(DEBUG_LOCATION, "PickSubchannel");
2531               }
2532             },
2533             DEBUG_LOCATION);
2534       });
2535   absl::AnyInvocable<void(RefCountedPtr<LoadBalancingPolicy::SubchannelPicker>)>
2536       set_picker;
2537   if (!IsWorkSerializerDispatchEnabled()) {
2538     set_picker =
2539         [&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
2540           pickers.emplace_back(std::move(picker));
2541         };
2542   } else {
2543     pickers.emplace_back();
2544     set_picker =
2545         [&](RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker) {
2546           pickers[0] = std::move(picker);
2547         };
2548   }
2549   // Grab mutex and take a ref to the picker.
2550   GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2551       << "chand=" << chand_ << " lb_call=" << this
2552       << ": grabbing LB mutex to get picker";
2553   RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> picker;
2554   {
2555     MutexLock lock(&chand_->lb_mu_);
2556     set_picker(chand_->picker_);
2557   }
2558   while (true) {
2559     // TODO(roth): Fix race condition in channel_idle filter and any
2560     // other possible causes of this.
2561     if (pickers.back() == nullptr) {
2562       GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2563           << "chand=" << chand_ << " lb_call=" << this
2564           << ": picker is null, failing call";
2565       return absl::InternalError("picker is null -- shouldn't happen");
2566     }
2567     // Do pick.
2568     GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2569         << "chand=" << chand_ << " lb_call=" << this
2570         << ": performing pick with picker=" << pickers.back().get();
2571     grpc_error_handle error;
2572     bool pick_complete = PickSubchannelImpl(pickers.back().get(), &error);
2573     if (!pick_complete) {
2574       RefCountedPtr<LoadBalancingPolicy::SubchannelPicker> old_picker;
2575       MutexLock lock(&chand_->lb_mu_);
2576       // If picker has been swapped out since we grabbed it, try again.
2577       if (pickers.back() != chand_->picker_) {
2578         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2579             << "chand=" << chand_ << " lb_call=" << this
2580             << ": pick not complete, but picker changed";
2581         if (IsWorkSerializerDispatchEnabled()) {
2582           // Don't unref until after we release the mutex.
2583           old_picker = std::move(pickers.back());
2584         }
2585         set_picker(chand_->picker_);
2586         continue;
2587       }
2588       // Otherwise queue the pick to try again later when we get a new picker.
2589       AddCallToLbQueuedCallsLocked();
2590       return absl::nullopt;
2591     }
2592     // Pick is complete.
2593     // If it was queued, add a trace annotation.
2594     if (was_queued && call_attempt_tracer() != nullptr) {
2595       call_attempt_tracer()->RecordAnnotation("Delayed LB pick complete.");
2596     }
2597     // If the pick failed, fail the call.
2598     if (!error.ok()) {
2599       GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2600           << "chand=" << chand_ << " lb_call=" << this
2601           << ": failed to pick subchannel: error=" << StatusToString(error);
2602       return error;
2603     }
2604     // Pick succeeded.
2605     Commit();
2606     return absl::OkStatus();
2607   }
2608 }
2609 
PickSubchannelImpl(LoadBalancingPolicy::SubchannelPicker * picker,grpc_error_handle * error)2610 bool ClientChannelFilter::LoadBalancedCall::PickSubchannelImpl(
2611     LoadBalancingPolicy::SubchannelPicker* picker, grpc_error_handle* error) {
2612   CHECK(connected_subchannel_ == nullptr);
2613   // Perform LB pick.
2614   LoadBalancingPolicy::PickArgs pick_args;
2615   Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
2616   CHECK_NE(path, nullptr);
2617   pick_args.path = path->as_string_view();
2618   LbCallState lb_call_state(this);
2619   pick_args.call_state = &lb_call_state;
2620   LbMetadata initial_metadata(send_initial_metadata());
2621   pick_args.initial_metadata = &initial_metadata;
2622   auto result = picker->Pick(pick_args);
2623   return HandlePickResult<bool>(
2624       &result,
2625       // CompletePick
2626       [this](LoadBalancingPolicy::PickResult::Complete* complete_pick) {
2627         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2628             << "chand=" << chand_ << " lb_call=" << this
2629             << ": LB pick succeeded: subchannel="
2630             << complete_pick->subchannel.get();
2631         CHECK(complete_pick->subchannel != nullptr);
2632         // Grab a ref to the connected subchannel while we're still
2633         // holding the data plane mutex.
2634         SubchannelWrapper* subchannel =
2635             static_cast<SubchannelWrapper*>(complete_pick->subchannel.get());
2636         connected_subchannel_ = subchannel->connected_subchannel();
2637         // If the subchannel has no connected subchannel (e.g., if the
2638         // subchannel has moved out of state READY but the LB policy hasn't
2639         // yet seen that change and given us a new picker), then just
2640         // queue the pick.  We'll try again as soon as we get a new picker.
2641         if (connected_subchannel_ == nullptr) {
2642           GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2643               << "chand=" << chand_ << " lb_call=" << this
2644               << ": subchannel returned by LB picker "
2645                  "has no connected subchannel; queueing pick";
2646           return false;
2647         }
2648         lb_subchannel_call_tracker_ =
2649             std::move(complete_pick->subchannel_call_tracker);
2650         if (lb_subchannel_call_tracker_ != nullptr) {
2651           lb_subchannel_call_tracker_->Start();
2652         }
2653         // Handle metadata mutations.
2654         MetadataMutationHandler::Apply(complete_pick->metadata_mutations,
2655                                        send_initial_metadata());
2656         MaybeOverrideAuthority(std::move(complete_pick->authority_override),
2657                                send_initial_metadata());
2658         return true;
2659       },
2660       // QueuePick
2661       [this](LoadBalancingPolicy::PickResult::Queue* /*queue_pick*/) {
2662         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2663             << "chand=" << chand_ << " lb_call=" << this << ": LB pick queued";
2664         return false;
2665       },
2666       // FailPick
2667       [this, &error](LoadBalancingPolicy::PickResult::Fail* fail_pick) {
2668         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2669             << "chand=" << chand_ << " lb_call=" << this
2670             << ": LB pick failed: " << fail_pick->status;
2671         // If wait_for_ready is false, then the error indicates the RPC
2672         // attempt's final status.
2673         if (!send_initial_metadata()
2674                  ->GetOrCreatePointer(WaitForReady())
2675                  ->value) {
2676           *error = absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
2677               std::move(fail_pick->status), "LB pick"));
2678           return true;
2679         }
2680         // If wait_for_ready is true, then queue to retry when we get a new
2681         // picker.
2682         return false;
2683       },
2684       // DropPick
2685       [this, &error](LoadBalancingPolicy::PickResult::Drop* drop_pick) {
2686         GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2687             << "chand=" << chand_ << " lb_call=" << this
2688             << ": LB pick dropped: " << drop_pick->status;
2689         *error = grpc_error_set_int(
2690             absl_status_to_grpc_error(MaybeRewriteIllegalStatusCode(
2691                 std::move(drop_pick->status), "LB drop")),
2692             StatusIntProperty::kLbPolicyDrop, 1);
2693         return true;
2694       });
2695 }
2696 
2697 //
2698 // ClientChannelFilter::FilterBasedLoadBalancedCall
2699 //
2700 
// Constructs a filter-based LB call.  Stashes the pieces of the legacy
// filter-stack call machinery (owning call stack, call combiner, polling
// entity) needed later to start batches on the subchannel call.
// on_call_destruction_complete, if non-null, is either handed off to the
// subchannel call (see CreateSubchannelCall()) or run from the destructor.
ClientChannelFilter::FilterBasedLoadBalancedCall::FilterBasedLoadBalancedCall(
    ClientChannelFilter* chand, const grpc_call_element_args& args,
    grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete,
    absl::AnyInvocable<void()> on_commit, bool is_transparent_retry)
    : LoadBalancedCall(chand, args.arena, std::move(on_commit),
                       is_transparent_retry),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      pollent_(pollent),
      on_call_destruction_complete_(on_call_destruction_complete) {}
2711 
2712 ClientChannelFilter::FilterBasedLoadBalancedCall::
~FilterBasedLoadBalancedCall()2713     ~FilterBasedLoadBalancedCall() {
2714   // Make sure there are no remaining pending batches.
2715   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2716     CHECK_EQ(pending_batches_[i], nullptr);
2717   }
2718   if (on_call_destruction_complete_ != nullptr) {
2719     ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
2720                  absl::OkStatus());
2721   }
2722 }
2723 
// Releases this LB call.  Ensures that call completion and latency are
// recorded even if the call never got far enough to receive trailing
// metadata, then delegates teardown to the base class.
void ClientChannelFilter::FilterBasedLoadBalancedCall::Orphan() {
  // If the recv_trailing_metadata op was never started, then notify
  // about call completion here, as best we can.  We assume status
  // CANCELLED in this case.
  if (recv_trailing_metadata_ == nullptr) {
    RecordCallCompletion(absl::CancelledError("call cancelled"), nullptr,
                         nullptr, "");
  }
  RecordLatency();
  // Delegate to parent.
  LoadBalancedCall::Orphan();
}
2736 
GetBatchIndex(grpc_transport_stream_op_batch * batch)2737 size_t ClientChannelFilter::FilterBasedLoadBalancedCall::GetBatchIndex(
2738     grpc_transport_stream_op_batch* batch) {
2739   // Note: It is important the send_initial_metadata be the first entry
2740   // here, since the code in PickSubchannelImpl() assumes it will be.
2741   if (batch->send_initial_metadata) return 0;
2742   if (batch->send_message) return 1;
2743   if (batch->send_trailing_metadata) return 2;
2744   if (batch->recv_initial_metadata) return 3;
2745   if (batch->recv_message) return 4;
2746   if (batch->recv_trailing_metadata) return 5;
2747   GPR_UNREACHABLE_CODE(return (size_t)-1);
2748 }
2749 
2750 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2751 void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesAdd(
2752     grpc_transport_stream_op_batch* batch) {
2753   const size_t idx = GetBatchIndex(batch);
2754   GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2755       << "chand=" << chand() << " lb_call=" << this
2756       << ": adding pending batch at index " << idx;
2757   CHECK_EQ(pending_batches_[idx], nullptr);
2758   pending_batches_[idx] = batch;
2759 }
2760 
2761 // This is called via the call combiner, so access to calld is synchronized.
2762 void ClientChannelFilter::FilterBasedLoadBalancedCall::
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2763     FailPendingBatchInCallCombiner(void* arg, grpc_error_handle error) {
2764   grpc_transport_stream_op_batch* batch =
2765       static_cast<grpc_transport_stream_op_batch*>(arg);
2766   auto* self = static_cast<FilterBasedLoadBalancedCall*>(
2767       batch->handler_private.extra_arg);
2768   // Note: This will release the call combiner.
2769   grpc_transport_stream_op_batch_finish_with_failure(batch, error,
2770                                                      self->call_combiner_);
2771 }
2772 
2773 // This is called via the call combiner, so access to calld is synchronized.
// This is called via the call combiner, so access to calld is synchronized.
// Fails all pending batches with the given error.  The error is also
// stashed in failure_error_ so RecvTrailingMetadataReady() can propagate
// it to the original recv_trailing_metadata callback.
// yield_call_combiner_predicate decides, based on the accumulated closure
// list, whether running the closures should also yield the call combiner.
void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesFail(
    grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  CHECK(!error.ok());
  failure_error_ = error;
  if (GRPC_TRACE_FLAG_ENABLED(client_channel_lb_call)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    LOG(INFO) << "chand=" << chand() << " lb_call=" << this << ": failing "
              << num_batches << " pending batches: " << StatusToString(error);
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      // Stash a back-pointer so FailPendingBatchInCallCombiner() can find
      // the call combiner.
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, error,
                   "PendingBatchesFail");
      // Clear the slot; the batch is now owned by the closure.
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
}
2806 
2807 // This is called via the call combiner, so access to calld is synchronized.
2808 void ClientChannelFilter::FilterBasedLoadBalancedCall::
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2809     ResumePendingBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
2810   grpc_transport_stream_op_batch* batch =
2811       static_cast<grpc_transport_stream_op_batch*>(arg);
2812   SubchannelCall* subchannel_call =
2813       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2814   // Note: This will release the call combiner.
2815   subchannel_call->StartTransportStreamOpBatch(batch);
2816 }
2817 
2818 // This is called via the call combiner, so access to calld is synchronized.
// This is called via the call combiner, so access to calld is synchronized.
// Starts all pending batches on the newly created subchannel call.
// Called from CreateSubchannelCall() once subchannel_call_ is set.
void ClientChannelFilter::FilterBasedLoadBalancedCall::PendingBatchesResume() {
  if (GRPC_TRACE_FLAG_ENABLED(client_channel_lb_call)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    LOG(INFO) << "chand=" << chand() << " lb_call=" << this << ": starting "
              << num_batches << " pending batches on subchannel_call="
              << subchannel_call_.get();
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      // Point the closure at the subchannel call so
      // ResumePendingBatchInCallCombiner() can forward the batch to it.
      batch->handler_private.extra_arg = subchannel_call_.get();
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, absl::OkStatus(),
                   "resuming pending batch from LB call");
      // Clear the slot; the batch is now owned by the closure.
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}
2845 
2846 void ClientChannelFilter::FilterBasedLoadBalancedCall::
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)2847     StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch) {
2848   if (GRPC_TRACE_FLAG_ENABLED(client_channel_lb_call) ||
2849       GRPC_TRACE_FLAG_ENABLED(channel)) {
2850     LOG(INFO) << "chand=" << chand() << " lb_call=" << this
2851               << ": batch started from above: "
2852               << grpc_transport_stream_op_batch_string(batch, false)
2853               << ", call_attempt_tracer()=" << call_attempt_tracer();
2854   }
2855   // Handle call tracing.
2856   if (call_attempt_tracer() != nullptr) {
2857     // Record send ops in tracer.
2858     if (batch->cancel_stream) {
2859       call_attempt_tracer()->RecordCancel(
2860           batch->payload->cancel_stream.cancel_error);
2861     }
2862     if (batch->send_initial_metadata) {
2863       call_attempt_tracer()->RecordSendInitialMetadata(
2864           batch->payload->send_initial_metadata.send_initial_metadata);
2865     }
2866     if (batch->send_trailing_metadata) {
2867       call_attempt_tracer()->RecordSendTrailingMetadata(
2868           batch->payload->send_trailing_metadata.send_trailing_metadata);
2869     }
2870     // Intercept recv ops.
2871     if (batch->recv_initial_metadata) {
2872       recv_initial_metadata_ =
2873           batch->payload->recv_initial_metadata.recv_initial_metadata;
2874       original_recv_initial_metadata_ready_ =
2875           batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
2876       GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
2877                         this, nullptr);
2878       batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2879           &recv_initial_metadata_ready_;
2880     }
2881   }
2882   // Intercept recv_trailing_metadata even if there is no call tracer,
2883   // since we may need to notify the LB policy about trailing metadata.
2884   if (batch->recv_trailing_metadata) {
2885     recv_trailing_metadata_ =
2886         batch->payload->recv_trailing_metadata.recv_trailing_metadata;
2887     transport_stream_stats_ =
2888         batch->payload->recv_trailing_metadata.collect_stats;
2889     original_recv_trailing_metadata_ready_ =
2890         batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
2891     GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_, RecvTrailingMetadataReady,
2892                       this, nullptr);
2893     batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2894         &recv_trailing_metadata_ready_;
2895   }
2896   // If we've already gotten a subchannel call, pass the batch down to it.
2897   // Note that once we have picked a subchannel, we do not need to acquire
2898   // the channel's data plane mutex, which is more efficient (especially for
2899   // streaming calls).
2900   if (subchannel_call_ != nullptr) {
2901     GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2902         << "chand=" << chand() << " lb_call=" << this
2903         << ": starting batch on subchannel_call=" << subchannel_call_.get();
2904     subchannel_call_->StartTransportStreamOpBatch(batch);
2905     return;
2906   }
2907   // We do not yet have a subchannel call.
2908   //
2909   // If we've previously been cancelled, immediately fail any new batches.
2910   if (GPR_UNLIKELY(!cancel_error_.ok())) {
2911     GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2912         << "chand=" << chand() << " lb_call=" << this
2913         << ": failing batch with error: " << StatusToString(cancel_error_);
2914     // Note: This will release the call combiner.
2915     grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
2916                                                        call_combiner_);
2917     return;
2918   }
2919   // Handle cancellation.
2920   if (GPR_UNLIKELY(batch->cancel_stream)) {
2921     // Stash a copy of cancel_error in our call data, so that we can use
2922     // it for subsequent operations.  This ensures that if the call is
2923     // cancelled before any batches are passed down (e.g., if the deadline
2924     // is in the past when the call starts), we can return the right
2925     // error to the caller when the first batch does get passed down.
2926     cancel_error_ = batch->payload->cancel_stream.cancel_error;
2927     GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2928         << "chand=" << chand() << " lb_call=" << this
2929         << ": recording cancel_error=" << StatusToString(cancel_error_).c_str();
2930     // Fail all pending batches.
2931     PendingBatchesFail(cancel_error_, NoYieldCallCombiner);
2932     // Note: This will release the call combiner.
2933     grpc_transport_stream_op_batch_finish_with_failure(batch, cancel_error_,
2934                                                        call_combiner_);
2935     return;
2936   }
2937   // Add the batch to the pending list.
2938   PendingBatchesAdd(batch);
2939   // For batches containing a send_initial_metadata op, acquire the
2940   // channel's LB mutex to pick a subchannel.
2941   if (GPR_LIKELY(batch->send_initial_metadata)) {
2942     TryPick(/*was_queued=*/false);
2943   } else {
2944     // For all other batches, release the call combiner.
2945     GRPC_TRACE_LOG(client_channel_lb_call, INFO)
2946         << "chand=" << chand() << " lb_call=" << this
2947         << ": saved batch, yielding call combiner";
2948     GRPC_CALL_COMBINER_STOP(call_combiner_,
2949                             "batch does not include send_initial_metadata");
2950   }
2951 }
2952 
// Interception callback for recv_initial_metadata.  This closure is
// installed only when call_attempt_tracer() != nullptr (see
// StartTransportStreamOpBatch()), so the tracer is dereferenced here
// without a null check.
void ClientChannelFilter::FilterBasedLoadBalancedCall::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << self->chand() << " lb_call=" << self
      << ": got recv_initial_metadata_ready: error=" << StatusToString(error);
  if (error.ok()) {
    // recv_initial_metadata_flags is not populated for clients
    self->call_attempt_tracer()->RecordReceivedInitialMetadata(
        self->recv_initial_metadata_);
    // Stash the peer string so RecvTrailingMetadataReady() can report it
    // at call completion.
    auto* peer_string = self->recv_initial_metadata_->get_pointer(PeerString());
    if (peer_string != nullptr) self->peer_string_ = peer_string->Ref();
  }
  // Chain to the original callback.
  Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
               error);
}
2969 
// Interception callback for recv_trailing_metadata.  Derives the call's
// final status (from the error if present, otherwise from the trailing
// metadata), records completion with the tracer and/or LB subchannel
// call tracker, then chains to the original callback -- substituting
// failure_error_ if one was stashed by PendingBatchesFail().
void ClientChannelFilter::FilterBasedLoadBalancedCall::
    RecvTrailingMetadataReady(void* arg, grpc_error_handle error) {
  auto* self = static_cast<FilterBasedLoadBalancedCall*>(arg);
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << self->chand() << " lb_call=" << self
      << ": got recv_trailing_metadata_ready: error=" << StatusToString(error)
      << " call_attempt_tracer()=" << self->call_attempt_tracer()
      << " lb_subchannel_call_tracker_=" << self->lb_subchannel_call_tracker()
      << " failure_error_=" << StatusToString(self->failure_error_);
  // Check if we have a tracer or an LB callback to invoke.
  if (self->call_attempt_tracer() != nullptr ||
      self->lb_subchannel_call_tracker() != nullptr) {
    // Get the call's status.
    absl::Status status;
    if (!error.ok()) {
      // Get status from error.
      grpc_status_code code;
      std::string message;
      grpc_error_get_status(
          error, self->arena()->GetContext<Call>()->deadline(), &code, &message,
          /*http_error=*/nullptr, /*error_string=*/nullptr);
      status = absl::Status(static_cast<absl::StatusCode>(code), message);
    } else {
      // Get status from headers.
      const auto& md = *self->recv_trailing_metadata_;
      grpc_status_code code =
          md.get(GrpcStatusMetadata()).value_or(GRPC_STATUS_UNKNOWN);
      if (code != GRPC_STATUS_OK) {
        absl::string_view message;
        if (const auto* grpc_message = md.get_pointer(GrpcMessageMetadata())) {
          message = grpc_message->as_string_view();
        }
        status = absl::Status(static_cast<absl::StatusCode>(code), message);
      }
      // If code is GRPC_STATUS_OK, status stays default-constructed (OK).
    }
    absl::string_view peer_string;
    if (self->peer_string_.has_value()) {
      peer_string = self->peer_string_->as_string_view();
    }
    self->RecordCallCompletion(status, self->recv_trailing_metadata_,
                               self->transport_stream_stats_, peer_string);
  }
  // Chain to original callback, replacing the error with the stashed
  // failure error, if any (consuming it in the process).
  if (!self->failure_error_.ok()) {
    error = self->failure_error_;
    self->failure_error_ = absl::OkStatus();
  }
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               error);
}
3020 
3021 // A class to handle the call combiner cancellation callback for a
3022 // queued pick.
3023 // TODO(roth): When we implement hedging support, we won't be able to
3024 // register a call combiner cancellation closure for each LB pick,
3025 // because there may be multiple LB picks happening in parallel.
3026 // Instead, we will probably need to maintain a list in the CallData
3027 // object of pending LB picks to be cancelled when the closure runs.
class ClientChannelFilter::FilterBasedLoadBalancedCall::LbQueuedCallCanceller
    final {
 public:
  // Takes a ref to the LB call and registers itself as the call
  // combiner's cancellation closure.  Also refs the owning call stack so
  // the arena holding the LB call outlives CancelLocked().
  explicit LbQueuedCallCanceller(
      RefCountedPtr<FilterBasedLoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  // Runs when the call combiner signals cancellation.  Acts only if this
  // canceller is still the active one (RetryPickLocked() may have lamed
  // it by clearing lb_call_canceller_) and error is a real cancellation.
  static void CancelLocked(void* arg, grpc_error_handle error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand();
    {
      MutexLock lock(&chand->lb_mu_);
      GRPC_TRACE_LOG(client_channel_lb_call, INFO)
          << "chand=" << chand << " lb_call=" << lb_call
          << ": cancelling queued pick: error=" << StatusToString(error)
          << " self=" << self
          << " calld->pick_canceller=" << lb_call->lb_call_canceller_;
      if (lb_call->lb_call_canceller_ == self && !error.ok()) {
        lb_call->Commit();
        // Remove pick from list of queued picks.
        lb_call->RemoveCallFromLbQueuedCallsLocked();
        // Remove from queued picks list.
        chand->lb_queued_calls_.erase(self->lb_call_);
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(error,
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    // Unref lb_call before unreffing the call stack, since unreffing
    // the call stack may destroy the arena in which lb_call is allocated.
    auto* owning_call = lb_call->owning_call_;
    self->lb_call_.reset();
    GRPC_CALL_STACK_UNREF(owning_call, "LbQueuedCallCanceller");
    delete self;
  }

  // Keeps the LB call alive until the cancellation closure runs.
  RefCountedPtr<FilterBasedLoadBalancedCall> lb_call_;
  grpc_closure closure_;
};
3073 
TryPick(bool was_queued)3074 void ClientChannelFilter::FilterBasedLoadBalancedCall::TryPick(
3075     bool was_queued) {
3076   auto result = PickSubchannel(was_queued);
3077   if (result.has_value()) {
3078     if (!result->ok()) {
3079       PendingBatchesFail(*result, YieldCallCombiner);
3080       return;
3081     }
3082     CreateSubchannelCall();
3083   }
3084 }
3085 
// Called when this call is added to the channel's queue of picks waiting
// for a new picker.
void ClientChannelFilter::FilterBasedLoadBalancedCall::OnAddToQueueLocked() {
  // Register call combiner cancellation callback.  The canceller owns
  // itself and is lamed in RetryPickLocked() by clearing this pointer.
  lb_call_canceller_ =
      new LbQueuedCallCanceller(RefAsSubclass<FilterBasedLoadBalancedCall>());
}
3091 
// Re-attempts the LB pick for a previously queued call (invoked while
// holding the channel's LB mutex, per the Locked suffix convention --
// see the lb_mu_ usage in LbQueuedCallCanceller::CancelLocked()).
void ClientChannelFilter::FilterBasedLoadBalancedCall::RetryPickLocked() {
  // Lame the call combiner canceller.
  lb_call_canceller_ = nullptr;
  // Do an async callback to resume call processing, so that we're not
  // doing it while holding the channel's LB mutex.
  // TODO(roth): We should really be using EventEngine::Run() here
  // instead of ExecCtx::Run().  Unfortunately, doing that seems to cause
  // a flaky TSAN failure for reasons that I do not fully understand.
  // However, given that we are working toward eliminating this code as
  // part of the promise conversion, it doesn't seem worth further
  // investigation right now.
  ExecCtx::Run(DEBUG_LOCATION, NewClosure([this](grpc_error_handle) {
                 // If there are a lot of queued calls here, resuming them
                 // all may cause us to stay inside C-core for a long period
                 // of time. All of that work would be done using the same
                 // ExecCtx instance and therefore the same cached value of
                 // "now". The longer it takes to finish all of this work
                 // and exit from C-core, the more stale the cached value of
                 // "now" may become. This can cause problems whereby (e.g.)
                 // we calculate a timer deadline based on the stale value,
                 // which results in the timer firing too early. To avoid
                 // this, we invalidate the cached value for each call we
                 // process.
                 ExecCtx::Get()->InvalidateNow();
                 TryPick(/*was_queued=*/true);
               }),
               absl::OkStatus());
}
3120 
// Creates the subchannel call on the connected subchannel chosen by the
// LB pick, then resumes the pending batches on it -- or fails them if
// subchannel call creation itself failed.
void ClientChannelFilter::FilterBasedLoadBalancedCall::CreateSubchannelCall() {
  // The :path header must have been set by the time a pick completes.
  Slice* path = send_initial_metadata()->get_pointer(HttpPathMetadata());
  CHECK_NE(path, nullptr);
  SubchannelCall::Args call_args = {
      connected_subchannel()->Ref(), pollent_, path->Ref(), /*start_time=*/0,
      arena()->GetContext<Call>()->deadline(),
      // TODO(roth): When we implement hedging support, we will probably
      // need to use a separate call arena for each subchannel call.
      arena(), call_combiner_};
  grpc_error_handle error;
  subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
  GRPC_TRACE_LOG(client_channel_lb_call, INFO)
      << "chand=" << chand() << " lb_call=" << this
      << ": create subchannel_call=" << subchannel_call_.get()
      << ": error=" << StatusToString(error);
  // Hand off the destruction-notification closure to the subchannel call
  // so it runs after the subchannel call stack is destroyed (otherwise it
  // would be run from our destructor instead).
  if (on_call_destruction_complete_ != nullptr) {
    subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
    on_call_destruction_complete_ = nullptr;
  }
  if (GPR_UNLIKELY(!error.ok())) {
    PendingBatchesFail(error, YieldCallCombiner);
  } else {
    PendingBatchesResume();
  }
}
3146 
3147 }  // namespace grpc_core
3148