1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include "src/core/ext/filters/client_channel/client_channel.h"
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <string.h>
26 
27 #include <set>
28 
29 #include "absl/strings/numbers.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/string_view.h"
33 
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/string_util.h>
37 #include <grpc/support/sync.h>
38 
39 #include "absl/container/inlined_vector.h"
40 #include "absl/types/optional.h"
41 
42 #include "src/core/ext/filters/client_channel/backend_metric.h"
43 #include "src/core/ext/filters/client_channel/backup_poller.h"
44 #include "src/core/ext/filters/client_channel/config_selector.h"
45 #include "src/core/ext/filters/client_channel/dynamic_filters.h"
46 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
47 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
48 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
49 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
50 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
51 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
52 #include "src/core/ext/filters/client_channel/resolver_registry.h"
53 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
54 #include "src/core/ext/filters/client_channel/retry_filter.h"
55 #include "src/core/ext/filters/client_channel/service_config.h"
56 #include "src/core/ext/filters/client_channel/service_config_call_data.h"
57 #include "src/core/ext/filters/client_channel/subchannel.h"
58 #include "src/core/ext/filters/deadline/deadline_filter.h"
59 #include "src/core/lib/backoff/backoff.h"
60 #include "src/core/lib/channel/channel_args.h"
61 #include "src/core/lib/channel/connected_channel.h"
62 #include "src/core/lib/channel/status_util.h"
63 #include "src/core/lib/gpr/string.h"
64 #include "src/core/lib/gprpp/sync.h"
65 #include "src/core/lib/iomgr/iomgr.h"
66 #include "src/core/lib/iomgr/polling_entity.h"
67 #include "src/core/lib/iomgr/work_serializer.h"
68 #include "src/core/lib/profiling/timers.h"
69 #include "src/core/lib/slice/slice_internal.h"
70 #include "src/core/lib/slice/slice_string_helpers.h"
71 #include "src/core/lib/surface/channel.h"
72 #include "src/core/lib/transport/connectivity_state.h"
73 #include "src/core/lib/transport/error_utils.h"
74 #include "src/core/lib/transport/metadata.h"
75 #include "src/core/lib/transport/metadata_batch.h"
76 #include "src/core/lib/transport/static_metadata.h"
77 #include "src/core/lib/transport/status_metadata.h"
78 
79 //
80 // Client channel filter
81 //
82 
83 namespace grpc_core {
84 
85 using internal::ClientChannelGlobalParsedConfig;
86 using internal::ClientChannelMethodParsedConfig;
87 using internal::ClientChannelServiceConfigParser;
88 
89 TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
90 TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
91 
92 //
93 // ClientChannel::CallData definition
94 //
95 
96 class ClientChannel::CallData {
97  public:
98   static grpc_error_handle Init(grpc_call_element* elem,
99                                 const grpc_call_element_args* args);
100   static void Destroy(grpc_call_element* elem,
101                       const grpc_call_final_info* final_info,
102                       grpc_closure* then_schedule_closure);
103   static void StartTransportStreamOpBatch(
104       grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
105   static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
106 
107   // Invoked by channel for queued calls when name resolution is completed.
108   static void CheckResolution(void* arg, grpc_error_handle error);
109   // Helper function for applying the service config to a call while
110   // holding ClientChannel::resolution_mu_.
111   // Returns true if the service config has been applied to the call, in which
112   // case the caller must invoke ResolutionDone() or AsyncResolutionDone()
113   // with the returned error.
114   bool CheckResolutionLocked(grpc_call_element* elem, grpc_error_handle* error)
115       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
116   // Schedules a callback to continue processing the call once
117   // resolution is complete.  The callback will not run until after this
118   // method returns.
119   void AsyncResolutionDone(grpc_call_element* elem, grpc_error_handle error);
120 
121  private:
122   class ResolverQueuedCallCanceller;
123 
124   CallData(grpc_call_element* elem, const ClientChannel& chand,
125            const grpc_call_element_args& args);
126   ~CallData();
127 
128   // Returns the index into pending_batches_ to be used for batch.
129   static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
130   void PendingBatchesAdd(grpc_call_element* elem,
131                          grpc_transport_stream_op_batch* batch);
132   static void FailPendingBatchInCallCombiner(void* arg,
133                                              grpc_error_handle error);
134   // A predicate type and some useful implementations for PendingBatchesFail().
135   typedef bool (*YieldCallCombinerPredicate)(
136       const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList &)137   static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
138     return true;
139   }
NoYieldCallCombiner(const CallCombinerClosureList &)140   static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
141     return false;
142   }
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)143   static bool YieldCallCombinerIfPendingBatchesFound(
144       const CallCombinerClosureList& closures) {
145     return closures.size() > 0;
146   }
147   // Fails all pending batches.
148   // If yield_call_combiner_predicate returns true, assumes responsibility for
149   // yielding the call combiner.
150   void PendingBatchesFail(
151       grpc_call_element* elem, grpc_error_handle error,
152       YieldCallCombinerPredicate yield_call_combiner_predicate);
153   static void ResumePendingBatchInCallCombiner(void* arg,
154                                                grpc_error_handle ignored);
155   // Resumes all pending batches on lb_call_.
156   void PendingBatchesResume(grpc_call_element* elem);
157 
158   // Applies service config to the call.  Must be invoked once we know
159   // that the resolver has returned results to the channel.
160   // If an error is returned, the error indicates the status with which
161   // the call should be failed.
162   grpc_error_handle ApplyServiceConfigToCallLocked(
163       grpc_call_element* elem, grpc_metadata_batch* initial_metadata)
164       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
165   // Invoked when the resolver result is applied to the caller, on both
166   // success or failure.
167   static void ResolutionDone(void* arg, grpc_error_handle error);
168   // Removes the call (if present) from the channel's list of calls queued
169   // for name resolution.
170   void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem)
171       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
172   // Adds the call (if not already present) to the channel's list of
173   // calls queued for name resolution.
174   void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem)
175       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
176 
177   static void RecvInitialMetadataReadyForConfigSelectorCommitCallback(
178       void* arg, grpc_error_handle error);
179   void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
180       grpc_transport_stream_op_batch* batch);
181 
182   void CreateDynamicCall(grpc_call_element* elem);
183 
184   // State for handling deadlines.
185   // The code in deadline_filter.c requires this to be the first field.
186   // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
187   // and this struct both independently store pointers to the call stack
188   // and call combiner.  If/when we have time, find a way to avoid this
189   // without breaking the grpc_deadline_state abstraction.
190   grpc_deadline_state deadline_state_;
191 
192   grpc_slice path_;  // Request path.
193   gpr_cycle_counter call_start_time_;
194   grpc_millis deadline_;
195   Arena* arena_;
196   grpc_call_stack* owning_call_;
197   CallCombiner* call_combiner_;
198   grpc_call_context_element* call_context_;
199 
200   grpc_polling_entity* pollent_ = nullptr;
201 
202   grpc_closure pick_closure_;
203 
204   // Accessed while holding ClientChannel::resolution_mu_.
205   bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) =
206       false;
207   bool queued_pending_resolver_result_
208       ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = false;
209   ClientChannel::ResolverQueuedCall resolver_queued_call_
210       ABSL_GUARDED_BY(&ClientChannel::resolution_mu_);
211   ResolverQueuedCallCanceller* resolver_call_canceller_
212       ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr;
213 
214   std::function<void()> on_call_committed_;
215 
216   grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
217   grpc_closure recv_initial_metadata_ready_;
218 
219   RefCountedPtr<DynamicFilters> dynamic_filters_;
220   RefCountedPtr<DynamicFilters::Call> dynamic_call_;
221 
222   // Batches are added to this list when received from above.
223   // They are removed when we are done handling the batch (i.e., when
224   // either we have invoked all of the batch's callbacks or we have
225   // passed the batch down to the LB call and are not intercepting any of
226   // its callbacks).
227   grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
228 
229   // Set when we get a cancel_stream op.
230   grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
231 };
232 
233 //
234 // Filter vtable
235 //
236 
237 const grpc_channel_filter ClientChannel::kFilterVtable = {
238     ClientChannel::CallData::StartTransportStreamOpBatch,
239     ClientChannel::StartTransportOp,
240     sizeof(ClientChannel::CallData),
241     ClientChannel::CallData::Init,
242     ClientChannel::CallData::SetPollent,
243     ClientChannel::CallData::Destroy,
244     sizeof(ClientChannel),
245     ClientChannel::Init,
246     ClientChannel::Destroy,
247     ClientChannel::GetChannelInfo,
248     "client-channel",
249 };
250 
251 //
252 // dynamic termination filter
253 //
254 
255 namespace {
256 
257 // Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL.
ClientChannelArgCopy(void * p)258 void* ClientChannelArgCopy(void* p) { return p; }
ClientChannelArgDestroy(void *)259 void ClientChannelArgDestroy(void* /*p*/) {}
ClientChannelArgCmp(void * p,void * q)260 int ClientChannelArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
261 const grpc_arg_pointer_vtable kClientChannelArgPointerVtable = {
262     ClientChannelArgCopy, ClientChannelArgDestroy, ClientChannelArgCmp};
263 
264 // Channel arg pointer vtable for GRPC_ARG_SERVICE_CONFIG_OBJ.
ServiceConfigObjArgCopy(void * p)265 void* ServiceConfigObjArgCopy(void* p) {
266   auto* service_config = static_cast<ServiceConfig*>(p);
267   service_config->Ref().release();
268   return p;
269 }
ServiceConfigObjArgDestroy(void * p)270 void ServiceConfigObjArgDestroy(void* p) {
271   auto* service_config = static_cast<ServiceConfig*>(p);
272   service_config->Unref();
273 }
ServiceConfigObjArgCmp(void * p,void * q)274 int ServiceConfigObjArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
275 const grpc_arg_pointer_vtable kServiceConfigObjArgPointerVtable = {
276     ServiceConfigObjArgCopy, ServiceConfigObjArgDestroy,
277     ServiceConfigObjArgCmp};
278 
279 class DynamicTerminationFilter {
280  public:
281   class CallData;
282 
283   static const grpc_channel_filter kFilterVtable;
284 
Init(grpc_channel_element * elem,grpc_channel_element_args * args)285   static grpc_error_handle Init(grpc_channel_element* elem,
286                                 grpc_channel_element_args* args) {
287     GPR_ASSERT(args->is_last);
288     GPR_ASSERT(elem->filter == &kFilterVtable);
289     new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
290     return GRPC_ERROR_NONE;
291   }
292 
Destroy(grpc_channel_element * elem)293   static void Destroy(grpc_channel_element* elem) {
294     auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
295     chand->~DynamicTerminationFilter();
296   }
297 
298   // Will never be called.
StartTransportOp(grpc_channel_element *,grpc_transport_op *)299   static void StartTransportOp(grpc_channel_element* /*elem*/,
300                                grpc_transport_op* /*op*/) {}
GetChannelInfo(grpc_channel_element *,const grpc_channel_info *)301   static void GetChannelInfo(grpc_channel_element* /*elem*/,
302                              const grpc_channel_info* /*info*/) {}
303 
304  private:
DynamicTerminationFilter(const grpc_channel_args * args)305   explicit DynamicTerminationFilter(const grpc_channel_args* args)
306       : chand_(grpc_channel_args_find_pointer<ClientChannel>(
307             args, GRPC_ARG_CLIENT_CHANNEL)) {}
308 
309   ClientChannel* chand_;
310 };
311 
312 class DynamicTerminationFilter::CallData {
313  public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)314   static grpc_error_handle Init(grpc_call_element* elem,
315                                 const grpc_call_element_args* args) {
316     new (elem->call_data) CallData(*args);
317     return GRPC_ERROR_NONE;
318   }
319 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)320   static void Destroy(grpc_call_element* elem,
321                       const grpc_call_final_info* /*final_info*/,
322                       grpc_closure* then_schedule_closure) {
323     auto* calld = static_cast<CallData*>(elem->call_data);
324     RefCountedPtr<SubchannelCall> subchannel_call;
325     if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
326       subchannel_call = calld->lb_call_->subchannel_call();
327     }
328     calld->~CallData();
329     if (GPR_LIKELY(subchannel_call != nullptr)) {
330       subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
331     } else {
332       // TODO(yashkt) : This can potentially be a Closure::Run
333       ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
334     }
335   }
336 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)337   static void StartTransportStreamOpBatch(
338       grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
339     auto* calld = static_cast<CallData*>(elem->call_data);
340     calld->lb_call_->StartTransportStreamOpBatch(batch);
341   }
342 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)343   static void SetPollent(grpc_call_element* elem,
344                          grpc_polling_entity* pollent) {
345     auto* calld = static_cast<CallData*>(elem->call_data);
346     auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
347     ClientChannel* client_channel = chand->chand_;
348     grpc_call_element_args args = {
349         calld->owning_call_,     nullptr,
350         calld->call_context_,    calld->path_,
351         calld->call_start_time_, calld->deadline_,
352         calld->arena_,           calld->call_combiner_};
353     calld->lb_call_ =
354         client_channel->CreateLoadBalancedCall(args, pollent, nullptr);
355     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
356       gpr_log(GPR_INFO,
357               "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
358               client_channel, calld->lb_call_.get());
359     }
360   }
361 
362  private:
CallData(const grpc_call_element_args & args)363   explicit CallData(const grpc_call_element_args& args)
364       : path_(grpc_slice_ref_internal(args.path)),
365         call_start_time_(args.start_time),
366         deadline_(args.deadline),
367         arena_(args.arena),
368         owning_call_(args.call_stack),
369         call_combiner_(args.call_combiner),
370         call_context_(args.context) {}
371 
~CallData()372   ~CallData() { grpc_slice_unref_internal(path_); }
373 
374   grpc_slice path_;  // Request path.
375   gpr_cycle_counter call_start_time_;
376   grpc_millis deadline_;
377   Arena* arena_;
378   grpc_call_stack* owning_call_;
379   CallCombiner* call_combiner_;
380   grpc_call_context_element* call_context_;
381 
382   RefCountedPtr<ClientChannel::LoadBalancedCall> lb_call_;
383 };
384 
385 const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
386     DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
387     DynamicTerminationFilter::StartTransportOp,
388     sizeof(DynamicTerminationFilter::CallData),
389     DynamicTerminationFilter::CallData::Init,
390     DynamicTerminationFilter::CallData::SetPollent,
391     DynamicTerminationFilter::CallData::Destroy,
392     sizeof(DynamicTerminationFilter),
393     DynamicTerminationFilter::Init,
394     DynamicTerminationFilter::Destroy,
395     DynamicTerminationFilter::GetChannelInfo,
396     "dynamic_filter_termination",
397 };
398 
399 }  // namespace
400 
401 //
402 // ClientChannel::ResolverResultHandler
403 //
404 
405 class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
406  public:
ResolverResultHandler(ClientChannel * chand)407   explicit ResolverResultHandler(ClientChannel* chand) : chand_(chand) {
408     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
409   }
410 
~ResolverResultHandler()411   ~ResolverResultHandler() override {
412     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
413       gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
414     }
415     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
416   }
417 
ReturnResult(Resolver::Result result)418   void ReturnResult(Resolver::Result result) override
419       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
420     chand_->OnResolverResultChangedLocked(std::move(result));
421   }
422 
ReturnError(grpc_error_handle error)423   void ReturnError(grpc_error_handle error) override
424       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
425     chand_->OnResolverErrorLocked(error);
426   }
427 
428  private:
429   ClientChannel* chand_;
430 };
431 
432 //
433 // ClientChannel::SubchannelWrapper
434 //
435 
436 // This class is a wrapper for Subchannel that hides details of the
437 // channel's implementation (such as the health check service name and
438 // connected subchannel) from the LB policy API.
439 //
440 // Note that no synchronization is needed here, because even if the
441 // underlying subchannel is shared between channels, this wrapper will only
442 // be used within one channel, so it will always be synchronized by the
443 // control plane work_serializer.
444 class ClientChannel::SubchannelWrapper : public SubchannelInterface {
445  public:
SubchannelWrapper(ClientChannel * chand,RefCountedPtr<Subchannel> subchannel,absl::optional<std::string> health_check_service_name)446   SubchannelWrapper(ClientChannel* chand, RefCountedPtr<Subchannel> subchannel,
447                     absl::optional<std::string> health_check_service_name)
448       : SubchannelInterface(
449             GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
450                 ? "SubchannelWrapper"
451                 : nullptr),
452         chand_(chand),
453         subchannel_(std::move(subchannel)),
454         health_check_service_name_(std::move(health_check_service_name)) {
455     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
456       gpr_log(GPR_INFO,
457               "chand=%p: creating subchannel wrapper %p for subchannel %p",
458               chand, this, subchannel_.get());
459     }
460     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
461     auto* subchannel_node = subchannel_->channelz_node();
462     if (subchannel_node != nullptr) {
463       auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
464       if (it == chand_->subchannel_refcount_map_.end()) {
465         chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
466         it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
467                  .first;
468       }
469       ++it->second;
470     }
471     chand_->subchannel_wrappers_.insert(this);
472   }
473 
~SubchannelWrapper()474   ~SubchannelWrapper() override {
475     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
476       gpr_log(GPR_INFO,
477               "chand=%p: destroying subchannel wrapper %p for subchannel %p",
478               chand_, this, subchannel_.get());
479     }
480     chand_->subchannel_wrappers_.erase(this);
481     auto* subchannel_node = subchannel_->channelz_node();
482     if (subchannel_node != nullptr) {
483       auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
484       GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
485       --it->second;
486       if (it->second == 0) {
487         chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid());
488         chand_->subchannel_refcount_map_.erase(it);
489       }
490     }
491     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
492   }
493 
CheckConnectivityState()494   grpc_connectivity_state CheckConnectivityState() override
495       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
496     RefCountedPtr<ConnectedSubchannel> connected_subchannel;
497     grpc_connectivity_state connectivity_state =
498         subchannel_->CheckConnectivityState(health_check_service_name_,
499                                             &connected_subchannel);
500     MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
501     return connectivity_state;
502   }
503 
WatchConnectivityState(grpc_connectivity_state initial_state,std::unique_ptr<ConnectivityStateWatcherInterface> watcher)504   void WatchConnectivityState(
505       grpc_connectivity_state initial_state,
506       std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override {
507     auto& watcher_wrapper = watcher_map_[watcher.get()];
508     GPR_ASSERT(watcher_wrapper == nullptr);
509     watcher_wrapper = new WatcherWrapper(std::move(watcher),
510                                          Ref(DEBUG_LOCATION, "WatcherWrapper"),
511                                          initial_state);
512     subchannel_->WatchConnectivityState(
513         initial_state, health_check_service_name_,
514         RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
515             watcher_wrapper));
516   }
517 
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)518   void CancelConnectivityStateWatch(
519       ConnectivityStateWatcherInterface* watcher) override {
520     auto it = watcher_map_.find(watcher);
521     GPR_ASSERT(it != watcher_map_.end());
522     subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
523                                               it->second);
524     watcher_map_.erase(it);
525   }
526 
AttemptToConnect()527   void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
528 
ResetBackoff()529   void ResetBackoff() override { subchannel_->ResetBackoff(); }
530 
channel_args()531   const grpc_channel_args* channel_args() override {
532     return subchannel_->channel_args();
533   }
534 
ThrottleKeepaliveTime(int new_keepalive_time)535   void ThrottleKeepaliveTime(int new_keepalive_time) {
536     subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
537   }
538 
UpdateHealthCheckServiceName(absl::optional<std::string> health_check_service_name)539   void UpdateHealthCheckServiceName(
540       absl::optional<std::string> health_check_service_name) {
541     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
542       gpr_log(GPR_INFO,
543               "chand=%p: subchannel wrapper %p: updating health check service "
544               "name from \"%s\" to \"%s\"",
545               chand_, this, health_check_service_name_->c_str(),
546               health_check_service_name->c_str());
547     }
548     for (auto& p : watcher_map_) {
549       WatcherWrapper*& watcher_wrapper = p.second;
550       // Cancel the current watcher and create a new one using the new
551       // health check service name.
552       // TODO(roth): If there is not already an existing health watch
553       // call for the new name, then the watcher will initially report
554       // state CONNECTING.  If the LB policy is currently reporting
555       // state READY, this may cause it to switch to CONNECTING before
556       // switching back to READY.  This could cause a small delay for
557       // RPCs being started on the channel.  If/when this becomes a
558       // problem, we may be able to handle it by waiting for the new
559       // watcher to report READY before we use it to replace the old one.
560       WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
561       subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
562                                                 watcher_wrapper);
563       watcher_wrapper = replacement;
564       subchannel_->WatchConnectivityState(
565           replacement->last_seen_state(), health_check_service_name,
566           RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
567               replacement));
568     }
569     // Save the new health check service name.
570     health_check_service_name_ = std::move(health_check_service_name);
571   }
572 
573   // Caller must be holding the control-plane work_serializer.
connected_subchannel() const574   ConnectedSubchannel* connected_subchannel() const
575       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::work_serializer_) {
576     return connected_subchannel_.get();
577   }
578 
579   // Caller must be holding the data-plane mutex.
connected_subchannel_in_data_plane() const580   ConnectedSubchannel* connected_subchannel_in_data_plane() const
581       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
582     return connected_subchannel_in_data_plane_.get();
583   }
set_connected_subchannel_in_data_plane(RefCountedPtr<ConnectedSubchannel> connected_subchannel)584   void set_connected_subchannel_in_data_plane(
585       RefCountedPtr<ConnectedSubchannel> connected_subchannel)
586       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
587     connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
588   }
589 
590  private:
591   // Subchannel and SubchannelInterface have different interfaces for
592   // their respective ConnectivityStateWatcherInterface classes.
593   // The one in Subchannel updates the ConnectedSubchannel along with
594   // the state, whereas the one in SubchannelInterface does not expose
595   // the ConnectedSubchannel.
596   //
597   // This wrapper provides a bridge between the two.  It implements
598   // Subchannel::ConnectivityStateWatcherInterface and wraps
599   // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
600   // that was passed in by the LB policy.  We pass an instance of this
601   // class to the underlying Subchannel, and when we get updates from
602   // the subchannel, we pass those on to the wrapped watcher to return
603   // the update to the LB policy.  This allows us to set the connected
604   // subchannel before passing the result back to the LB policy.
605   class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
606    public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> parent,grpc_connectivity_state initial_state)607     WatcherWrapper(
608         std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
609             watcher,
610         RefCountedPtr<SubchannelWrapper> parent,
611         grpc_connectivity_state initial_state)
612         : watcher_(std::move(watcher)),
613           parent_(std::move(parent)),
614           last_seen_state_(initial_state) {}
615 
~WatcherWrapper()616     ~WatcherWrapper() override {
617       auto* parent = parent_.release();  // ref owned by lambda
618       parent->chand_->work_serializer_->Run(
619           [parent]()
620               ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
621                 parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
622               },
623           DEBUG_LOCATION);
624     }
625 
OnConnectivityStateChange()626     void OnConnectivityStateChange() override {
627       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
628         gpr_log(GPR_INFO,
629                 "chand=%p: connectivity change for subchannel wrapper %p "
630                 "subchannel %p; hopping into work_serializer",
631                 parent_->chand_, parent_.get(), parent_->subchannel_.get());
632       }
633       Ref().release();  // ref owned by lambda
634       parent_->chand_->work_serializer_->Run(
635           [this]()
636               ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
637                 ApplyUpdateInControlPlaneWorkSerializer();
638                 Unref();
639               },
640           DEBUG_LOCATION);
641     }
642 
interested_parties()643     grpc_pollset_set* interested_parties() override {
644       SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
645           watcher_.get();
646       if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
647       return watcher->interested_parties();
648     }
649 
MakeReplacement()650     WatcherWrapper* MakeReplacement() {
651       auto* replacement =
652           new WatcherWrapper(std::move(watcher_), parent_, last_seen_state_);
653       replacement_ = replacement;
654       return replacement;
655     }
656 
last_seen_state() const657     grpc_connectivity_state last_seen_state() const { return last_seen_state_; }
658 
659    private:
ApplyUpdateInControlPlaneWorkSerializer()660     void ApplyUpdateInControlPlaneWorkSerializer()
661         ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
662       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
663         gpr_log(GPR_INFO,
664                 "chand=%p: processing connectivity change in work serializer "
665                 "for subchannel wrapper %p subchannel %p "
666                 "watcher=%p",
667                 parent_->chand_, parent_.get(), parent_->subchannel_.get(),
668                 watcher_.get());
669       }
670       ConnectivityStateChange state_change = PopConnectivityStateChange();
671       absl::optional<absl::Cord> keepalive_throttling =
672           state_change.status.GetPayload(kKeepaliveThrottlingKey);
673       if (keepalive_throttling.has_value()) {
674         int new_keepalive_time = -1;
675         if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
676                              &new_keepalive_time)) {
677           if (new_keepalive_time > parent_->chand_->keepalive_time_) {
678             parent_->chand_->keepalive_time_ = new_keepalive_time;
679             if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
680               gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
681                       parent_->chand_, parent_->chand_->keepalive_time_);
682             }
683             // Propagate the new keepalive time to all subchannels. This is so
684             // that new transports created by any subchannel (and not just the
685             // subchannel that received the GOAWAY), use the new keepalive time.
686             for (auto* subchannel_wrapper :
687                  parent_->chand_->subchannel_wrappers_) {
688               subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
689             }
690           }
691         } else {
692           gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
693                   parent_->chand_,
694                   std::string(keepalive_throttling.value()).c_str());
695         }
696       }
697       // Ignore update if the parent WatcherWrapper has been replaced
698       // since this callback was scheduled.
699       if (watcher_ != nullptr) {
700         last_seen_state_ = state_change.state;
701         parent_->MaybeUpdateConnectedSubchannel(
702             std::move(state_change.connected_subchannel));
703         watcher_->OnConnectivityStateChange(state_change.state);
704       }
705     }
706 
707     std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
708         watcher_;
709     RefCountedPtr<SubchannelWrapper> parent_;
710     grpc_connectivity_state last_seen_state_;
711     WatcherWrapper* replacement_ = nullptr;
712   };
713 
MaybeUpdateConnectedSubchannel(RefCountedPtr<ConnectedSubchannel> connected_subchannel)714   void MaybeUpdateConnectedSubchannel(
715       RefCountedPtr<ConnectedSubchannel> connected_subchannel)
716       ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::work_serializer_) {
717     // Update the connected subchannel only if the channel is not shutting
718     // down.  This is because once the channel is shutting down, we
719     // ignore picker updates from the LB policy, which means that
720     // UpdateStateAndPickerLocked() will never process the entries
721     // in chand_->pending_subchannel_updates_.  So we don't want to add
722     // entries there that will never be processed, since that would
723     // leave dangling refs to the channel and prevent its destruction.
724     grpc_error_handle disconnect_error = chand_->disconnect_error();
725     if (disconnect_error != GRPC_ERROR_NONE) return;
726     // Not shutting down, so do the update.
727     if (connected_subchannel_ != connected_subchannel) {
728       connected_subchannel_ = std::move(connected_subchannel);
729       // Record the new connected subchannel so that it can be updated
730       // in the data plane mutex the next time the picker is updated.
731       chand_->pending_subchannel_updates_[Ref(
732           DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
733     }
734   }
735 
736   ClientChannel* chand_;
737   RefCountedPtr<Subchannel> subchannel_;
738   absl::optional<std::string> health_check_service_name_;
739   // Maps from the address of the watcher passed to us by the LB policy
740   // to the address of the WrapperWatcher that we passed to the underlying
741   // subchannel.  This is needed so that when the LB policy calls
742   // CancelConnectivityStateWatch() with its watcher, we know the
743   // corresponding WrapperWatcher to cancel on the underlying subchannel.
744   std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
745   // To be accessed only in the control plane work_serializer.
746   RefCountedPtr<ConnectedSubchannel> connected_subchannel_
747       ABSL_GUARDED_BY(&ClientChannel::work_serializer_);
748   // To be accessed only in the data plane mutex.
749   RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_
750       ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
751 };
752 
753 //
754 // ClientChannel::ExternalConnectivityWatcher
755 //
756 
ExternalConnectivityWatcher(ClientChannel * chand,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)757 ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
758     ClientChannel* chand, grpc_polling_entity pollent,
759     grpc_connectivity_state* state, grpc_closure* on_complete,
760     grpc_closure* watcher_timer_init)
761     : chand_(chand),
762       pollent_(pollent),
763       initial_state_(*state),
764       state_(state),
765       on_complete_(on_complete),
766       watcher_timer_init_(watcher_timer_init) {
767   grpc_polling_entity_add_to_pollset_set(&pollent_,
768                                          chand_->interested_parties_);
769   GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
770   {
771     MutexLock lock(&chand_->external_watchers_mu_);
772     // Will be deleted when the watch is complete.
773     GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
774     // Store a ref to the watcher in the external_watchers_ map.
775     chand->external_watchers_[on_complete] =
776         Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
777   }
778   // Pass the ref from creating the object to Start().
779   chand_->work_serializer_->Run(
780       [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
781         // The ref is passed to AddWatcherLocked().
782         AddWatcherLocked();
783       },
784       DEBUG_LOCATION);
785 }
786 
~ExternalConnectivityWatcher()787 ClientChannel::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
788   grpc_polling_entity_del_from_pollset_set(&pollent_,
789                                            chand_->interested_parties_);
790   GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
791                            "ExternalConnectivityWatcher");
792 }
793 
794 void ClientChannel::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannel * chand,grpc_closure * on_complete,bool cancel)795     RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
796                                          grpc_closure* on_complete,
797                                          bool cancel) {
798   RefCountedPtr<ExternalConnectivityWatcher> watcher;
799   {
800     MutexLock lock(&chand->external_watchers_mu_);
801     auto it = chand->external_watchers_.find(on_complete);
802     if (it != chand->external_watchers_.end()) {
803       watcher = std::move(it->second);
804       chand->external_watchers_.erase(it);
805     }
806   }
807   // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
808   // the mutex before calling it.
809   if (watcher != nullptr && cancel) watcher->Cancel();
810 }
811 
Notify(grpc_connectivity_state state,const absl::Status &)812 void ClientChannel::ExternalConnectivityWatcher::Notify(
813     grpc_connectivity_state state, const absl::Status& /* status */) {
814   bool done = false;
815   if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
816                                    MemoryOrder::RELAXED)) {
817     return;  // Already done.
818   }
819   // Remove external watcher.
820   ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
821       chand_, on_complete_, /*cancel=*/false);
822   // Report new state to the user.
823   *state_ = state;
824   ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
825   // Hop back into the work_serializer to clean up.
826   // Not needed in state SHUTDOWN, because the tracker will
827   // automatically remove all watchers in that case.
828   if (state != GRPC_CHANNEL_SHUTDOWN) {
829     chand_->work_serializer_->Run(
830         [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
831           RemoveWatcherLocked();
832         },
833         DEBUG_LOCATION);
834   }
835 }
836 
Cancel()837 void ClientChannel::ExternalConnectivityWatcher::Cancel() {
838   bool done = false;
839   if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
840                                    MemoryOrder::RELAXED)) {
841     return;  // Already done.
842   }
843   ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
844   // Hop back into the work_serializer to clean up.
845   chand_->work_serializer_->Run(
846       [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
847         RemoveWatcherLocked();
848       },
849       DEBUG_LOCATION);
850 }
851 
AddWatcherLocked()852 void ClientChannel::ExternalConnectivityWatcher::AddWatcherLocked() {
853   Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
854   // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
855   chand_->state_tracker_.AddWatcher(
856       initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
857 }
858 
RemoveWatcherLocked()859 void ClientChannel::ExternalConnectivityWatcher::RemoveWatcherLocked() {
860   chand_->state_tracker_.RemoveWatcher(this);
861 }
862 
863 //
864 // ClientChannel::ConnectivityWatcherAdder
865 //
866 
867 class ClientChannel::ConnectivityWatcherAdder {
868  public:
ConnectivityWatcherAdder(ClientChannel * chand,grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)869   ConnectivityWatcherAdder(
870       ClientChannel* chand, grpc_connectivity_state initial_state,
871       OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
872       : chand_(chand),
873         initial_state_(initial_state),
874         watcher_(std::move(watcher)) {
875     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
876     chand_->work_serializer_->Run(
877         [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
878           AddWatcherLocked();
879         },
880         DEBUG_LOCATION);
881   }
882 
883  private:
AddWatcherLocked()884   void AddWatcherLocked()
885       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
886     chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
887     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
888     delete this;
889   }
890 
891   ClientChannel* chand_;
892   grpc_connectivity_state initial_state_;
893   OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
894 };
895 
896 //
897 // ClientChannel::ConnectivityWatcherRemover
898 //
899 
900 class ClientChannel::ConnectivityWatcherRemover {
901  public:
ConnectivityWatcherRemover(ClientChannel * chand,AsyncConnectivityStateWatcherInterface * watcher)902   ConnectivityWatcherRemover(ClientChannel* chand,
903                              AsyncConnectivityStateWatcherInterface* watcher)
904       : chand_(chand), watcher_(watcher) {
905     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
906     chand_->work_serializer_->Run(
907         [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
908           RemoveWatcherLocked();
909         },
910         DEBUG_LOCATION);
911   }
912 
913  private:
RemoveWatcherLocked()914   void RemoveWatcherLocked()
915       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
916     chand_->state_tracker_.RemoveWatcher(watcher_);
917     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
918                              "ConnectivityWatcherRemover");
919     delete this;
920   }
921 
922   ClientChannel* chand_;
923   AsyncConnectivityStateWatcherInterface* watcher_;
924 };
925 
926 //
927 // ClientChannel::ClientChannelControlHelper
928 //
929 
930 class ClientChannel::ClientChannelControlHelper
931     : public LoadBalancingPolicy::ChannelControlHelper {
932  public:
ClientChannelControlHelper(ClientChannel * chand)933   explicit ClientChannelControlHelper(ClientChannel* chand) : chand_(chand) {
934     GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
935   }
936 
~ClientChannelControlHelper()937   ~ClientChannelControlHelper() override {
938     GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
939                              "ClientChannelControlHelper");
940   }
941 
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)942   RefCountedPtr<SubchannelInterface> CreateSubchannel(
943       ServerAddress address, const grpc_channel_args& args) override
944       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
945     if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
946     // Determine health check service name.
947     bool inhibit_health_checking = grpc_channel_args_find_bool(
948         &args, GRPC_ARG_INHIBIT_HEALTH_CHECKING, false);
949     absl::optional<std::string> health_check_service_name;
950     if (!inhibit_health_checking) {
951       health_check_service_name = chand_->health_check_service_name_;
952     }
953     // Remove channel args that should not affect subchannel uniqueness.
954     static const char* args_to_remove[] = {
955         GRPC_ARG_INHIBIT_HEALTH_CHECKING,
956         GRPC_ARG_CHANNELZ_CHANNEL_NODE,
957     };
958     // Add channel args needed for the subchannel.
959     absl::InlinedVector<grpc_arg, 3> args_to_add = {
960         Subchannel::CreateSubchannelAddressArg(&address.address()),
961         SubchannelPoolInterface::CreateChannelArg(
962             chand_->subchannel_pool_.get()),
963     };
964     if (address.args() != nullptr) {
965       for (size_t j = 0; j < address.args()->num_args; ++j) {
966         args_to_add.emplace_back(address.args()->args[j]);
967       }
968     }
969     grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
970         &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove),
971         args_to_add.data(), args_to_add.size());
972     gpr_free(args_to_add[0].value.string);
973     // Create subchannel.
974     RefCountedPtr<Subchannel> subchannel =
975         chand_->client_channel_factory_->CreateSubchannel(new_args);
976     grpc_channel_args_destroy(new_args);
977     if (subchannel == nullptr) return nullptr;
978     // Make sure the subchannel has updated keepalive time.
979     subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
980     // Create and return wrapper for the subchannel.
981     return MakeRefCounted<SubchannelWrapper>(
982         chand_, std::move(subchannel), std::move(health_check_service_name));
983   }
984 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)985   void UpdateState(
986       grpc_connectivity_state state, const absl::Status& status,
987       std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override
988       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
989     if (chand_->resolver_ == nullptr) return;  // Shutting down.
990     grpc_error_handle disconnect_error = chand_->disconnect_error();
991     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
992       const char* extra = disconnect_error == GRPC_ERROR_NONE
993                               ? ""
994                               : " (ignoring -- channel shutting down)";
995       gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
996               chand_, ConnectivityStateName(state), status.ToString().c_str(),
997               picker.get(), extra);
998     }
999     // Do update only if not shutting down.
1000     if (disconnect_error == GRPC_ERROR_NONE) {
1001       chand_->UpdateStateAndPickerLocked(state, status, "helper",
1002                                          std::move(picker));
1003     }
1004   }
1005 
RequestReresolution()1006   void RequestReresolution() override
1007       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
1008     if (chand_->resolver_ == nullptr) return;  // Shutting down.
1009     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1010       gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
1011     }
1012     chand_->resolver_->RequestReresolutionLocked();
1013   }
1014 
AddTraceEvent(TraceSeverity severity,absl::string_view message)1015   void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
1016       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
1017     if (chand_->resolver_ == nullptr) return;  // Shutting down.
1018     if (chand_->channelz_node_ != nullptr) {
1019       chand_->channelz_node_->AddTraceEvent(
1020           ConvertSeverityEnum(severity),
1021           grpc_slice_from_copied_buffer(message.data(), message.size()));
1022     }
1023   }
1024 
1025  private:
ConvertSeverityEnum(TraceSeverity severity)1026   static channelz::ChannelTrace::Severity ConvertSeverityEnum(
1027       TraceSeverity severity) {
1028     if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
1029     if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
1030     return channelz::ChannelTrace::Error;
1031   }
1032 
1033   ClientChannel* chand_;
1034 };
1035 
1036 //
1037 // ClientChannel implementation
1038 //
1039 
GetFromChannel(grpc_channel * channel)1040 ClientChannel* ClientChannel::GetFromChannel(grpc_channel* channel) {
1041   grpc_channel_element* elem =
1042       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
1043   if (elem->filter != &kFilterVtable) return nullptr;
1044   return static_cast<ClientChannel*>(elem->channel_data);
1045 }
1046 
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1047 grpc_error_handle ClientChannel::Init(grpc_channel_element* elem,
1048                                       grpc_channel_element_args* args) {
1049   GPR_ASSERT(args->is_last);
1050   GPR_ASSERT(elem->filter == &kFilterVtable);
1051   grpc_error_handle error = GRPC_ERROR_NONE;
1052   new (elem->channel_data) ClientChannel(args, &error);
1053   return error;
1054 }
1055 
Destroy(grpc_channel_element * elem)1056 void ClientChannel::Destroy(grpc_channel_element* elem) {
1057   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1058   chand->~ClientChannel();
1059 }
1060 
1061 namespace {
1062 
GetEnableRetries(const grpc_channel_args * args)1063 bool GetEnableRetries(const grpc_channel_args* args) {
1064   return grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, false);
1065 }
1066 
GetSubchannelPool(const grpc_channel_args * args)1067 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1068     const grpc_channel_args* args) {
1069   const bool use_local_subchannel_pool = grpc_channel_args_find_bool(
1070       args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, false);
1071   if (use_local_subchannel_pool) {
1072     return MakeRefCounted<LocalSubchannelPool>();
1073   }
1074   return GlobalSubchannelPool::instance();
1075 }
1076 
GetChannelzNode(const grpc_channel_args * args)1077 channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
1078   return grpc_channel_args_find_pointer<channelz::ChannelNode>(
1079       args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1080 }
1081 
1082 }  // namespace
1083 
ClientChannel(grpc_channel_element_args * args,grpc_error_handle * error)1084 ClientChannel::ClientChannel(grpc_channel_element_args* args,
1085                              grpc_error_handle* error)
1086     : deadline_checking_enabled_(
1087           grpc_deadline_checking_enabled(args->channel_args)),
1088       enable_retries_(GetEnableRetries(args->channel_args)),
1089       owning_stack_(args->channel_stack),
1090       client_channel_factory_(
1091           ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
1092       channelz_node_(GetChannelzNode(args->channel_args)),
1093       interested_parties_(grpc_pollset_set_create()),
1094       work_serializer_(std::make_shared<WorkSerializer>()),
1095       state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
1096       subchannel_pool_(GetSubchannelPool(args->channel_args)),
1097       disconnect_error_(GRPC_ERROR_NONE) {
1098   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1099     gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
1100             this, owning_stack_);
1101   }
1102   // Start backup polling.
1103   grpc_client_channel_start_backup_polling(interested_parties_);
1104   // Check client channel factory.
1105   if (client_channel_factory_ == nullptr) {
1106     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1107         "Missing client channel factory in args for client channel filter");
1108     return;
1109   }
1110   // Get server name to resolve, using proxy mapper if needed.
1111   const char* server_uri =
1112       grpc_channel_args_find_string(args->channel_args, GRPC_ARG_SERVER_URI);
1113   if (server_uri == nullptr) {
1114     *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1115         "server URI channel arg missing or wrong type in client channel "
1116         "filter");
1117     return;
1118   }
1119   // Get default service config.  If none is specified via the client API,
1120   // we use an empty config.
1121   const char* service_config_json = grpc_channel_args_find_string(
1122       args->channel_args, GRPC_ARG_SERVICE_CONFIG);
1123   if (service_config_json == nullptr) service_config_json = "{}";
1124   *error = GRPC_ERROR_NONE;
1125   default_service_config_ =
1126       ServiceConfig::Create(args->channel_args, service_config_json, error);
1127   if (*error != GRPC_ERROR_NONE) {
1128     default_service_config_.reset();
1129     return;
1130   }
1131   absl::StatusOr<URI> uri = URI::Parse(server_uri);
1132   if (uri.ok() && !uri->path().empty()) {
1133     server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
1134   }
1135   char* proxy_name = nullptr;
1136   grpc_channel_args* new_args = nullptr;
1137   ProxyMapperRegistry::MapName(server_uri, args->channel_args, &proxy_name,
1138                                &new_args);
1139   target_uri_.reset(proxy_name != nullptr ? proxy_name
1140                                           : gpr_strdup(server_uri));
1141   // Strip out service config channel arg, so that it doesn't affect
1142   // subchannel uniqueness when the args flow down to that layer.
1143   const char* arg_to_remove = GRPC_ARG_SERVICE_CONFIG;
1144   channel_args_ = grpc_channel_args_copy_and_remove(
1145       new_args != nullptr ? new_args : args->channel_args, &arg_to_remove, 1);
1146   grpc_channel_args_destroy(new_args);
1147   keepalive_time_ = grpc_channel_args_find_integer(
1148       channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS,
1149       {-1 /* default value, unset */, 1, INT_MAX});
1150   if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
1151     std::string error_message =
1152         absl::StrCat("the target uri is not valid: ", target_uri_.get());
1153     *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
1154     return;
1155   }
1156   *error = GRPC_ERROR_NONE;
1157 }
1158 
~ClientChannel()1159 ClientChannel::~ClientChannel() {
1160   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1161     gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
1162   }
1163   DestroyResolverAndLbPolicyLocked();
1164   grpc_channel_args_destroy(channel_args_);
1165   GRPC_ERROR_UNREF(resolver_transient_failure_error_);
1166   // Stop backup polling.
1167   grpc_client_channel_stop_backup_polling(interested_parties_);
1168   grpc_pollset_set_destroy(interested_parties_);
1169   GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
1170 }
1171 
1172 RefCountedPtr<ClientChannel::LoadBalancedCall>
CreateLoadBalancedCall(const grpc_call_element_args & args,grpc_polling_entity * pollent,grpc_closure * on_call_destruction_complete)1173 ClientChannel::CreateLoadBalancedCall(
1174     const grpc_call_element_args& args, grpc_polling_entity* pollent,
1175     grpc_closure* on_call_destruction_complete) {
1176   return args.arena->New<LoadBalancedCall>(this, args, pollent,
1177                                            on_call_destruction_complete);
1178 }
1179 
1180 namespace {
1181 
ChooseLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config)1182 RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
1183     const Resolver::Result& resolver_result,
1184     const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
1185   // Prefer the LB policy config found in the service config.
1186   if (parsed_service_config->parsed_lb_config() != nullptr) {
1187     return parsed_service_config->parsed_lb_config();
1188   }
1189   // Try the deprecated LB policy name from the service config.
1190   // If not, try the setting from channel args.
1191   const char* policy_name = nullptr;
1192   if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
1193     policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
1194   } else {
1195     policy_name = grpc_channel_args_find_string(resolver_result.args,
1196                                                 GRPC_ARG_LB_POLICY_NAME);
1197   }
1198   // Use pick_first if nothing was specified and we didn't select grpclb
1199   // above.
1200   if (policy_name == nullptr) policy_name = "pick_first";
1201   // Now that we have the policy name, construct an empty config for it.
1202   Json config_json = Json::Array{Json::Object{
1203       {policy_name, Json::Object{}},
1204   }};
1205   grpc_error_handle parse_error = GRPC_ERROR_NONE;
1206   auto lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1207       config_json, &parse_error);
1208   // The policy name came from one of three places:
1209   // - The deprecated loadBalancingPolicy field in the service config,
1210   //   in which case the code in ClientChannelServiceConfigParser
1211   //   already verified that the policy does not require a config.
1212   // - One of the hard-coded values here, all of which are known to not
1213   //   require a config.
1214   // - A channel arg, in which case the application did something that
1215   //   is a misuse of our API.
1216   // In the first two cases, these assertions will always be true.  In
1217   // the last case, this is probably fine for now.
1218   // TODO(roth): If the last case becomes a problem, add better error
1219   // handling here.
1220   GPR_ASSERT(lb_policy_config != nullptr);
1221   GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
1222   return lb_policy_config;
1223 }
1224 
1225 }  // namespace
1226 
OnResolverResultChangedLocked(Resolver::Result result)1227 void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
1228   // Handle race conditions.
1229   if (resolver_ == nullptr) return;
1230   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1231     gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
1232   }
1233   // We only want to trace the address resolution in the follow cases:
1234   // (a) Address resolution resulted in service config change.
1235   // (b) Address resolution that causes number of backends to go from
1236   //     zero to non-zero.
1237   // (c) Address resolution that causes number of backends to go from
1238   //     non-zero to zero.
1239   // (d) Address resolution that causes a new LB policy to be created.
1240   //
1241   // We track a list of strings to eventually be concatenated and traced.
1242   absl::InlinedVector<const char*, 3> trace_strings;
1243   if (result.addresses.empty() && previous_resolution_contained_addresses_) {
1244     trace_strings.push_back("Address list became empty");
1245   } else if (!result.addresses.empty() &&
1246              !previous_resolution_contained_addresses_) {
1247     trace_strings.push_back("Address list became non-empty");
1248   }
1249   previous_resolution_contained_addresses_ = !result.addresses.empty();
1250   std::string service_config_error_string_storage;
1251   if (result.service_config_error != GRPC_ERROR_NONE) {
1252     service_config_error_string_storage =
1253         grpc_error_std_string(result.service_config_error);
1254     trace_strings.push_back(service_config_error_string_storage.c_str());
1255   }
1256   // Choose the service config.
1257   RefCountedPtr<ServiceConfig> service_config;
1258   RefCountedPtr<ConfigSelector> config_selector;
1259   if (result.service_config_error != GRPC_ERROR_NONE) {
1260     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1261       gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
1262               this, grpc_error_std_string(result.service_config_error).c_str());
1263     }
1264     // If the service config was invalid, then fallback to the
1265     // previously returned service config.
1266     if (saved_service_config_ != nullptr) {
1267       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1268         gpr_log(GPR_INFO,
1269                 "chand=%p: resolver returned invalid service config. "
1270                 "Continuing to use previous service config.",
1271                 this);
1272       }
1273       service_config = saved_service_config_;
1274       config_selector = saved_config_selector_;
1275     } else {
1276       // We received an invalid service config and we don't have a
1277       // previous service config to fall back to.  Put the channel into
1278       // TRANSIENT_FAILURE.
1279       OnResolverErrorLocked(GRPC_ERROR_REF(result.service_config_error));
1280       trace_strings.push_back("no valid service config");
1281     }
1282   } else if (result.service_config == nullptr) {
1283     // Resolver did not return any service config.
1284     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1285       gpr_log(GPR_INFO,
1286               "chand=%p: resolver returned no service config. Using default "
1287               "service config for channel.",
1288               this);
1289     }
1290     service_config = default_service_config_;
1291   } else {
1292     // Use ServiceConfig and ConfigSelector returned by resolver.
1293     service_config = result.service_config;
1294     config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
1295   }
1296   if (service_config != nullptr) {
1297     // Extract global config for client channel.
1298     const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
1299         static_cast<const internal::ClientChannelGlobalParsedConfig*>(
1300             service_config->GetGlobalParsedConfig(
1301                 internal::ClientChannelServiceConfigParser::ParserIndex()));
1302     // Choose LB policy config.
1303     RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
1304         ChooseLbPolicy(result, parsed_service_config);
1305     // Check if the ServiceConfig has changed.
1306     const bool service_config_changed =
1307         saved_service_config_ == nullptr ||
1308         service_config->json_string() != saved_service_config_->json_string();
1309     // Check if the ConfigSelector has changed.
1310     const bool config_selector_changed = !ConfigSelector::Equals(
1311         saved_config_selector_.get(), config_selector.get());
1312     // If either has changed, apply the global parameters now.
1313     if (service_config_changed || config_selector_changed) {
1314       // Update service config in control plane.
1315       UpdateServiceConfigInControlPlaneLocked(
1316           std::move(service_config), std::move(config_selector),
1317           parsed_service_config, lb_policy_config->name());
1318     } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1319       gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
1320     }
1321     // Create or update LB policy, as needed.
1322     CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
1323                                  std::move(result));
1324     if (service_config_changed || config_selector_changed) {
1325       // Start using new service config for calls.
1326       // This needs to happen after the LB policy has been updated, since
1327       // the ConfigSelector may need the LB policy to know about new
1328       // destinations before it can send RPCs to those destinations.
1329       UpdateServiceConfigInDataPlaneLocked();
1330       // TODO(ncteisen): might be worth somehow including a snippet of the
1331       // config in the trace, at the risk of bloating the trace logs.
1332       trace_strings.push_back("Service config changed");
1333     }
1334   }
1335   // Add channel trace event.
1336   if (!trace_strings.empty()) {
1337     std::string message =
1338         absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
1339     if (channelz_node_ != nullptr) {
1340       channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
1341                                     grpc_slice_from_cpp_string(message));
1342     }
1343   }
1344 }
1345 
OnResolverErrorLocked(grpc_error_handle error)1346 void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
1347   if (resolver_ == nullptr) {
1348     GRPC_ERROR_UNREF(error);
1349     return;
1350   }
1351   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1352     gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
1353             grpc_error_std_string(error).c_str());
1354   }
1355   // If we already have an LB policy from a previous resolution
1356   // result, then we continue to let it set the connectivity state.
1357   // Otherwise, we go into TRANSIENT_FAILURE.
1358   if (lb_policy_ == nullptr) {
1359     grpc_error_handle state_error =
1360         GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1361             "Resolver transient failure", &error, 1);
1362     {
1363       MutexLock lock(&resolution_mu_);
1364       // Update resolver transient failure.
1365       GRPC_ERROR_UNREF(resolver_transient_failure_error_);
1366       resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error);
1367       // Process calls that were queued waiting for the resolver result.
1368       for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
1369            call = call->next) {
1370         grpc_call_element* elem = call->elem;
1371         CallData* calld = static_cast<CallData*>(elem->call_data);
1372         grpc_error_handle error = GRPC_ERROR_NONE;
1373         if (calld->CheckResolutionLocked(elem, &error)) {
1374           calld->AsyncResolutionDone(elem, error);
1375         }
1376       }
1377     }
1378     // Update connectivity state.
1379     UpdateStateAndPickerLocked(
1380         GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
1381         "resolver failure",
1382         absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
1383             state_error));
1384   }
1385   GRPC_ERROR_UNREF(error);
1386 }
1387 
CreateOrUpdateLbPolicyLocked(RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,Resolver::Result result)1388 void ClientChannel::CreateOrUpdateLbPolicyLocked(
1389     RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
1390     Resolver::Result result) {
1391   // Construct update.
1392   LoadBalancingPolicy::UpdateArgs update_args;
1393   update_args.addresses = std::move(result.addresses);
1394   update_args.config = std::move(lb_policy_config);
1395   // Remove the config selector from channel args so that we're not holding
1396   // unnecessary refs that cause it to be destroyed somewhere other than in the
1397   // WorkSerializer.
1398   const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
1399   update_args.args =
1400       grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
1401   // Create policy if needed.
1402   if (lb_policy_ == nullptr) {
1403     lb_policy_ = CreateLbPolicyLocked(*update_args.args);
1404   }
1405   // Update the policy.
1406   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1407     gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
1408             lb_policy_.get());
1409   }
1410   lb_policy_->UpdateLocked(std::move(update_args));
1411 }
1412 
1413 // Creates a new LB policy.
CreateLbPolicyLocked(const grpc_channel_args & args)1414 OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
1415     const grpc_channel_args& args) {
1416   LoadBalancingPolicy::Args lb_policy_args;
1417   lb_policy_args.work_serializer = work_serializer_;
1418   lb_policy_args.channel_control_helper =
1419       absl::make_unique<ClientChannelControlHelper>(this);
1420   lb_policy_args.args = &args;
1421   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1422       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
1423                                          &grpc_client_channel_routing_trace);
1424   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1425     gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
1426             lb_policy.get());
1427   }
1428   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1429                                    interested_parties_);
1430   return lb_policy;
1431 }
1432 
AddResolverQueuedCall(ResolverQueuedCall * call,grpc_polling_entity * pollent)1433 void ClientChannel::AddResolverQueuedCall(ResolverQueuedCall* call,
1434                                           grpc_polling_entity* pollent) {
1435   // Add call to queued calls list.
1436   call->next = resolver_queued_calls_;
1437   resolver_queued_calls_ = call;
1438   // Add call's pollent to channel's interested_parties, so that I/O
1439   // can be done under the call's CQ.
1440   grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
1441 }
1442 
RemoveResolverQueuedCall(ResolverQueuedCall * to_remove,grpc_polling_entity * pollent)1443 void ClientChannel::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
1444                                              grpc_polling_entity* pollent) {
1445   // Remove call's pollent from channel's interested_parties.
1446   grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1447   // Remove from queued calls list.
1448   for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
1449        call = &(*call)->next) {
1450     if (*call == to_remove) {
1451       *call = to_remove->next;
1452       return;
1453     }
1454   }
1455 }
1456 
UpdateServiceConfigInControlPlaneLocked(RefCountedPtr<ServiceConfig> service_config,RefCountedPtr<ConfigSelector> config_selector,const internal::ClientChannelGlobalParsedConfig * parsed_service_config,const char * lb_policy_name)1457 void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
1458     RefCountedPtr<ServiceConfig> service_config,
1459     RefCountedPtr<ConfigSelector> config_selector,
1460     const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
1461     const char* lb_policy_name) {
1462   UniquePtr<char> service_config_json(
1463       gpr_strdup(service_config->json_string().c_str()));
1464   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1465     gpr_log(GPR_INFO,
1466             "chand=%p: resolver returned updated service config: \"%s\"", this,
1467             service_config_json.get());
1468   }
1469   // Save service config.
1470   saved_service_config_ = std::move(service_config);
1471   // Update health check service name if needed.
1472   if (health_check_service_name_ !=
1473       parsed_service_config->health_check_service_name()) {
1474     health_check_service_name_ =
1475         parsed_service_config->health_check_service_name();
1476     // Update health check service name used by existing subchannel wrappers.
1477     for (auto* subchannel_wrapper : subchannel_wrappers_) {
1478       subchannel_wrapper->UpdateHealthCheckServiceName(
1479           health_check_service_name_);
1480     }
1481   }
1482   // Swap out the data used by GetChannelInfo().
1483   UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
1484   {
1485     MutexLock lock(&info_mu_);
1486     info_lb_policy_name_ = std::move(lb_policy_name_owned);
1487     info_service_config_json_ = std::move(service_config_json);
1488   }
1489   // Save config selector.
1490   saved_config_selector_ = std::move(config_selector);
1491   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1492     gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
1493             saved_config_selector_.get());
1494   }
1495 }
1496 
UpdateServiceConfigInDataPlaneLocked()1497 void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
1498   // Grab ref to service config.
1499   RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
1500   // Grab ref to config selector.  Use default if resolver didn't supply one.
1501   RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
1502   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1503     gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
1504             saved_config_selector_.get());
1505   }
1506   if (config_selector == nullptr) {
1507     config_selector =
1508         MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
1509   }
1510   // Construct dynamic filter stack.
1511   std::vector<const grpc_channel_filter*> filters =
1512       config_selector->GetFilters();
1513   if (enable_retries_) {
1514     filters.push_back(&kRetryFilterVtable);
1515   } else {
1516     filters.push_back(&DynamicTerminationFilter::kFilterVtable);
1517   }
1518   absl::InlinedVector<grpc_arg, 2> args_to_add = {
1519       grpc_channel_arg_pointer_create(
1520           const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL), this,
1521           &kClientChannelArgPointerVtable),
1522       grpc_channel_arg_pointer_create(
1523           const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_OBJ), service_config.get(),
1524           &kServiceConfigObjArgPointerVtable),
1525   };
1526   grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
1527       channel_args_, args_to_add.data(), args_to_add.size());
1528   new_args = config_selector->ModifyChannelArgs(new_args);
1529   RefCountedPtr<DynamicFilters> dynamic_filters =
1530       DynamicFilters::Create(new_args, std::move(filters));
1531   GPR_ASSERT(dynamic_filters != nullptr);
1532   grpc_channel_args_destroy(new_args);
1533   // Grab data plane lock to update service config.
1534   //
1535   // We defer unreffing the old values (and deallocating memory) until
1536   // after releasing the lock to keep the critical section small.
1537   std::set<grpc_call_element*> calls_pending_resolver_result;
1538   {
1539     MutexLock lock(&resolution_mu_);
1540     GRPC_ERROR_UNREF(resolver_transient_failure_error_);
1541     resolver_transient_failure_error_ = GRPC_ERROR_NONE;
1542     // Update service config.
1543     received_service_config_data_ = true;
1544     // Old values will be unreffed after lock is released.
1545     service_config_.swap(service_config);
1546     config_selector_.swap(config_selector);
1547     dynamic_filters_.swap(dynamic_filters);
1548     // Process calls that were queued waiting for the resolver result.
1549     for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
1550          call = call->next) {
1551       grpc_call_element* elem = call->elem;
1552       CallData* calld = static_cast<CallData*>(elem->call_data);
1553       grpc_error_handle error = GRPC_ERROR_NONE;
1554       if (calld->CheckResolutionLocked(elem, &error)) {
1555         calld->AsyncResolutionDone(elem, error);
1556       }
1557     }
1558   }
1559   // Old values will be unreffed after lock is released when they go out
1560   // of scope.
1561 }
1562 
CreateResolverLocked()1563 void ClientChannel::CreateResolverLocked() {
1564   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1565     gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
1566   }
1567   resolver_ = ResolverRegistry::CreateResolver(
1568       target_uri_.get(), channel_args_, interested_parties_, work_serializer_,
1569       absl::make_unique<ResolverResultHandler>(this));
1570   // Since the validity of the args was checked when the channel was created,
1571   // CreateResolver() must return a non-null result.
1572   GPR_ASSERT(resolver_ != nullptr);
1573   UpdateStateAndPickerLocked(
1574       GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
1575       absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
1576   resolver_->StartLocked();
1577   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1578     gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
1579   }
1580 }
1581 
DestroyResolverAndLbPolicyLocked()1582 void ClientChannel::DestroyResolverAndLbPolicyLocked() {
1583   if (resolver_ != nullptr) {
1584     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1585       gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
1586               resolver_.get());
1587     }
1588     resolver_.reset();
1589     if (lb_policy_ != nullptr) {
1590       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1591         gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
1592                 lb_policy_.get());
1593       }
1594       grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
1595                                        interested_parties_);
1596       lb_policy_.reset();
1597     }
1598   }
1599 }
1600 
UpdateStateAndPickerLocked(grpc_connectivity_state state,const absl::Status & status,const char * reason,std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)1601 void ClientChannel::UpdateStateAndPickerLocked(
1602     grpc_connectivity_state state, const absl::Status& status,
1603     const char* reason,
1604     std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
1605   // Special case for IDLE and SHUTDOWN states.
1606   if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
1607     saved_service_config_.reset();
1608     saved_config_selector_.reset();
1609     // Acquire resolution lock to update config selector and associated state.
1610     // To minimize lock contention, we wait to unref these objects until
1611     // after we release the lock.
1612     RefCountedPtr<ServiceConfig> service_config_to_unref;
1613     RefCountedPtr<ConfigSelector> config_selector_to_unref;
1614     RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
1615     {
1616       MutexLock lock(&resolution_mu_);
1617       received_service_config_data_ = false;
1618       service_config_to_unref = std::move(service_config_);
1619       config_selector_to_unref = std::move(config_selector_);
1620       dynamic_filters_to_unref = std::move(dynamic_filters_);
1621     }
1622   }
1623   // Update connectivity state.
1624   state_tracker_.SetState(state, status, reason);
1625   if (channelz_node_ != nullptr) {
1626     channelz_node_->SetConnectivityState(state);
1627     channelz_node_->AddTraceEvent(
1628         channelz::ChannelTrace::Severity::Info,
1629         grpc_slice_from_static_string(
1630             channelz::ChannelNode::GetChannelConnectivityStateChangeString(
1631                 state)));
1632   }
1633   // Grab data plane lock to do subchannel updates and update the picker.
1634   //
1635   // Note that we want to minimize the work done while holding the data
1636   // plane lock, to keep the critical section small.  So, for all of the
1637   // objects that we might wind up unreffing here, we actually hold onto
1638   // the refs until after we release the lock, and then unref them at
1639   // that point.  This includes the following:
1640   // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
1641   // - ownership of the existing picker in picker_
1642   {
1643     MutexLock lock(&data_plane_mu_);
1644     // Handle subchannel updates.
1645     for (auto& p : pending_subchannel_updates_) {
1646       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1647         gpr_log(GPR_INFO,
1648                 "chand=%p: updating subchannel wrapper %p data plane "
1649                 "connected_subchannel to %p",
1650                 this, p.first.get(), p.second.get());
1651       }
1652       // Note: We do not remove the entry from pending_subchannel_updates_
1653       // here, since this would unref the subchannel wrapper; instead,
1654       // we wait until we've released the lock to clear the map.
1655       p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
1656     }
1657     // Swap out the picker.
1658     // Note: Original value will be destroyed after the lock is released.
1659     picker_.swap(picker);
1660     // Re-process queued picks.
1661     for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
1662          call = call->next) {
1663       grpc_error_handle error = GRPC_ERROR_NONE;
1664       if (call->lb_call->PickSubchannelLocked(&error)) {
1665         call->lb_call->AsyncPickDone(error);
1666       }
1667     }
1668   }
1669   // Clear the pending update map after releasing the lock, to keep the
1670   // critical section small.
1671   pending_subchannel_updates_.clear();
1672 }
1673 
DoPingLocked(grpc_transport_op * op)1674 grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {
1675   if (state_tracker_.state() != GRPC_CHANNEL_READY) {
1676     return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
1677   }
1678   LoadBalancingPolicy::PickResult result;
1679   {
1680     MutexLock lock(&data_plane_mu_);
1681     result = picker_->Pick(LoadBalancingPolicy::PickArgs());
1682   }
1683   ConnectedSubchannel* connected_subchannel = nullptr;
1684   if (result.subchannel != nullptr) {
1685     SubchannelWrapper* subchannel =
1686         static_cast<SubchannelWrapper*>(result.subchannel.get());
1687     connected_subchannel = subchannel->connected_subchannel();
1688   }
1689   if (connected_subchannel != nullptr) {
1690     connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
1691   } else {
1692     if (result.error == GRPC_ERROR_NONE) {
1693       result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1694           "LB policy dropped call on ping");
1695     }
1696   }
1697   return result.error;
1698 }
1699 
StartTransportOpLocked(grpc_transport_op * op)1700 void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
1701   // Connectivity watch.
1702   if (op->start_connectivity_watch != nullptr) {
1703     state_tracker_.AddWatcher(op->start_connectivity_watch_state,
1704                               std::move(op->start_connectivity_watch));
1705   }
1706   if (op->stop_connectivity_watch != nullptr) {
1707     state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
1708   }
1709   // Ping.
1710   if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
1711     grpc_error_handle error = DoPingLocked(op);
1712     if (error != GRPC_ERROR_NONE) {
1713       ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
1714                    GRPC_ERROR_REF(error));
1715       ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
1716     }
1717     op->bind_pollset = nullptr;
1718     op->send_ping.on_initiate = nullptr;
1719     op->send_ping.on_ack = nullptr;
1720   }
1721   // Reset backoff.
1722   if (op->reset_connect_backoff) {
1723     if (lb_policy_ != nullptr) {
1724       lb_policy_->ResetBackoffLocked();
1725     }
1726   }
1727   // Disconnect or enter IDLE.
1728   if (op->disconnect_with_error != GRPC_ERROR_NONE) {
1729     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1730       gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
1731               grpc_error_std_string(op->disconnect_with_error).c_str());
1732     }
1733     DestroyResolverAndLbPolicyLocked();
1734     intptr_t value;
1735     if (grpc_error_get_int(op->disconnect_with_error,
1736                            GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
1737         static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
1738       if (disconnect_error() == GRPC_ERROR_NONE) {
1739         // Enter IDLE state.
1740         UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
1741                                    "channel entering IDLE", nullptr);
1742       }
1743       GRPC_ERROR_UNREF(op->disconnect_with_error);
1744     } else {
1745       // Disconnect.
1746       GPR_ASSERT(disconnect_error_.Load(MemoryOrder::RELAXED) ==
1747                  GRPC_ERROR_NONE);
1748       disconnect_error_.Store(op->disconnect_with_error, MemoryOrder::RELEASE);
1749       UpdateStateAndPickerLocked(
1750           GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
1751           absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
1752               GRPC_ERROR_REF(op->disconnect_with_error)));
1753     }
1754   }
1755   GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
1756   ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
1757 }
1758 
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)1759 void ClientChannel::StartTransportOp(grpc_channel_element* elem,
1760                                      grpc_transport_op* op) {
1761   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1762   GPR_ASSERT(op->set_accept_stream == false);
1763   // Handle bind_pollset.
1764   if (op->bind_pollset != nullptr) {
1765     grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
1766   }
1767   // Pop into control plane work_serializer for remaining ops.
1768   GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
1769   chand->work_serializer_->Run(
1770       [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand->work_serializer_) {
1771         chand->StartTransportOpLocked(op);
1772       },
1773       DEBUG_LOCATION);
1774 }
1775 
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1776 void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
1777                                    const grpc_channel_info* info) {
1778   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1779   MutexLock lock(&chand->info_mu_);
1780   if (info->lb_policy_name != nullptr) {
1781     *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
1782   }
1783   if (info->service_config_json != nullptr) {
1784     *info->service_config_json =
1785         gpr_strdup(chand->info_service_config_json_.get());
1786   }
1787 }
1788 
AddLbQueuedCall(LbQueuedCall * call,grpc_polling_entity * pollent)1789 void ClientChannel::AddLbQueuedCall(LbQueuedCall* call,
1790                                     grpc_polling_entity* pollent) {
1791   // Add call to queued picks list.
1792   call->next = lb_queued_calls_;
1793   lb_queued_calls_ = call;
1794   // Add call's pollent to channel's interested_parties, so that I/O
1795   // can be done under the call's CQ.
1796   grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
1797 }
1798 
RemoveLbQueuedCall(LbQueuedCall * to_remove,grpc_polling_entity * pollent)1799 void ClientChannel::RemoveLbQueuedCall(LbQueuedCall* to_remove,
1800                                        grpc_polling_entity* pollent) {
1801   // Remove call's pollent from channel's interested_parties.
1802   grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1803   // Remove from queued picks list.
1804   for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
1805        call = &(*call)->next) {
1806     if (*call == to_remove) {
1807       *call = to_remove->next;
1808       return;
1809     }
1810   }
1811 }
1812 
1813 RefCountedPtr<ConnectedSubchannel>
GetConnectedSubchannelInDataPlane(SubchannelInterface * subchannel) const1814 ClientChannel::GetConnectedSubchannelInDataPlane(
1815     SubchannelInterface* subchannel) const {
1816   SubchannelWrapper* subchannel_wrapper =
1817       static_cast<SubchannelWrapper*>(subchannel);
1818   ConnectedSubchannel* connected_subchannel =
1819       subchannel_wrapper->connected_subchannel_in_data_plane();
1820   if (connected_subchannel == nullptr) return nullptr;
1821   return connected_subchannel->Ref();
1822 }
1823 
TryToConnectLocked()1824 void ClientChannel::TryToConnectLocked() {
1825   if (lb_policy_ != nullptr) {
1826     lb_policy_->ExitIdleLocked();
1827   } else if (resolver_ == nullptr) {
1828     CreateResolverLocked();
1829   }
1830   GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
1831 }
1832 
CheckConnectivityState(bool try_to_connect)1833 grpc_connectivity_state ClientChannel::CheckConnectivityState(
1834     bool try_to_connect) {
1835   // state_tracker_ is guarded by work_serializer_, which we're not
1836   // holding here.  But the one method of state_tracker_ that *is*
1837   // thread-safe to call without external synchronization is the state()
1838   // method, so we can disable thread-safety analysis for this one read.
1839   grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
1840   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
1841     GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
1842     work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
1843                               work_serializer_) { TryToConnectLocked(); },
1844                           DEBUG_LOCATION);
1845   }
1846   return out;
1847 }
1848 
AddConnectivityWatcher(grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)1849 void ClientChannel::AddConnectivityWatcher(
1850     grpc_connectivity_state initial_state,
1851     OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
1852   new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
1853 }
1854 
RemoveConnectivityWatcher(AsyncConnectivityStateWatcherInterface * watcher)1855 void ClientChannel::RemoveConnectivityWatcher(
1856     AsyncConnectivityStateWatcherInterface* watcher) {
1857   new ConnectivityWatcherRemover(this, watcher);
1858 }
1859 
1860 //
1861 // CallData implementation
1862 //
1863 
CallData(grpc_call_element * elem,const ClientChannel & chand,const grpc_call_element_args & args)1864 ClientChannel::CallData::CallData(grpc_call_element* elem,
1865                                   const ClientChannel& chand,
1866                                   const grpc_call_element_args& args)
1867     : deadline_state_(elem, args,
1868                       GPR_LIKELY(chand.deadline_checking_enabled_)
1869                           ? args.deadline
1870                           : GRPC_MILLIS_INF_FUTURE),
1871       path_(grpc_slice_ref_internal(args.path)),
1872       call_start_time_(args.start_time),
1873       deadline_(args.deadline),
1874       arena_(args.arena),
1875       owning_call_(args.call_stack),
1876       call_combiner_(args.call_combiner),
1877       call_context_(args.context) {
1878   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1879     gpr_log(GPR_INFO, "chand=%p calld=%p: created call", &chand, this);
1880   }
1881 }
1882 
~CallData()1883 ClientChannel::CallData::~CallData() {
1884   grpc_slice_unref_internal(path_);
1885   GRPC_ERROR_UNREF(cancel_error_);
1886   // Make sure there are no remaining pending batches.
1887   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1888     GPR_ASSERT(pending_batches_[i] == nullptr);
1889   }
1890 }
1891 
Init(grpc_call_element * elem,const grpc_call_element_args * args)1892 grpc_error_handle ClientChannel::CallData::Init(
1893     grpc_call_element* elem, const grpc_call_element_args* args) {
1894   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1895   new (elem->call_data) CallData(elem, *chand, *args);
1896   return GRPC_ERROR_NONE;
1897 }
1898 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)1899 void ClientChannel::CallData::Destroy(
1900     grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
1901     grpc_closure* then_schedule_closure) {
1902   CallData* calld = static_cast<CallData*>(elem->call_data);
1903   RefCountedPtr<DynamicFilters::Call> dynamic_call =
1904       std::move(calld->dynamic_call_);
1905   calld->~CallData();
1906   if (GPR_LIKELY(dynamic_call != nullptr)) {
1907     dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
1908   } else {
1909     // TODO(yashkt) : This can potentially be a Closure::Run
1910     ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
1911   }
1912 }
1913 
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1914 void ClientChannel::CallData::StartTransportStreamOpBatch(
1915     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1916   GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
1917   CallData* calld = static_cast<CallData*>(elem->call_data);
1918   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1919   if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
1920     grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
1921   }
1922   // Intercept recv_initial_metadata for config selector on-committed callback.
1923   if (batch->recv_initial_metadata) {
1924     calld->InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(batch);
1925   }
1926   // If we've previously been cancelled, immediately fail any new batches.
1927   if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
1928     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1929       gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
1930               chand, calld,
1931               grpc_error_std_string(calld->cancel_error_).c_str());
1932     }
1933     // Note: This will release the call combiner.
1934     grpc_transport_stream_op_batch_finish_with_failure(
1935         batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1936     return;
1937   }
1938   // Handle cancellation.
1939   if (GPR_UNLIKELY(batch->cancel_stream)) {
1940     // Stash a copy of cancel_error in our call data, so that we can use
1941     // it for subsequent operations.  This ensures that if the call is
1942     // cancelled before any batches are passed down (e.g., if the deadline
1943     // is in the past when the call starts), we can return the right
1944     // error to the caller when the first batch does get passed down.
1945     GRPC_ERROR_UNREF(calld->cancel_error_);
1946     calld->cancel_error_ =
1947         GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
1948     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1949       gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
1950               calld, grpc_error_std_string(calld->cancel_error_).c_str());
1951     }
1952     // If we do not have a dynamic call (i.e., name resolution has not
1953     // yet completed), fail all pending batches.  Otherwise, send the
1954     // cancellation down to the dynamic call.
1955     if (calld->dynamic_call_ == nullptr) {
1956       calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
1957                                 NoYieldCallCombiner);
1958       // Note: This will release the call combiner.
1959       grpc_transport_stream_op_batch_finish_with_failure(
1960           batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
1961     } else {
1962       // Note: This will release the call combiner.
1963       calld->dynamic_call_->StartTransportStreamOpBatch(batch);
1964     }
1965     return;
1966   }
1967   // Add the batch to the pending list.
1968   calld->PendingBatchesAdd(elem, batch);
1969   // Check if we've already created a dynamic call.
1970   // Note that once we have done so, we do not need to acquire the channel's
1971   // resolution mutex, which is more efficient (especially for streaming calls).
1972   if (calld->dynamic_call_ != nullptr) {
1973     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1974       gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
1975               chand, calld, calld->dynamic_call_.get());
1976     }
1977     calld->PendingBatchesResume(elem);
1978     return;
1979   }
1980   // We do not yet have a dynamic call.
1981   // For batches containing a send_initial_metadata op, acquire the
1982   // channel's resolution mutex to apply the service config to the call,
1983   // after which we will create a dynamic call.
1984   if (GPR_LIKELY(batch->send_initial_metadata)) {
1985     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1986       gpr_log(GPR_INFO,
1987               "chand=%p calld=%p: grabbing resolution mutex to apply service "
1988               "config",
1989               chand, calld);
1990     }
1991     CheckResolution(elem, GRPC_ERROR_NONE);
1992   } else {
1993     // For all other batches, release the call combiner.
1994     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
1995       gpr_log(GPR_INFO,
1996               "chand=%p calld=%p: saved batch, yielding call combiner", chand,
1997               calld);
1998     }
1999     GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
2000                             "batch does not include send_initial_metadata");
2001   }
2002 }
2003 
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2004 void ClientChannel::CallData::SetPollent(grpc_call_element* elem,
2005                                          grpc_polling_entity* pollent) {
2006   CallData* calld = static_cast<CallData*>(elem->call_data);
2007   calld->pollent_ = pollent;
2008 }
2009 
2010 //
2011 // pending_batches management
2012 //
2013 
GetBatchIndex(grpc_transport_stream_op_batch * batch)2014 size_t ClientChannel::CallData::GetBatchIndex(
2015     grpc_transport_stream_op_batch* batch) {
2016   // Note: It is important the send_initial_metadata be the first entry
2017   // here, since the code in ApplyServiceConfigToCallLocked() and
2018   // CheckResolutionLocked() assumes it will be.
2019   if (batch->send_initial_metadata) return 0;
2020   if (batch->send_message) return 1;
2021   if (batch->send_trailing_metadata) return 2;
2022   if (batch->recv_initial_metadata) return 3;
2023   if (batch->recv_message) return 4;
2024   if (batch->recv_trailing_metadata) return 5;
2025   GPR_UNREACHABLE_CODE(return (size_t)-1);
2026 }
2027 
2028 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)2029 void ClientChannel::CallData::PendingBatchesAdd(
2030     grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2031   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2032   const size_t idx = GetBatchIndex(batch);
2033   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2034     gpr_log(GPR_INFO,
2035             "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
2036             this, idx);
2037   }
2038   grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
2039   GPR_ASSERT(pending == nullptr);
2040   pending = batch;
2041 }
2042 
2043 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2044 void ClientChannel::CallData::FailPendingBatchInCallCombiner(
2045     void* arg, grpc_error_handle error) {
2046   grpc_transport_stream_op_batch* batch =
2047       static_cast<grpc_transport_stream_op_batch*>(arg);
2048   CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2049   // Note: This will release the call combiner.
2050   grpc_transport_stream_op_batch_finish_with_failure(
2051       batch, GRPC_ERROR_REF(error), calld->call_combiner_);
2052 }
2053 
2054 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_call_element * elem,grpc_error_handle error,YieldCallCombinerPredicate yield_call_combiner_predicate)2055 void ClientChannel::CallData::PendingBatchesFail(
2056     grpc_call_element* elem, grpc_error_handle error,
2057     YieldCallCombinerPredicate yield_call_combiner_predicate) {
2058   GPR_ASSERT(error != GRPC_ERROR_NONE);
2059   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2060     size_t num_batches = 0;
2061     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2062       if (pending_batches_[i] != nullptr) ++num_batches;
2063     }
2064     gpr_log(GPR_INFO,
2065             "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2066             elem->channel_data, this, num_batches,
2067             grpc_error_std_string(error).c_str());
2068   }
2069   CallCombinerClosureList closures;
2070   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2071     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2072     if (batch != nullptr) {
2073       batch->handler_private.extra_arg = this;
2074       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2075                         FailPendingBatchInCallCombiner, batch,
2076                         grpc_schedule_on_exec_ctx);
2077       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2078                    "PendingBatchesFail");
2079       batch = nullptr;
2080     }
2081   }
2082   if (yield_call_combiner_predicate(closures)) {
2083     closures.RunClosures(call_combiner_);
2084   } else {
2085     closures.RunClosuresWithoutYielding(call_combiner_);
2086   }
2087   GRPC_ERROR_UNREF(error);
2088 }
2089 
2090 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2091 void ClientChannel::CallData::ResumePendingBatchInCallCombiner(
2092     void* arg, grpc_error_handle /*ignored*/) {
2093   grpc_transport_stream_op_batch* batch =
2094       static_cast<grpc_transport_stream_op_batch*>(arg);
2095   auto* elem =
2096       static_cast<grpc_call_element*>(batch->handler_private.extra_arg);
2097   auto* calld = static_cast<CallData*>(elem->call_data);
2098   // Note: This will release the call combiner.
2099   calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2100 }
2101 
2102 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume(grpc_call_element * elem)2103 void ClientChannel::CallData::PendingBatchesResume(grpc_call_element* elem) {
2104   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2105   // Retries not enabled; send down batches as-is.
2106   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2107     size_t num_batches = 0;
2108     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2109       if (pending_batches_[i] != nullptr) ++num_batches;
2110     }
2111     gpr_log(GPR_INFO,
2112             "chand=%p calld=%p: starting %" PRIuPTR
2113             " pending batches on dynamic_call=%p",
2114             chand, this, num_batches, dynamic_call_.get());
2115   }
2116   CallCombinerClosureList closures;
2117   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2118     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2119     if (batch != nullptr) {
2120       batch->handler_private.extra_arg = elem;
2121       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2122                         ResumePendingBatchInCallCombiner, batch, nullptr);
2123       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2124                    "PendingBatchesResume");
2125       batch = nullptr;
2126     }
2127   }
2128   // Note: This will release the call combiner.
2129   closures.RunClosures(call_combiner_);
2130 }
2131 
2132 //
2133 // name resolution
2134 //
2135 
2136 // A class to handle the call combiner cancellation callback for a
2137 // queued pick.
2138 class ClientChannel::CallData::ResolverQueuedCallCanceller {
2139  public:
ResolverQueuedCallCanceller(grpc_call_element * elem)2140   explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) {
2141     auto* calld = static_cast<CallData*>(elem->call_data);
2142     GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller");
2143     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2144                       grpc_schedule_on_exec_ctx);
2145     calld->call_combiner_->SetNotifyOnCancel(&closure_);
2146   }
2147 
2148  private:
CancelLocked(void * arg,grpc_error_handle error)2149   static void CancelLocked(void* arg, grpc_error_handle error) {
2150     auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2151     auto* chand = static_cast<ClientChannel*>(self->elem_->channel_data);
2152     auto* calld = static_cast<CallData*>(self->elem_->call_data);
2153     {
2154       MutexLock lock(&chand->resolution_mu_);
2155       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2156         gpr_log(GPR_INFO,
2157                 "chand=%p calld=%p: cancelling resolver queued pick: "
2158                 "error=%s self=%p calld->resolver_pick_canceller=%p",
2159                 chand, calld, grpc_error_std_string(error).c_str(), self,
2160                 calld->resolver_call_canceller_);
2161       }
2162       if (calld->resolver_call_canceller_ == self && error != GRPC_ERROR_NONE) {
2163         // Remove pick from list of queued picks.
2164         calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
2165         // Fail pending batches on the call.
2166         calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
2167                                   YieldCallCombinerIfPendingBatchesFound);
2168       }
2169     }
2170     GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolvingQueuedCallCanceller");
2171     delete self;
2172   }
2173 
2174   grpc_call_element* elem_;
2175   grpc_closure closure_;
2176 };
2177 
MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element * elem)2178 void ClientChannel::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
2179     grpc_call_element* elem) {
2180   if (!queued_pending_resolver_result_) return;
2181   auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2182   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2183     gpr_log(GPR_INFO,
2184             "chand=%p calld=%p: removing from resolver queued picks list",
2185             chand, this);
2186   }
2187   chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
2188   queued_pending_resolver_result_ = false;
2189   // Lame the call combiner canceller.
2190   resolver_call_canceller_ = nullptr;
2191 }
2192 
MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element * elem)2193 void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked(
2194     grpc_call_element* elem) {
2195   if (queued_pending_resolver_result_) return;
2196   auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2197   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2198     gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
2199             chand, this);
2200   }
2201   queued_pending_resolver_result_ = true;
2202   resolver_queued_call_.elem = elem;
2203   chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
2204   // Register call combiner cancellation callback.
2205   resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
2206 }
2207 
ApplyServiceConfigToCallLocked(grpc_call_element * elem,grpc_metadata_batch * initial_metadata)2208 grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
2209     grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
2210   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2211   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2212     gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2213             chand, this);
2214   }
2215   ConfigSelector* config_selector = chand->config_selector_.get();
2216   if (config_selector != nullptr) {
2217     // Use the ConfigSelector to determine the config for the call.
2218     ConfigSelector::CallConfig call_config =
2219         config_selector->GetCallConfig({&path_, initial_metadata, arena_});
2220     if (call_config.error != GRPC_ERROR_NONE) return call_config.error;
2221     on_call_committed_ = std::move(call_config.on_call_committed);
2222     // Create a ServiceConfigCallData for the call.  This stores a ref to the
2223     // ServiceConfig and caches the right set of parsed configs to use for
2224     // the call.  The MethodConfig will store itself in the call context,
2225     // so that it can be accessed by filters in the subchannel, and it
2226     // will be cleaned up when the call ends.
2227     auto* service_config_call_data = arena_->New<ServiceConfigCallData>(
2228         std::move(call_config.service_config), call_config.method_configs,
2229         std::move(call_config.call_attributes), call_context_);
2230     // Apply our own method params to the call.
2231     auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
2232         service_config_call_data->GetMethodParsedConfig(
2233             internal::ClientChannelServiceConfigParser::ParserIndex()));
2234     if (method_params != nullptr) {
2235       // If the deadline from the service config is shorter than the one
2236       // from the client API, reset the deadline timer.
2237       if (chand->deadline_checking_enabled_ && method_params->timeout() != 0) {
2238         const grpc_millis per_method_deadline =
2239             grpc_cycle_counter_to_millis_round_up(call_start_time_) +
2240             method_params->timeout();
2241         if (per_method_deadline < deadline_) {
2242           deadline_ = per_method_deadline;
2243           grpc_deadline_state_reset(elem, deadline_);
2244         }
2245       }
2246       // If the service config set wait_for_ready and the application
2247       // did not explicitly set it, use the value from the service config.
2248       uint32_t* send_initial_metadata_flags =
2249           &pending_batches_[0]
2250                ->payload->send_initial_metadata.send_initial_metadata_flags;
2251       if (method_params->wait_for_ready().has_value() &&
2252           !(*send_initial_metadata_flags &
2253             GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
2254         if (method_params->wait_for_ready().value()) {
2255           *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2256         } else {
2257           *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2258         }
2259       }
2260     }
2261     // Set the dynamic filter stack.
2262     dynamic_filters_ = chand->dynamic_filters_;
2263   }
2264   return GRPC_ERROR_NONE;
2265 }
2266 
2267 void ClientChannel::CallData::
RecvInitialMetadataReadyForConfigSelectorCommitCallback(void * arg,grpc_error_handle error)2268     RecvInitialMetadataReadyForConfigSelectorCommitCallback(
2269         void* arg, grpc_error_handle error) {
2270   auto* self = static_cast<CallData*>(arg);
2271   if (self->on_call_committed_ != nullptr) {
2272     self->on_call_committed_();
2273     self->on_call_committed_ = nullptr;
2274   }
2275   // Chain to original callback.
2276   Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
2277                GRPC_ERROR_REF(error));
2278 }
2279 
2280 // TODO(roth): Consider not intercepting this callback unless we
2281 // actually need to, if this causes a performance problem.
2282 void ClientChannel::CallData::
InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(grpc_transport_stream_op_batch * batch)2283     InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
2284         grpc_transport_stream_op_batch* batch) {
2285   original_recv_initial_metadata_ready_ =
2286       batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
2287   GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_,
2288                     RecvInitialMetadataReadyForConfigSelectorCommitCallback,
2289                     this, nullptr);
2290   batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
2291       &recv_initial_metadata_ready_;
2292 }
2293 
AsyncResolutionDone(grpc_call_element * elem,grpc_error_handle error)2294 void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem,
2295                                                   grpc_error_handle error) {
2296   GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr);
2297   ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
2298 }
2299 
ResolutionDone(void * arg,grpc_error_handle error)2300 void ClientChannel::CallData::ResolutionDone(void* arg,
2301                                              grpc_error_handle error) {
2302   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2303   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2304   CallData* calld = static_cast<CallData*>(elem->call_data);
2305   if (error != GRPC_ERROR_NONE) {
2306     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2307       gpr_log(GPR_INFO,
2308               "chand=%p calld=%p: error applying config to call: error=%s",
2309               chand, calld, grpc_error_std_string(error).c_str());
2310     }
2311     calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
2312     return;
2313   }
2314   calld->CreateDynamicCall(elem);
2315 }
2316 
CheckResolution(void * arg,grpc_error_handle error)2317 void ClientChannel::CallData::CheckResolution(void* arg,
2318                                               grpc_error_handle error) {
2319   grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2320   CallData* calld = static_cast<CallData*>(elem->call_data);
2321   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2322   bool resolution_complete;
2323   {
2324     MutexLock lock(&chand->resolution_mu_);
2325     resolution_complete = calld->CheckResolutionLocked(elem, &error);
2326   }
2327   if (resolution_complete) {
2328     ResolutionDone(elem, error);
2329     GRPC_ERROR_UNREF(error);
2330   }
2331 }
2332 
CheckResolutionLocked(grpc_call_element * elem,grpc_error_handle * error)2333 bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
2334                                                     grpc_error_handle* error) {
2335   ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2336   // If we're still in IDLE, we need to start resolving.
2337   if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
2338     // Bounce into the control plane work serializer to start resolving,
2339     // in case we are still in IDLE state.  Since we are holding on to the
2340     // resolution mutex here, we offload it on the ExecCtx so that we don't
2341     // deadlock with ourselves.
2342     GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "CheckResolutionLocked");
2343     ExecCtx::Run(
2344         DEBUG_LOCATION,
2345         GRPC_CLOSURE_CREATE(
2346             [](void* arg, grpc_error_handle /*error*/) {
2347               auto* chand = static_cast<ClientChannel*>(arg);
2348               chand->work_serializer_->Run(
2349                   [chand]()
2350                       ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand->work_serializer_) {
2351                         chand->CheckConnectivityState(/*try_to_connect=*/true);
2352                         GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_,
2353                                                  "CheckResolutionLocked");
2354                       },
2355                   DEBUG_LOCATION);
2356             },
2357             chand, nullptr),
2358         GRPC_ERROR_NONE);
2359   }
2360   // Get send_initial_metadata batch and flags.
2361   auto& send_initial_metadata =
2362       pending_batches_[0]->payload->send_initial_metadata;
2363   grpc_metadata_batch* initial_metadata_batch =
2364       send_initial_metadata.send_initial_metadata;
2365   const uint32_t send_initial_metadata_flags =
2366       send_initial_metadata.send_initial_metadata_flags;
2367   // If we don't yet have a resolver result, we need to queue the call
2368   // until we get one.
2369   if (GPR_UNLIKELY(!chand->received_service_config_data_)) {
2370     // If the resolver returned transient failure before returning the
2371     // first service config, fail any non-wait_for_ready calls.
2372     grpc_error_handle resolver_error = chand->resolver_transient_failure_error_;
2373     if (resolver_error != GRPC_ERROR_NONE &&
2374         (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
2375             0) {
2376       MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
2377       *error = GRPC_ERROR_REF(resolver_error);
2378       return true;
2379     }
2380     // Either the resolver has not yet returned a result, or it has
2381     // returned transient failure but the call is wait_for_ready.  In
2382     // either case, queue the call.
2383     MaybeAddCallToResolverQueuedCallsLocked(elem);
2384     return false;
2385   }
2386   // Apply service config to call if not yet applied.
2387   if (GPR_LIKELY(!service_config_applied_)) {
2388     service_config_applied_ = true;
2389     *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
2390   }
2391   MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
2392   return true;
2393 }
2394 
CreateDynamicCall(grpc_call_element * elem)2395 void ClientChannel::CallData::CreateDynamicCall(grpc_call_element* elem) {
2396   auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2397   DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
2398                                      pollent_,
2399                                      path_,
2400                                      call_start_time_,
2401                                      deadline_,
2402                                      arena_,
2403                                      call_context_,
2404                                      call_combiner_};
2405   grpc_error_handle error = GRPC_ERROR_NONE;
2406   DynamicFilters* channel_stack = args.channel_stack.get();
2407   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2408     gpr_log(
2409         GPR_INFO,
2410         "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
2411         chand, this, channel_stack);
2412   }
2413   dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
2414   if (error != GRPC_ERROR_NONE) {
2415     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2416       gpr_log(GPR_INFO,
2417               "chand=%p calld=%p: failed to create dynamic call: error=%s",
2418               chand, this, grpc_error_std_string(error).c_str());
2419     }
2420     PendingBatchesFail(elem, error, YieldCallCombiner);
2421     return;
2422   }
2423   PendingBatchesResume(elem);
2424 }
2425 
2426 //
2427 // ClientChannel::LoadBalancedCall::Metadata
2428 //
2429 
2430 class ClientChannel::LoadBalancedCall::Metadata
2431     : public LoadBalancingPolicy::MetadataInterface {
2432  public:
Metadata(LoadBalancedCall * lb_call,grpc_metadata_batch * batch)2433   Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch)
2434       : lb_call_(lb_call), batch_(batch) {}
2435 
Add(absl::string_view key,absl::string_view value)2436   void Add(absl::string_view key, absl::string_view value) override {
2437     grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
2438         lb_call_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
2439     linked_mdelem->md = grpc_mdelem_from_slices(
2440         ExternallyManagedSlice(key.data(), key.size()),
2441         ExternallyManagedSlice(value.data(), value.size()));
2442     GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
2443                GRPC_ERROR_NONE);
2444   }
2445 
begin() const2446   iterator begin() const override {
2447     static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
2448                   "iterator size too large");
2449     return iterator(
2450         this, reinterpret_cast<intptr_t>(MaybeSkipEntry(batch_->list.head)));
2451   }
end() const2452   iterator end() const override {
2453     static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
2454                   "iterator size too large");
2455     return iterator(this, 0);
2456   }
2457 
erase(iterator it)2458   iterator erase(iterator it) override {
2459     grpc_linked_mdelem* linked_mdelem =
2460         reinterpret_cast<grpc_linked_mdelem*>(GetIteratorHandle(it));
2461     intptr_t handle = reinterpret_cast<intptr_t>(linked_mdelem->next);
2462     grpc_metadata_batch_remove(batch_, linked_mdelem);
2463     return iterator(this, handle);
2464   }
2465 
2466  private:
MaybeSkipEntry(grpc_linked_mdelem * entry) const2467   grpc_linked_mdelem* MaybeSkipEntry(grpc_linked_mdelem* entry) const {
2468     if (entry != nullptr && batch_->idx.named.path == entry) {
2469       return entry->next;
2470     }
2471     return entry;
2472   }
2473 
IteratorHandleNext(intptr_t handle) const2474   intptr_t IteratorHandleNext(intptr_t handle) const override {
2475     grpc_linked_mdelem* linked_mdelem =
2476         reinterpret_cast<grpc_linked_mdelem*>(handle);
2477     return reinterpret_cast<intptr_t>(MaybeSkipEntry(linked_mdelem->next));
2478   }
2479 
IteratorHandleGet(intptr_t handle) const2480   std::pair<absl::string_view, absl::string_view> IteratorHandleGet(
2481       intptr_t handle) const override {
2482     grpc_linked_mdelem* linked_mdelem =
2483         reinterpret_cast<grpc_linked_mdelem*>(handle);
2484     return std::make_pair(StringViewFromSlice(GRPC_MDKEY(linked_mdelem->md)),
2485                           StringViewFromSlice(GRPC_MDVALUE(linked_mdelem->md)));
2486   }
2487 
2488   LoadBalancedCall* lb_call_;
2489   grpc_metadata_batch* batch_;
2490 };
2491 
2492 //
2493 // ClientChannel::LoadBalancedCall::LbCallState
2494 //
2495 
2496 class ClientChannel::LoadBalancedCall::LbCallState
2497     : public LoadBalancingPolicy::CallState {
2498  public:
LbCallState(LoadBalancedCall * lb_call)2499   explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}
2500 
Alloc(size_t size)2501   void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }
2502 
GetBackendMetricData()2503   const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
2504       override {
2505     if (lb_call_->backend_metric_data_ == nullptr) {
2506       grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->idx.named
2507                                    .x_endpoint_load_metrics_bin;
2508       if (md != nullptr) {
2509         lb_call_->backend_metric_data_ =
2510             ParseBackendMetricData(GRPC_MDVALUE(md->md), lb_call_->arena_);
2511       }
2512     }
2513     return lb_call_->backend_metric_data_;
2514   }
2515 
ExperimentalGetCallAttribute(const char * key)2516   absl::string_view ExperimentalGetCallAttribute(const char* key) override {
2517     auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
2518         lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
2519     auto& call_attributes = service_config_call_data->call_attributes();
2520     auto it = call_attributes.find(key);
2521     if (it == call_attributes.end()) return absl::string_view();
2522     return it->second;
2523   }
2524 
2525  private:
2526   LoadBalancedCall* lb_call_;
2527 };
2528 
2529 //
2530 // LoadBalancedCall
2531 //
2532 
LoadBalancedCall(ClientChannel * chand,const grpc_call_element_args & args,grpc_polling_entity * pollent,grpc_closure * on_call_destruction_complete)2533 ClientChannel::LoadBalancedCall::LoadBalancedCall(
2534     ClientChannel* chand, const grpc_call_element_args& args,
2535     grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete)
2536     : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
2537                      ? "LoadBalancedCall"
2538                      : nullptr),
2539       chand_(chand),
2540       path_(grpc_slice_ref_internal(args.path)),
2541       call_start_time_(args.start_time),
2542       deadline_(args.deadline),
2543       arena_(args.arena),
2544       owning_call_(args.call_stack),
2545       call_combiner_(args.call_combiner),
2546       call_context_(args.context),
2547       pollent_(pollent),
2548       on_call_destruction_complete_(on_call_destruction_complete) {}
2549 
~LoadBalancedCall()2550 ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
2551   grpc_slice_unref_internal(path_);
2552   GRPC_ERROR_UNREF(cancel_error_);
2553   GRPC_ERROR_UNREF(failure_error_);
2554   if (backend_metric_data_ != nullptr) {
2555     backend_metric_data_
2556         ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
2557   }
2558   // Make sure there are no remaining pending batches.
2559   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2560     GPR_ASSERT(pending_batches_[i] == nullptr);
2561   }
2562   if (on_call_destruction_complete_ != nullptr) {
2563     ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
2564                  GRPC_ERROR_NONE);
2565   }
2566 }
2567 
GetBatchIndex(grpc_transport_stream_op_batch * batch)2568 size_t ClientChannel::LoadBalancedCall::GetBatchIndex(
2569     grpc_transport_stream_op_batch* batch) {
2570   // Note: It is important the send_initial_metadata be the first entry
2571   // here, since the code in PickSubchannelLocked() assumes it will be.
2572   if (batch->send_initial_metadata) return 0;
2573   if (batch->send_message) return 1;
2574   if (batch->send_trailing_metadata) return 2;
2575   if (batch->recv_initial_metadata) return 3;
2576   if (batch->recv_message) return 4;
2577   if (batch->recv_trailing_metadata) return 5;
2578   GPR_UNREACHABLE_CODE(return (size_t)-1);
2579 }
2580 
2581 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2582 void ClientChannel::LoadBalancedCall::PendingBatchesAdd(
2583     grpc_transport_stream_op_batch* batch) {
2584   const size_t idx = GetBatchIndex(batch);
2585   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2586     gpr_log(GPR_INFO,
2587             "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
2588             chand_, this, idx);
2589   }
2590   GPR_ASSERT(pending_batches_[idx] == nullptr);
2591   pending_batches_[idx] = batch;
2592 }
2593 
2594 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2595 void ClientChannel::LoadBalancedCall::FailPendingBatchInCallCombiner(
2596     void* arg, grpc_error_handle error) {
2597   grpc_transport_stream_op_batch* batch =
2598       static_cast<grpc_transport_stream_op_batch*>(arg);
2599   auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
2600   // Note: This will release the call combiner.
2601   grpc_transport_stream_op_batch_finish_with_failure(
2602       batch, GRPC_ERROR_REF(error), self->call_combiner_);
2603 }
2604 
2605 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error_handle error,YieldCallCombinerPredicate yield_call_combiner_predicate)2606 void ClientChannel::LoadBalancedCall::PendingBatchesFail(
2607     grpc_error_handle error,
2608     YieldCallCombinerPredicate yield_call_combiner_predicate) {
2609   GPR_ASSERT(error != GRPC_ERROR_NONE);
2610   GRPC_ERROR_UNREF(failure_error_);
2611   failure_error_ = error;
2612   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2613     size_t num_batches = 0;
2614     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2615       if (pending_batches_[i] != nullptr) ++num_batches;
2616     }
2617     gpr_log(GPR_INFO,
2618             "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
2619             chand_, this, num_batches, grpc_error_std_string(error).c_str());
2620   }
2621   CallCombinerClosureList closures;
2622   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2623     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2624     if (batch != nullptr) {
2625       batch->handler_private.extra_arg = this;
2626       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2627                         FailPendingBatchInCallCombiner, batch,
2628                         grpc_schedule_on_exec_ctx);
2629       closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2630                    "PendingBatchesFail");
2631       batch = nullptr;
2632     }
2633   }
2634   if (yield_call_combiner_predicate(closures)) {
2635     closures.RunClosures(call_combiner_);
2636   } else {
2637     closures.RunClosuresWithoutYielding(call_combiner_);
2638   }
2639 }
2640 
2641 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2642 void ClientChannel::LoadBalancedCall::ResumePendingBatchInCallCombiner(
2643     void* arg, grpc_error_handle /*ignored*/) {
2644   grpc_transport_stream_op_batch* batch =
2645       static_cast<grpc_transport_stream_op_batch*>(arg);
2646   SubchannelCall* subchannel_call =
2647       static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2648   // Note: This will release the call combiner.
2649   subchannel_call->StartTransportStreamOpBatch(batch);
2650 }
2651 
2652 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()2653 void ClientChannel::LoadBalancedCall::PendingBatchesResume() {
2654   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2655     size_t num_batches = 0;
2656     for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2657       if (pending_batches_[i] != nullptr) ++num_batches;
2658     }
2659     gpr_log(GPR_INFO,
2660             "chand=%p lb_call=%p: starting %" PRIuPTR
2661             " pending batches on subchannel_call=%p",
2662             chand_, this, num_batches, subchannel_call_.get());
2663   }
2664   CallCombinerClosureList closures;
2665   for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2666     grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2667     if (batch != nullptr) {
2668       batch->handler_private.extra_arg = subchannel_call_.get();
2669       GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2670                         ResumePendingBatchInCallCombiner, batch,
2671                         grpc_schedule_on_exec_ctx);
2672       closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2673                    "PendingBatchesResume");
2674       batch = nullptr;
2675     }
2676   }
2677   // Note: This will release the call combiner.
2678   closures.RunClosures(call_combiner_);
2679 }
2680 
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)2681 void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
2682     grpc_transport_stream_op_batch* batch) {
2683   // Intercept recv_trailing_metadata_ready for LB callback.
2684   if (batch->recv_trailing_metadata) {
2685     InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
2686   }
2687   // If we've previously been cancelled, immediately fail any new batches.
2688   if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
2689     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2690       gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
2691               chand_, this, grpc_error_std_string(cancel_error_).c_str());
2692     }
2693     // Note: This will release the call combiner.
2694     grpc_transport_stream_op_batch_finish_with_failure(
2695         batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
2696     return;
2697   }
2698   // Handle cancellation.
2699   if (GPR_UNLIKELY(batch->cancel_stream)) {
2700     // Stash a copy of cancel_error in our call data, so that we can use
2701     // it for subsequent operations.  This ensures that if the call is
2702     // cancelled before any batches are passed down (e.g., if the deadline
2703     // is in the past when the call starts), we can return the right
2704     // error to the caller when the first batch does get passed down.
2705     GRPC_ERROR_UNREF(cancel_error_);
2706     cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
2707     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2708       gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
2709               chand_, this, grpc_error_std_string(cancel_error_).c_str());
2710     }
2711     // If we do not have a subchannel call (i.e., a pick has not yet
2712     // been started), fail all pending batches.  Otherwise, send the
2713     // cancellation down to the subchannel call.
2714     if (subchannel_call_ == nullptr) {
2715       PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
2716       // Note: This will release the call combiner.
2717       grpc_transport_stream_op_batch_finish_with_failure(
2718           batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
2719     } else {
2720       // Note: This will release the call combiner.
2721       subchannel_call_->StartTransportStreamOpBatch(batch);
2722     }
2723     return;
2724   }
2725   // Add the batch to the pending list.
2726   PendingBatchesAdd(batch);
2727   // Check if we've already gotten a subchannel call.
2728   // Note that once we have picked a subchannel, we do not need to acquire
2729   // the channel's data plane mutex, which is more efficient (especially for
2730   // streaming calls).
2731   if (subchannel_call_ != nullptr) {
2732     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2733       gpr_log(GPR_INFO,
2734               "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
2735               chand_, this, subchannel_call_.get());
2736     }
2737     PendingBatchesResume();
2738     return;
2739   }
2740   // We do not yet have a subchannel call.
2741   // For batches containing a send_initial_metadata op, acquire the
2742   // channel's data plane mutex to pick a subchannel.
2743   if (GPR_LIKELY(batch->send_initial_metadata)) {
2744     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2745       gpr_log(GPR_INFO,
2746               "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
2747               chand_, this);
2748     }
2749     PickSubchannel(this, GRPC_ERROR_NONE);
2750   } else {
2751     // For all other batches, release the call combiner.
2752     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2753       gpr_log(GPR_INFO,
2754               "chand=%p lb_call=%p: saved batch, yielding call combiner",
2755               chand_, this);
2756     }
2757     GRPC_CALL_COMBINER_STOP(call_combiner_,
2758                             "batch does not include send_initial_metadata");
2759   }
2760 }
2761 
2762 void ClientChannel::LoadBalancedCall::
RecvTrailingMetadataReadyForLoadBalancingPolicy(void * arg,grpc_error_handle error)2763     RecvTrailingMetadataReadyForLoadBalancingPolicy(void* arg,
2764                                                     grpc_error_handle error) {
2765   auto* self = static_cast<LoadBalancedCall*>(arg);
2766   if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
2767     // Set error if call did not succeed.
2768     grpc_error_handle error_for_lb = GRPC_ERROR_NONE;
2769     if (error != GRPC_ERROR_NONE) {
2770       error_for_lb = error;
2771     } else {
2772       const auto& fields = self->recv_trailing_metadata_->idx.named;
2773       GPR_ASSERT(fields.grpc_status != nullptr);
2774       grpc_status_code status =
2775           grpc_get_status_code_from_metadata(fields.grpc_status->md);
2776       std::string msg;
2777       if (status != GRPC_STATUS_OK) {
2778         error_for_lb = grpc_error_set_int(
2779             GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
2780             GRPC_ERROR_INT_GRPC_STATUS, status);
2781         if (fields.grpc_message != nullptr) {
2782           error_for_lb = grpc_error_set_str(
2783               error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
2784               grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
2785         }
2786       }
2787     }
2788     // Invoke callback to LB policy.
2789     Metadata trailing_metadata(self, self->recv_trailing_metadata_);
2790     LbCallState lb_call_state(self);
2791     self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
2792                                            &lb_call_state);
2793     if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
2794   }
2795   // Chain to original callback.
2796   if (self->failure_error_ != GRPC_ERROR_NONE) {
2797     error = self->failure_error_;
2798     self->failure_error_ = GRPC_ERROR_NONE;
2799   } else {
2800     error = GRPC_ERROR_REF(error);
2801   }
2802   Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
2803                error);
2804 }
2805 
2806 void ClientChannel::LoadBalancedCall::
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(grpc_transport_stream_op_batch * batch)2807     InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
2808         grpc_transport_stream_op_batch* batch) {
2809   recv_trailing_metadata_ =
2810       batch->payload->recv_trailing_metadata.recv_trailing_metadata;
2811   original_recv_trailing_metadata_ready_ =
2812       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
2813   GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
2814                     RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
2815                     grpc_schedule_on_exec_ctx);
2816   batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
2817       &recv_trailing_metadata_ready_;
2818 }
2819 
CreateSubchannelCall()2820 void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
2821   SubchannelCall::Args call_args = {
2822       std::move(connected_subchannel_), pollent_, path_, call_start_time_,
2823       deadline_, arena_,
2824       // TODO(roth): When we implement hedging support, we will probably
2825       // need to use a separate call context for each subchannel call.
2826       call_context_, call_combiner_};
2827   grpc_error_handle error = GRPC_ERROR_NONE;
2828   subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
2829   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2830     gpr_log(GPR_INFO,
2831             "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
2832             this, subchannel_call_.get(), grpc_error_std_string(error).c_str());
2833   }
2834   if (on_call_destruction_complete_ != nullptr) {
2835     subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
2836     on_call_destruction_complete_ = nullptr;
2837   }
2838   if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2839     PendingBatchesFail(error, YieldCallCombiner);
2840   } else {
2841     PendingBatchesResume();
2842   }
2843 }
2844 
2845 // A class to handle the call combiner cancellation callback for a
2846 // queued pick.
2847 // TODO(roth): When we implement hedging support, we won't be able to
2848 // register a call combiner cancellation closure for each LB pick,
2849 // because there may be multiple LB picks happening in parallel.
2850 // Instead, we will probably need to maintain a list in the CallData
2851 // object of pending LB picks to be cancelled when the closure runs.
2852 class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
2853  public:
LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)2854   explicit LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)
2855       : lb_call_(std::move(lb_call)) {
2856     GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
2857     GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
2858     lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
2859   }
2860 
2861  private:
CancelLocked(void * arg,grpc_error_handle error)2862   static void CancelLocked(void* arg, grpc_error_handle error) {
2863     auto* self = static_cast<LbQueuedCallCanceller*>(arg);
2864     auto* lb_call = self->lb_call_.get();
2865     auto* chand = lb_call->chand_;
2866     {
2867       MutexLock lock(&chand->data_plane_mu_);
2868       if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2869         gpr_log(GPR_INFO,
2870                 "chand=%p lb_call=%p: cancelling queued pick: "
2871                 "error=%s self=%p calld->pick_canceller=%p",
2872                 chand, lb_call, grpc_error_std_string(error).c_str(), self,
2873                 lb_call->lb_call_canceller_);
2874       }
2875       if (lb_call->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
2876         // Remove pick from list of queued picks.
2877         lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
2878         // Fail pending batches on the call.
2879         lb_call->PendingBatchesFail(GRPC_ERROR_REF(error),
2880                                     YieldCallCombinerIfPendingBatchesFound);
2881       }
2882     }
2883     GRPC_CALL_STACK_UNREF(lb_call->owning_call_, "LbQueuedCallCanceller");
2884     delete self;
2885   }
2886 
2887   RefCountedPtr<LoadBalancedCall> lb_call_;
2888   grpc_closure closure_;
2889 };
2890 
MaybeRemoveCallFromLbQueuedCallsLocked()2891 void ClientChannel::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
2892   if (!queued_pending_lb_pick_) return;
2893   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2894     gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
2895             chand_, this);
2896   }
2897   chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
2898   queued_pending_lb_pick_ = false;
2899   // Lame the call combiner canceller.
2900   lb_call_canceller_ = nullptr;
2901 }
2902 
MaybeAddCallToLbQueuedCallsLocked()2903 void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
2904   if (queued_pending_lb_pick_) return;
2905   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2906     gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
2907             chand_, this);
2908   }
2909   queued_pending_lb_pick_ = true;
2910   queued_call_.lb_call = this;
2911   chand_->AddLbQueuedCall(&queued_call_, pollent_);
2912   // Register call combiner cancellation callback.
2913   lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
2914 }
2915 
AsyncPickDone(grpc_error_handle error)2916 void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) {
2917   GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
2918   ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
2919 }
2920 
PickDone(void * arg,grpc_error_handle error)2921 void ClientChannel::LoadBalancedCall::PickDone(void* arg,
2922                                                grpc_error_handle error) {
2923   auto* self = static_cast<LoadBalancedCall*>(arg);
2924   if (error != GRPC_ERROR_NONE) {
2925     if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2926       gpr_log(GPR_INFO,
2927               "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
2928               self->chand_, self, grpc_error_std_string(error).c_str());
2929     }
2930     self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
2931     return;
2932   }
2933   self->CreateSubchannelCall();
2934 }
2935 
2936 namespace {
2937 
PickResultTypeName(LoadBalancingPolicy::PickResult::ResultType type)2938 const char* PickResultTypeName(
2939     LoadBalancingPolicy::PickResult::ResultType type) {
2940   switch (type) {
2941     case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
2942       return "COMPLETE";
2943     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
2944       return "QUEUE";
2945     case LoadBalancingPolicy::PickResult::PICK_FAILED:
2946       return "FAILED";
2947   }
2948   GPR_UNREACHABLE_CODE(return "UNKNOWN");
2949 }
2950 
2951 }  // namespace
2952 
PickSubchannel(void * arg,grpc_error_handle error)2953 void ClientChannel::LoadBalancedCall::PickSubchannel(void* arg,
2954                                                      grpc_error_handle error) {
2955   auto* self = static_cast<LoadBalancedCall*>(arg);
2956   bool pick_complete;
2957   {
2958     MutexLock lock(&self->chand_->data_plane_mu_);
2959     pick_complete = self->PickSubchannelLocked(&error);
2960   }
2961   if (pick_complete) {
2962     PickDone(self, error);
2963     GRPC_ERROR_UNREF(error);
2964   }
2965 }
2966 
PickSubchannelLocked(grpc_error_handle * error)2967 bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
2968     grpc_error_handle* error) {
2969   GPR_ASSERT(connected_subchannel_ == nullptr);
2970   GPR_ASSERT(subchannel_call_ == nullptr);
2971   // Grab initial metadata.
2972   auto& send_initial_metadata =
2973       pending_batches_[0]->payload->send_initial_metadata;
2974   grpc_metadata_batch* initial_metadata_batch =
2975       send_initial_metadata.send_initial_metadata;
2976   const uint32_t send_initial_metadata_flags =
2977       send_initial_metadata.send_initial_metadata_flags;
2978   // Perform LB pick.
2979   LoadBalancingPolicy::PickArgs pick_args;
2980   pick_args.path = StringViewFromSlice(path_);
2981   LbCallState lb_call_state(this);
2982   pick_args.call_state = &lb_call_state;
2983   Metadata initial_metadata(this, initial_metadata_batch);
2984   pick_args.initial_metadata = &initial_metadata;
2985   auto result = chand_->picker_->Pick(pick_args);
2986   if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2987     gpr_log(
2988         GPR_INFO,
2989         "chand=%p lb_call=%p: LB pick returned %s (subchannel=%p, error=%s)",
2990         chand_, this, PickResultTypeName(result.type), result.subchannel.get(),
2991         grpc_error_std_string(result.error).c_str());
2992   }
2993   switch (result.type) {
2994     case LoadBalancingPolicy::PickResult::PICK_FAILED: {
2995       // If we're shutting down, fail all RPCs.
2996       grpc_error_handle disconnect_error = chand_->disconnect_error();
2997       if (disconnect_error != GRPC_ERROR_NONE) {
2998         GRPC_ERROR_UNREF(result.error);
2999         MaybeRemoveCallFromLbQueuedCallsLocked();
3000         *error = GRPC_ERROR_REF(disconnect_error);
3001         return true;
3002       }
3003       // If wait_for_ready is false, then the error indicates the RPC
3004       // attempt's final status.
3005       if ((send_initial_metadata_flags &
3006            GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
3007         grpc_error_handle new_error =
3008             GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
3009                 "Failed to pick subchannel", &result.error, 1);
3010         GRPC_ERROR_UNREF(result.error);
3011         *error = new_error;
3012         MaybeRemoveCallFromLbQueuedCallsLocked();
3013         return true;
3014       }
3015       // If wait_for_ready is true, then queue to retry when we get a new
3016       // picker.
3017       GRPC_ERROR_UNREF(result.error);
3018     }
3019     // Fallthrough
3020     case LoadBalancingPolicy::PickResult::PICK_QUEUE:
3021       MaybeAddCallToLbQueuedCallsLocked();
3022       return false;
3023     default:  // PICK_COMPLETE
3024       MaybeRemoveCallFromLbQueuedCallsLocked();
3025       // Handle drops.
3026       if (GPR_UNLIKELY(result.subchannel == nullptr)) {
3027         result.error = grpc_error_set_int(
3028             grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
3029                                    "Call dropped by load balancing policy"),
3030                                GRPC_ERROR_INT_GRPC_STATUS,
3031                                GRPC_STATUS_UNAVAILABLE),
3032             GRPC_ERROR_INT_LB_POLICY_DROP, 1);
3033       } else {
3034         // Grab a ref to the connected subchannel while we're still
3035         // holding the data plane mutex.
3036         connected_subchannel_ =
3037             chand_->GetConnectedSubchannelInDataPlane(result.subchannel.get());
3038         GPR_ASSERT(connected_subchannel_ != nullptr);
3039       }
3040       lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
3041       *error = result.error;
3042       return true;
3043   }
3044 }
3045 
3046 }  // namespace grpc_core
3047