1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/filters/client_channel/client_channel.h"
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <string.h>
26
27 #include <set>
28
29 #include "absl/strings/numbers.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/string_view.h"
33
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/string_util.h>
37 #include <grpc/support/sync.h>
38
39 #include "absl/container/inlined_vector.h"
40 #include "absl/types/optional.h"
41
42 #include "src/core/ext/filters/client_channel/backend_metric.h"
43 #include "src/core/ext/filters/client_channel/backup_poller.h"
44 #include "src/core/ext/filters/client_channel/config_selector.h"
45 #include "src/core/ext/filters/client_channel/dynamic_filters.h"
46 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
47 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
48 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
49 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
50 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
51 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
52 #include "src/core/ext/filters/client_channel/resolver_registry.h"
53 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
54 #include "src/core/ext/filters/client_channel/retry_filter.h"
55 #include "src/core/ext/filters/client_channel/service_config.h"
56 #include "src/core/ext/filters/client_channel/service_config_call_data.h"
57 #include "src/core/ext/filters/client_channel/subchannel.h"
58 #include "src/core/ext/filters/deadline/deadline_filter.h"
59 #include "src/core/lib/backoff/backoff.h"
60 #include "src/core/lib/channel/channel_args.h"
61 #include "src/core/lib/channel/connected_channel.h"
62 #include "src/core/lib/channel/status_util.h"
63 #include "src/core/lib/gpr/string.h"
64 #include "src/core/lib/gprpp/sync.h"
65 #include "src/core/lib/iomgr/iomgr.h"
66 #include "src/core/lib/iomgr/polling_entity.h"
67 #include "src/core/lib/iomgr/work_serializer.h"
68 #include "src/core/lib/profiling/timers.h"
69 #include "src/core/lib/slice/slice_internal.h"
70 #include "src/core/lib/slice/slice_string_helpers.h"
71 #include "src/core/lib/surface/channel.h"
72 #include "src/core/lib/transport/connectivity_state.h"
73 #include "src/core/lib/transport/error_utils.h"
74 #include "src/core/lib/transport/metadata.h"
75 #include "src/core/lib/transport/metadata_batch.h"
76 #include "src/core/lib/transport/static_metadata.h"
77 #include "src/core/lib/transport/status_metadata.h"
78
79 //
80 // Client channel filter
81 //
82
83 namespace grpc_core {
84
// Shorthand for the service-config parser types declared in
// resolver_result_parsing.h.
using internal::ClientChannelGlobalParsedConfig;
using internal::ClientChannelMethodParsedConfig;
using internal::ClientChannelServiceConfigParser;

// Trace flags (off by default; enabled via the GRPC_TRACE env var):
// "client_channel_call" traces per-call activity in this filter;
// "client_channel_routing" traces resolver/LB routing activity.
TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
91
92 //
93 // ClientChannel::CallData definition
94 //
95
// Per-call state for the client_channel filter.  Handles queuing the
// call until name resolution completes, applying the service config to
// the call, and then creating the dynamic call that carries the batches
// the rest of the way.
class ClientChannel::CallData {
 public:
  // Filter vtable entry points (see kFilterVtable below).
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* final_info,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

  // Invoked by channel for queued calls when name resolution is completed.
  static void CheckResolution(void* arg, grpc_error_handle error);
  // Helper function for applying the service config to a call while
  // holding ClientChannel::resolution_mu_.
  // Returns true if the service config has been applied to the call, in which
  // case the caller must invoke ResolutionDone() or AsyncResolutionDone()
  // with the returned error.
  bool CheckResolutionLocked(grpc_call_element* elem, grpc_error_handle* error)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
  // Schedules a callback to continue processing the call once
  // resolution is complete. The callback will not run until after this
  // method returns.
  void AsyncResolutionDone(grpc_call_element* elem, grpc_error_handle error);

 private:
  class ResolverQueuedCallCanceller;

  CallData(grpc_call_element* elem, const ClientChannel& chand,
           const grpc_call_element_args& args);
  ~CallData();

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  // Adds a batch to pending_batches_.
  void PendingBatchesAdd(grpc_call_element* elem,
                         grpc_transport_stream_op_batch* batch);
  // Fails one pending batch; run under the call combiner.
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_call_element* elem, grpc_error_handle error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  // Resumes one pending batch; run under the call combiner.
  static void ResumePendingBatchInCallCombiner(void* arg,
                                               grpc_error_handle ignored);
  // Resumes all pending batches on lb_call_.
  void PendingBatchesResume(grpc_call_element* elem);

  // Applies service config to the call. Must be invoked once we know
  // that the resolver has returned results to the channel.
  // If an error is returned, the error indicates the status with which
  // the call should be failed.
  grpc_error_handle ApplyServiceConfigToCallLocked(
      grpc_call_element* elem, grpc_metadata_batch* initial_metadata)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
  // Invoked when the resolver result is applied to the caller, on both
  // success or failure.
  static void ResolutionDone(void* arg, grpc_error_handle error);
  // Removes the call (if present) from the channel's list of calls queued
  // for name resolution.
  void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);
  // Adds the call (if not already present) to the channel's list of
  // calls queued for name resolution.
  void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::resolution_mu_);

  // Interception of recv_initial_metadata_ready, used to invoke
  // on_call_committed_ when initial metadata arrives.
  static void RecvInitialMetadataReadyForConfigSelectorCommitCallback(
      void* arg, grpc_error_handle error);
  void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
      grpc_transport_stream_op_batch* batch);

  // Creates dynamic_call_ on the channel's current dynamic filter stack.
  void CreateDynamicCall(grpc_call_element* elem);

  // State for handling deadlines.
  // The code in deadline_filter.c requires this to be the first field.
  // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
  // and this struct both independently store pointers to the call stack
  // and call combiner. If/when we have time, find a way to avoid this
  // without breaking the grpc_deadline_state abstraction.
  grpc_deadline_state deadline_state_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;                            // Call arena (not owned).
  grpc_call_stack* owning_call_;            // Enclosing call stack (not owned).
  CallCombiner* call_combiner_;             // Call combiner (not owned).
  grpc_call_context_element* call_context_;

  // Polling entity for the call; set via SetPollent().
  grpc_polling_entity* pollent_ = nullptr;

  grpc_closure pick_closure_;

  // Accessed while holding ClientChannel::resolution_mu_.
  bool service_config_applied_ ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) =
      false;
  bool queued_pending_resolver_result_
      ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = false;
  ClientChannel::ResolverQueuedCall resolver_queued_call_
      ABSL_GUARDED_BY(&ClientChannel::resolution_mu_);
  ResolverQueuedCallCanceller* resolver_call_canceller_
      ABSL_GUARDED_BY(&ClientChannel::resolution_mu_) = nullptr;

  // Set by the config selector; invoked when the call is committed.
  std::function<void()> on_call_committed_;

  // Original recv_initial_metadata_ready closure from the surface,
  // saved when we intercept it with recv_initial_metadata_ready_.
  grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
  grpc_closure recv_initial_metadata_ready_;

  RefCountedPtr<DynamicFilters> dynamic_filters_;
  RefCountedPtr<DynamicFilters::Call> dynamic_call_;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the LB call and are not intercepting any of
  // its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};

  // Set when we get a cancel_stream op.
  grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
};
232
233 //
234 // Filter vtable
235 //
236
// Filter vtable for the client_channel filter.  Field order follows
// grpc_channel_filter (see channel_stack.h).
const grpc_channel_filter ClientChannel::kFilterVtable = {
    ClientChannel::CallData::StartTransportStreamOpBatch,
    ClientChannel::StartTransportOp,
    sizeof(ClientChannel::CallData),          // sizeof_call_data
    ClientChannel::CallData::Init,            // init_call_elem
    ClientChannel::CallData::SetPollent,      // set_pollset_or_pollset_set
    ClientChannel::CallData::Destroy,         // destroy_call_elem
    sizeof(ClientChannel),                    // sizeof_channel_data
    ClientChannel::Init,                      // init_channel_elem
    ClientChannel::Destroy,                   // destroy_channel_elem
    ClientChannel::GetChannelInfo,
    "client-channel",                         // filter name
};
250
251 //
252 // dynamic termination filter
253 //
254
255 namespace {
256
257 // Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL.
ClientChannelArgCopy(void * p)258 void* ClientChannelArgCopy(void* p) { return p; }
ClientChannelArgDestroy(void *)259 void ClientChannelArgDestroy(void* /*p*/) {}
ClientChannelArgCmp(void * p,void * q)260 int ClientChannelArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
261 const grpc_arg_pointer_vtable kClientChannelArgPointerVtable = {
262 ClientChannelArgCopy, ClientChannelArgDestroy, ClientChannelArgCmp};
263
264 // Channel arg pointer vtable for GRPC_ARG_SERVICE_CONFIG_OBJ.
ServiceConfigObjArgCopy(void * p)265 void* ServiceConfigObjArgCopy(void* p) {
266 auto* service_config = static_cast<ServiceConfig*>(p);
267 service_config->Ref().release();
268 return p;
269 }
ServiceConfigObjArgDestroy(void * p)270 void ServiceConfigObjArgDestroy(void* p) {
271 auto* service_config = static_cast<ServiceConfig*>(p);
272 service_config->Unref();
273 }
ServiceConfigObjArgCmp(void * p,void * q)274 int ServiceConfigObjArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
275 const grpc_arg_pointer_vtable kServiceConfigObjArgPointerVtable = {
276 ServiceConfigObjArgCopy, ServiceConfigObjArgDestroy,
277 ServiceConfigObjArgCmp};
278
// Channel-level state for the dynamic termination filter, which sits at
// the bottom ("is_last") of the dynamic filter stack.  It locates the
// owning ClientChannel via the GRPC_ARG_CLIENT_CHANNEL channel arg; the
// per-call work is done by DynamicTerminationFilter::CallData below.
class DynamicTerminationFilter {
 public:
  class CallData;

  static const grpc_channel_filter kFilterVtable;

  // Constructs the filter's channel data in place in elem->channel_data.
  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kFilterVtable);
    new (elem->channel_data) DynamicTerminationFilter(args->channel_args);
    return GRPC_ERROR_NONE;
  }

  // Destroys the channel data constructed by Init() (placement new, so
  // explicit destructor call).
  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
    chand->~DynamicTerminationFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  explicit DynamicTerminationFilter(const grpc_channel_args* args)
      : chand_(grpc_channel_args_find_pointer<ClientChannel>(
            args, GRPC_ARG_CLIENT_CHANNEL)) {}

  ClientChannel* chand_;  // Not owned.
};
311
312 class DynamicTerminationFilter::CallData {
313 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)314 static grpc_error_handle Init(grpc_call_element* elem,
315 const grpc_call_element_args* args) {
316 new (elem->call_data) CallData(*args);
317 return GRPC_ERROR_NONE;
318 }
319
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)320 static void Destroy(grpc_call_element* elem,
321 const grpc_call_final_info* /*final_info*/,
322 grpc_closure* then_schedule_closure) {
323 auto* calld = static_cast<CallData*>(elem->call_data);
324 RefCountedPtr<SubchannelCall> subchannel_call;
325 if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
326 subchannel_call = calld->lb_call_->subchannel_call();
327 }
328 calld->~CallData();
329 if (GPR_LIKELY(subchannel_call != nullptr)) {
330 subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
331 } else {
332 // TODO(yashkt) : This can potentially be a Closure::Run
333 ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
334 }
335 }
336
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)337 static void StartTransportStreamOpBatch(
338 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
339 auto* calld = static_cast<CallData*>(elem->call_data);
340 calld->lb_call_->StartTransportStreamOpBatch(batch);
341 }
342
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)343 static void SetPollent(grpc_call_element* elem,
344 grpc_polling_entity* pollent) {
345 auto* calld = static_cast<CallData*>(elem->call_data);
346 auto* chand = static_cast<DynamicTerminationFilter*>(elem->channel_data);
347 ClientChannel* client_channel = chand->chand_;
348 grpc_call_element_args args = {
349 calld->owning_call_, nullptr,
350 calld->call_context_, calld->path_,
351 calld->call_start_time_, calld->deadline_,
352 calld->arena_, calld->call_combiner_};
353 calld->lb_call_ =
354 client_channel->CreateLoadBalancedCall(args, pollent, nullptr);
355 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
356 gpr_log(GPR_INFO,
357 "chand=%p dynamic_termination_calld=%p: create lb_call=%p", chand,
358 client_channel, calld->lb_call_.get());
359 }
360 }
361
362 private:
CallData(const grpc_call_element_args & args)363 explicit CallData(const grpc_call_element_args& args)
364 : path_(grpc_slice_ref_internal(args.path)),
365 call_start_time_(args.start_time),
366 deadline_(args.deadline),
367 arena_(args.arena),
368 owning_call_(args.call_stack),
369 call_combiner_(args.call_combiner),
370 call_context_(args.context) {}
371
~CallData()372 ~CallData() { grpc_slice_unref_internal(path_); }
373
374 grpc_slice path_; // Request path.
375 gpr_cycle_counter call_start_time_;
376 grpc_millis deadline_;
377 Arena* arena_;
378 grpc_call_stack* owning_call_;
379 CallCombiner* call_combiner_;
380 grpc_call_context_element* call_context_;
381
382 RefCountedPtr<ClientChannel::LoadBalancedCall> lb_call_;
383 };
384
// Filter vtable for the dynamic termination filter.  Field order
// follows grpc_channel_filter (see channel_stack.h).
const grpc_channel_filter DynamicTerminationFilter::kFilterVtable = {
    DynamicTerminationFilter::CallData::StartTransportStreamOpBatch,
    DynamicTerminationFilter::StartTransportOp,
    sizeof(DynamicTerminationFilter::CallData),   // sizeof_call_data
    DynamicTerminationFilter::CallData::Init,     // init_call_elem
    DynamicTerminationFilter::CallData::SetPollent,
    DynamicTerminationFilter::CallData::Destroy,  // destroy_call_elem
    sizeof(DynamicTerminationFilter),             // sizeof_channel_data
    DynamicTerminationFilter::Init,               // init_channel_elem
    DynamicTerminationFilter::Destroy,            // destroy_channel_elem
    DynamicTerminationFilter::GetChannelInfo,
    "dynamic_filter_termination",                 // filter name
};
398
399 } // namespace
400
401 //
402 // ClientChannel::ResolverResultHandler
403 //
404
// Bridges the resolver's callbacks into the ClientChannel.  Takes a ref
// to the channel stack for as long as this handler exists; the ref is
// released in the destructor, which runs once the resolver has shut
// down and dropped the handler.
class ClientChannel::ResolverResultHandler : public Resolver::ResultHandler {
 public:
  explicit ResolverResultHandler(ClientChannel* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
  }

  // Destruction indicates that resolver shutdown is complete.
  ~ResolverResultHandler() override {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
    }
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
  }

  // Called (in the work serializer) with each new resolution result.
  void ReturnResult(Resolver::Result result) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
    chand_->OnResolverResultChangedLocked(std::move(result));
  }

  // Called (in the work serializer) when resolution fails.
  void ReturnError(grpc_error_handle error) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
    chand_->OnResolverErrorLocked(error);
  }

 private:
  ClientChannel* chand_;  // Kept alive via the channel stack ref above.
};
431
432 //
433 // ClientChannel::SubchannelWrapper
434 //
435
436 // This class is a wrapper for Subchannel that hides details of the
437 // channel's implementation (such as the health check service name and
438 // connected subchannel) from the LB policy API.
439 //
440 // Note that no synchronization is needed here, because even if the
441 // underlying subchannel is shared between channels, this wrapper will only
442 // be used within one channel, so it will always be synchronized by the
443 // control plane work_serializer.
444 class ClientChannel::SubchannelWrapper : public SubchannelInterface {
445 public:
SubchannelWrapper(ClientChannel * chand,RefCountedPtr<Subchannel> subchannel,absl::optional<std::string> health_check_service_name)446 SubchannelWrapper(ClientChannel* chand, RefCountedPtr<Subchannel> subchannel,
447 absl::optional<std::string> health_check_service_name)
448 : SubchannelInterface(
449 GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
450 ? "SubchannelWrapper"
451 : nullptr),
452 chand_(chand),
453 subchannel_(std::move(subchannel)),
454 health_check_service_name_(std::move(health_check_service_name)) {
455 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
456 gpr_log(GPR_INFO,
457 "chand=%p: creating subchannel wrapper %p for subchannel %p",
458 chand, this, subchannel_.get());
459 }
460 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
461 auto* subchannel_node = subchannel_->channelz_node();
462 if (subchannel_node != nullptr) {
463 auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
464 if (it == chand_->subchannel_refcount_map_.end()) {
465 chand_->channelz_node_->AddChildSubchannel(subchannel_node->uuid());
466 it = chand_->subchannel_refcount_map_.emplace(subchannel_.get(), 0)
467 .first;
468 }
469 ++it->second;
470 }
471 chand_->subchannel_wrappers_.insert(this);
472 }
473
~SubchannelWrapper()474 ~SubchannelWrapper() override {
475 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
476 gpr_log(GPR_INFO,
477 "chand=%p: destroying subchannel wrapper %p for subchannel %p",
478 chand_, this, subchannel_.get());
479 }
480 chand_->subchannel_wrappers_.erase(this);
481 auto* subchannel_node = subchannel_->channelz_node();
482 if (subchannel_node != nullptr) {
483 auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
484 GPR_ASSERT(it != chand_->subchannel_refcount_map_.end());
485 --it->second;
486 if (it->second == 0) {
487 chand_->channelz_node_->RemoveChildSubchannel(subchannel_node->uuid());
488 chand_->subchannel_refcount_map_.erase(it);
489 }
490 }
491 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
492 }
493
CheckConnectivityState()494 grpc_connectivity_state CheckConnectivityState() override
495 ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
496 RefCountedPtr<ConnectedSubchannel> connected_subchannel;
497 grpc_connectivity_state connectivity_state =
498 subchannel_->CheckConnectivityState(health_check_service_name_,
499 &connected_subchannel);
500 MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
501 return connectivity_state;
502 }
503
WatchConnectivityState(grpc_connectivity_state initial_state,std::unique_ptr<ConnectivityStateWatcherInterface> watcher)504 void WatchConnectivityState(
505 grpc_connectivity_state initial_state,
506 std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override {
507 auto& watcher_wrapper = watcher_map_[watcher.get()];
508 GPR_ASSERT(watcher_wrapper == nullptr);
509 watcher_wrapper = new WatcherWrapper(std::move(watcher),
510 Ref(DEBUG_LOCATION, "WatcherWrapper"),
511 initial_state);
512 subchannel_->WatchConnectivityState(
513 initial_state, health_check_service_name_,
514 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
515 watcher_wrapper));
516 }
517
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)518 void CancelConnectivityStateWatch(
519 ConnectivityStateWatcherInterface* watcher) override {
520 auto it = watcher_map_.find(watcher);
521 GPR_ASSERT(it != watcher_map_.end());
522 subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
523 it->second);
524 watcher_map_.erase(it);
525 }
526
AttemptToConnect()527 void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
528
ResetBackoff()529 void ResetBackoff() override { subchannel_->ResetBackoff(); }
530
channel_args()531 const grpc_channel_args* channel_args() override {
532 return subchannel_->channel_args();
533 }
534
ThrottleKeepaliveTime(int new_keepalive_time)535 void ThrottleKeepaliveTime(int new_keepalive_time) {
536 subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
537 }
538
UpdateHealthCheckServiceName(absl::optional<std::string> health_check_service_name)539 void UpdateHealthCheckServiceName(
540 absl::optional<std::string> health_check_service_name) {
541 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
542 gpr_log(GPR_INFO,
543 "chand=%p: subchannel wrapper %p: updating health check service "
544 "name from \"%s\" to \"%s\"",
545 chand_, this, health_check_service_name_->c_str(),
546 health_check_service_name->c_str());
547 }
548 for (auto& p : watcher_map_) {
549 WatcherWrapper*& watcher_wrapper = p.second;
550 // Cancel the current watcher and create a new one using the new
551 // health check service name.
552 // TODO(roth): If there is not already an existing health watch
553 // call for the new name, then the watcher will initially report
554 // state CONNECTING. If the LB policy is currently reporting
555 // state READY, this may cause it to switch to CONNECTING before
556 // switching back to READY. This could cause a small delay for
557 // RPCs being started on the channel. If/when this becomes a
558 // problem, we may be able to handle it by waiting for the new
559 // watcher to report READY before we use it to replace the old one.
560 WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
561 subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
562 watcher_wrapper);
563 watcher_wrapper = replacement;
564 subchannel_->WatchConnectivityState(
565 replacement->last_seen_state(), health_check_service_name,
566 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
567 replacement));
568 }
569 // Save the new health check service name.
570 health_check_service_name_ = std::move(health_check_service_name);
571 }
572
573 // Caller must be holding the control-plane work_serializer.
connected_subchannel() const574 ConnectedSubchannel* connected_subchannel() const
575 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::work_serializer_) {
576 return connected_subchannel_.get();
577 }
578
579 // Caller must be holding the data-plane mutex.
connected_subchannel_in_data_plane() const580 ConnectedSubchannel* connected_subchannel_in_data_plane() const
581 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
582 return connected_subchannel_in_data_plane_.get();
583 }
set_connected_subchannel_in_data_plane(RefCountedPtr<ConnectedSubchannel> connected_subchannel)584 void set_connected_subchannel_in_data_plane(
585 RefCountedPtr<ConnectedSubchannel> connected_subchannel)
586 ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_) {
587 connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
588 }
589
590 private:
591 // Subchannel and SubchannelInterface have different interfaces for
592 // their respective ConnectivityStateWatcherInterface classes.
593 // The one in Subchannel updates the ConnectedSubchannel along with
594 // the state, whereas the one in SubchannelInterface does not expose
595 // the ConnectedSubchannel.
596 //
597 // This wrapper provides a bridge between the two. It implements
598 // Subchannel::ConnectivityStateWatcherInterface and wraps
599 // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
600 // that was passed in by the LB policy. We pass an instance of this
601 // class to the underlying Subchannel, and when we get updates from
602 // the subchannel, we pass those on to the wrapped watcher to return
603 // the update to the LB policy. This allows us to set the connected
604 // subchannel before passing the result back to the LB policy.
605 class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
606 public:
WatcherWrapper(std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface> watcher,RefCountedPtr<SubchannelWrapper> parent,grpc_connectivity_state initial_state)607 WatcherWrapper(
608 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
609 watcher,
610 RefCountedPtr<SubchannelWrapper> parent,
611 grpc_connectivity_state initial_state)
612 : watcher_(std::move(watcher)),
613 parent_(std::move(parent)),
614 last_seen_state_(initial_state) {}
615
~WatcherWrapper()616 ~WatcherWrapper() override {
617 auto* parent = parent_.release(); // ref owned by lambda
618 parent->chand_->work_serializer_->Run(
619 [parent]()
620 ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
621 parent->Unref(DEBUG_LOCATION, "WatcherWrapper");
622 },
623 DEBUG_LOCATION);
624 }
625
OnConnectivityStateChange()626 void OnConnectivityStateChange() override {
627 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
628 gpr_log(GPR_INFO,
629 "chand=%p: connectivity change for subchannel wrapper %p "
630 "subchannel %p; hopping into work_serializer",
631 parent_->chand_, parent_.get(), parent_->subchannel_.get());
632 }
633 Ref().release(); // ref owned by lambda
634 parent_->chand_->work_serializer_->Run(
635 [this]()
636 ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
637 ApplyUpdateInControlPlaneWorkSerializer();
638 Unref();
639 },
640 DEBUG_LOCATION);
641 }
642
interested_parties()643 grpc_pollset_set* interested_parties() override {
644 SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
645 watcher_.get();
646 if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
647 return watcher->interested_parties();
648 }
649
MakeReplacement()650 WatcherWrapper* MakeReplacement() {
651 auto* replacement =
652 new WatcherWrapper(std::move(watcher_), parent_, last_seen_state_);
653 replacement_ = replacement;
654 return replacement;
655 }
656
last_seen_state() const657 grpc_connectivity_state last_seen_state() const { return last_seen_state_; }
658
659 private:
ApplyUpdateInControlPlaneWorkSerializer()660 void ApplyUpdateInControlPlaneWorkSerializer()
661 ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->chand_->work_serializer_) {
662 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
663 gpr_log(GPR_INFO,
664 "chand=%p: processing connectivity change in work serializer "
665 "for subchannel wrapper %p subchannel %p "
666 "watcher=%p",
667 parent_->chand_, parent_.get(), parent_->subchannel_.get(),
668 watcher_.get());
669 }
670 ConnectivityStateChange state_change = PopConnectivityStateChange();
671 absl::optional<absl::Cord> keepalive_throttling =
672 state_change.status.GetPayload(kKeepaliveThrottlingKey);
673 if (keepalive_throttling.has_value()) {
674 int new_keepalive_time = -1;
675 if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
676 &new_keepalive_time)) {
677 if (new_keepalive_time > parent_->chand_->keepalive_time_) {
678 parent_->chand_->keepalive_time_ = new_keepalive_time;
679 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
680 gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
681 parent_->chand_, parent_->chand_->keepalive_time_);
682 }
683 // Propagate the new keepalive time to all subchannels. This is so
684 // that new transports created by any subchannel (and not just the
685 // subchannel that received the GOAWAY), use the new keepalive time.
686 for (auto* subchannel_wrapper :
687 parent_->chand_->subchannel_wrappers_) {
688 subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
689 }
690 }
691 } else {
692 gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
693 parent_->chand_,
694 std::string(keepalive_throttling.value()).c_str());
695 }
696 }
697 // Ignore update if the parent WatcherWrapper has been replaced
698 // since this callback was scheduled.
699 if (watcher_ != nullptr) {
700 last_seen_state_ = state_change.state;
701 parent_->MaybeUpdateConnectedSubchannel(
702 std::move(state_change.connected_subchannel));
703 watcher_->OnConnectivityStateChange(state_change.state);
704 }
705 }
706
707 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
708 watcher_;
709 RefCountedPtr<SubchannelWrapper> parent_;
710 grpc_connectivity_state last_seen_state_;
711 WatcherWrapper* replacement_ = nullptr;
712 };
713
  // Records a newly reported connected subchannel and queues it so that
  // the data-plane copy can be swapped in the next time the picker is
  // updated.  Must be called in the control-plane work_serializer.
  void MaybeUpdateConnectedSubchannel(
      RefCountedPtr<ConnectedSubchannel> connected_subchannel)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::work_serializer_) {
    // Update the connected subchannel only if the channel is not shutting
    // down. This is because once the channel is shutting down, we
    // ignore picker updates from the LB policy, which means that
    // UpdateStateAndPickerLocked() will never process the entries
    // in chand_->pending_subchannel_updates_. So we don't want to add
    // entries there that will never be processed, since that would
    // leave dangling refs to the channel and prevent its destruction.
    grpc_error_handle disconnect_error = chand_->disconnect_error();
    if (disconnect_error != GRPC_ERROR_NONE) return;
    // Not shutting down, so do the update.
    if (connected_subchannel_ != connected_subchannel) {
      connected_subchannel_ = std::move(connected_subchannel);
      // Record the new connected subchannel so that it can be updated
      // in the data plane mutex the next time the picker is updated.
      // The map key holds a ref to this wrapper until the entry is
      // processed.
      chand_->pending_subchannel_updates_[Ref(
          DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
    }
  }
735
  // Associated client channel.
  ClientChannel* chand_;
  // The underlying subchannel being wrapped.
  RefCountedPtr<Subchannel> subchannel_;
  // Service name to use for health checking, unset if health checking
  // is inhibited for this subchannel.
  absl::optional<std::string> health_check_service_name_;
  // Maps from the address of the watcher passed to us by the LB policy
  // to the address of the WrapperWatcher that we passed to the underlying
  // subchannel. This is needed so that when the LB policy calls
  // CancelConnectivityStateWatch() with its watcher, we know the
  // corresponding WrapperWatcher to cancel on the underlying subchannel.
  std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
  // To be accessed only in the control plane work_serializer.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_
      ABSL_GUARDED_BY(&ClientChannel::work_serializer_);
  // To be accessed only in the data plane mutex.
  RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_
      ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
};
752
753 //
754 // ClientChannel::ExternalConnectivityWatcher
755 //
756
// Registers an external connectivity watcher on the channel.  The watcher
// is stored (with a ref) in the channel's external_watchers_ map keyed by
// on_complete, and the actual registration with the state tracker happens
// asynchronously in the WorkSerializer.
ClientChannel::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
    ClientChannel* chand, grpc_polling_entity pollent,
    grpc_connectivity_state* state, grpc_closure* on_complete,
    grpc_closure* watcher_timer_init)
    : chand_(chand),
      pollent_(pollent),
      initial_state_(*state),
      state_(state),
      on_complete_(on_complete),
      watcher_timer_init_(watcher_timer_init) {
  // Make sure I/O for this watcher can be done under the channel's pollers.
  grpc_polling_entity_add_to_pollset_set(&pollent_,
                                         chand_->interested_parties_);
  // Keep the channel stack alive while the watch is pending.
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
  {
    MutexLock lock(&chand_->external_watchers_mu_);
    // Will be deleted when the watch is complete.
    GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
    // Store a ref to the watcher in the external_watchers_ map.
    chand->external_watchers_[on_complete] =
        Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
  }
  // Pass the ref from creating the object to Start().
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
        // The ref is passed to AddWatcherLocked().
        AddWatcherLocked();
      },
      DEBUG_LOCATION);
}
786
// Undoes the pollset-set registration and channel-stack ref taken in the
// constructor.
ClientChannel::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
  grpc_polling_entity_del_from_pollset_set(&pollent_,
                                           chand_->interested_parties_);
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                           "ExternalConnectivityWatcher");
}
793
794 void ClientChannel::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ClientChannel * chand,grpc_closure * on_complete,bool cancel)795 RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
796 grpc_closure* on_complete,
797 bool cancel) {
798 RefCountedPtr<ExternalConnectivityWatcher> watcher;
799 {
800 MutexLock lock(&chand->external_watchers_mu_);
801 auto it = chand->external_watchers_.find(on_complete);
802 if (it != chand->external_watchers_.end()) {
803 watcher = std::move(it->second);
804 chand->external_watchers_.erase(it);
805 }
806 }
807 // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
808 // the mutex before calling it.
809 if (watcher != nullptr && cancel) watcher->Cancel();
810 }
811
// Called by the state tracker when the channel's connectivity state
// changes.  Reports the new state to the user exactly once; the CAS on
// done_ makes Notify() and Cancel() mutually exclusive.
void ClientChannel::ExternalConnectivityWatcher::Notify(
    grpc_connectivity_state state, const absl::Status& /* status */) {
  bool done = false;
  if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
                                   MemoryOrder::RELAXED)) {
    return;  // Already done.
  }
  // Remove external watcher.
  ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
      chand_, on_complete_, /*cancel=*/false);
  // Report new state to the user.
  *state_ = state;
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
  // Hop back into the work_serializer to clean up.
  // Not needed in state SHUTDOWN, because the tracker will
  // automatically remove all watchers in that case.
  if (state != GRPC_CHANNEL_SHUTDOWN) {
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
          RemoveWatcherLocked();
        },
        DEBUG_LOCATION);
  }
}
836
// Cancels a pending watch.  The CAS on done_ ensures this is a no-op if
// Notify() already fired; otherwise the user's closure is run with
// GRPC_ERROR_CANCELLED and the watcher is removed in the WorkSerializer.
void ClientChannel::ExternalConnectivityWatcher::Cancel() {
  bool done = false;
  if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
                                   MemoryOrder::RELAXED)) {
    return;  // Already done.
  }
  ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
  // Hop back into the work_serializer to clean up.
  chand_->work_serializer_->Run(
      [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
        RemoveWatcherLocked();
      },
      DEBUG_LOCATION);
}
851
// Registers this watcher with the channel's connectivity state tracker.
// Runs in the WorkSerializer; consumes the ref taken at construction.
void ClientChannel::ExternalConnectivityWatcher::AddWatcherLocked() {
  // Signal the caller that the watch has actually started.
  Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
  // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
  chand_->state_tracker_.AddWatcher(
      initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
}
858
// Unregisters this watcher from the state tracker.  Runs in the
// WorkSerializer.
void ClientChannel::ExternalConnectivityWatcher::RemoveWatcherLocked() {
  chand_->state_tracker_.RemoveWatcher(this);
}
862
863 //
864 // ClientChannel::ConnectivityWatcherAdder
865 //
866
// Fire-and-forget helper that hops into the WorkSerializer to add a
// connectivity watcher to the channel's state tracker, then deletes
// itself.  Holds a channel-stack ref for the duration.
class ClientChannel::ConnectivityWatcherAdder {
 public:
  ConnectivityWatcherAdder(
      ClientChannel* chand, grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
      : chand_(chand),
        initial_state_(initial_state),
        watcher_(std::move(watcher)) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
          AddWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Performs the actual registration, releases the channel-stack ref,
  // and self-destructs.
  void AddWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
    chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
    delete this;
  }

  ClientChannel* chand_;
  grpc_connectivity_state initial_state_;
  OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
};
895
896 //
897 // ClientChannel::ConnectivityWatcherRemover
898 //
899
// Fire-and-forget counterpart of ConnectivityWatcherAdder: hops into the
// WorkSerializer to remove a connectivity watcher from the channel's
// state tracker, then deletes itself.  Holds a channel-stack ref for the
// duration.
class ClientChannel::ConnectivityWatcherRemover {
 public:
  ConnectivityWatcherRemover(ClientChannel* chand,
                             AsyncConnectivityStateWatcherInterface* watcher)
      : chand_(chand), watcher_(watcher) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
    chand_->work_serializer_->Run(
        [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
          RemoveWatcherLocked();
        },
        DEBUG_LOCATION);
  }

 private:
  // Performs the actual removal, releases the channel-stack ref, and
  // self-destructs.
  void RemoveWatcherLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
    chand_->state_tracker_.RemoveWatcher(watcher_);
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ConnectivityWatcherRemover");
    delete this;
  }

  ClientChannel* chand_;
  AsyncConnectivityStateWatcherInterface* watcher_;
};
925
926 //
927 // ClientChannel::ClientChannelControlHelper
928 //
929
// The channel's implementation of the LB policy's ChannelControlHelper
// interface: creates subchannels, propagates picker/state updates, and
// forwards re-resolution requests and trace events.  All virtual methods
// run in the channel's WorkSerializer.
class ClientChannel::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ClientChannel* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  // Creates a subchannel for the given address and returns it wrapped in
  // a SubchannelWrapper, or nullptr if the channel is shutting down or
  // subchannel creation fails.
  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      ServerAddress address, const grpc_channel_args& args) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
    // Determine health check service name.
    bool inhibit_health_checking = grpc_channel_args_find_bool(
        &args, GRPC_ARG_INHIBIT_HEALTH_CHECKING, false);
    absl::optional<std::string> health_check_service_name;
    if (!inhibit_health_checking) {
      health_check_service_name = chand_->health_check_service_name_;
    }
    // Remove channel args that should not affect subchannel uniqueness.
    static const char* args_to_remove[] = {
        GRPC_ARG_INHIBIT_HEALTH_CHECKING,
        GRPC_ARG_CHANNELZ_CHANNEL_NODE,
    };
    // Add channel args needed for the subchannel.
    absl::InlinedVector<grpc_arg, 3> args_to_add = {
        Subchannel::CreateSubchannelAddressArg(&address.address()),
        SubchannelPoolInterface::CreateChannelArg(
            chand_->subchannel_pool_.get()),
    };
    if (address.args() != nullptr) {
      for (size_t j = 0; j < address.args()->num_args; ++j) {
        args_to_add.emplace_back(address.args()->args[j]);
      }
    }
    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
        &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove),
        args_to_add.data(), args_to_add.size());
    // The address arg string was allocated by CreateSubchannelAddressArg()
    // and has been copied into new_args, so free the original here.
    gpr_free(args_to_add[0].value.string);
    // Create subchannel.
    RefCountedPtr<Subchannel> subchannel =
        chand_->client_channel_factory_->CreateSubchannel(new_args);
    grpc_channel_args_destroy(new_args);
    if (subchannel == nullptr) return nullptr;
    // Make sure the subchannel has updated keepalive time.
    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
    // Create and return wrapper for the subchannel.
    return MakeRefCounted<SubchannelWrapper>(
        chand_, std::move(subchannel), std::move(health_check_service_name));
  }

  // Applies a new connectivity state and picker from the LB policy,
  // unless the channel is shutting down.
  void UpdateState(
      grpc_connectivity_state state, const absl::Status& status,
      std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    grpc_error_handle disconnect_error = chand_->disconnect_error();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      const char* extra = disconnect_error == GRPC_ERROR_NONE
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
              chand_, ConnectivityStateName(state), status.ToString().c_str(),
              picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (disconnect_error == GRPC_ERROR_NONE) {
      chand_->UpdateStateAndPickerLocked(state, status, "helper",
                                         std::move(picker));
    }
  }

  // Asks the resolver to re-resolve, e.g. after a subchannel failure.
  void RequestReresolution() override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
    }
    chand_->resolver_->RequestReresolutionLocked();
  }

  // Records a trace event on the channelz node, if channelz is enabled.
  void AddTraceEvent(TraceSeverity severity, absl::string_view message) override
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_) {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  // Maps the LB policy's TraceSeverity enum onto channelz's severity enum.
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ClientChannel* chand_;
};
1035
1036 //
1037 // ClientChannel implementation
1038 //
1039
GetFromChannel(grpc_channel * channel)1040 ClientChannel* ClientChannel::GetFromChannel(grpc_channel* channel) {
1041 grpc_channel_element* elem =
1042 grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
1043 if (elem->filter != &kFilterVtable) return nullptr;
1044 return static_cast<ClientChannel*>(elem->channel_data);
1045 }
1046
// Channel filter init: placement-constructs a ClientChannel in the
// storage provided by the channel stack.  Any construction failure is
// reported via the returned error.
grpc_error_handle ClientChannel::Init(grpc_channel_element* elem,
                                      grpc_channel_element_args* args) {
  // This filter must be the last one in the channel stack.
  GPR_ASSERT(args->is_last);
  GPR_ASSERT(elem->filter == &kFilterVtable);
  grpc_error_handle error = GRPC_ERROR_NONE;
  new (elem->channel_data) ClientChannel(args, &error);
  return error;
}
1055
// Channel filter destroy: the object was placement-constructed in Init(),
// so destroy it in place; the stack owns the storage.
void ClientChannel::Destroy(grpc_channel_element* elem) {
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  chand->~ClientChannel();
}
1060
1061 namespace {
1062
// Returns whether retries are enabled via GRPC_ARG_ENABLE_RETRIES
// (disabled by default).
bool GetEnableRetries(const grpc_channel_args* args) {
  return grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES, false);
}
1066
GetSubchannelPool(const grpc_channel_args * args)1067 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1068 const grpc_channel_args* args) {
1069 const bool use_local_subchannel_pool = grpc_channel_args_find_bool(
1070 args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, false);
1071 if (use_local_subchannel_pool) {
1072 return MakeRefCounted<LocalSubchannelPool>();
1073 }
1074 return GlobalSubchannelPool::instance();
1075 }
1076
// Extracts the channelz node pointer from the channel args, or nullptr
// if channelz is not enabled for this channel.
channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
  return grpc_channel_args_find_pointer<channelz::ChannelNode>(
      args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
}
1081
1082 } // namespace
1083
// Constructs the client channel filter.  On failure, sets *error and
// returns early; the destructor must still be safe to run in that case.
ClientChannel::ClientChannel(grpc_channel_element_args* args,
                             grpc_error_handle* error)
    : deadline_checking_enabled_(
          grpc_deadline_checking_enabled(args->channel_args)),
      enable_retries_(GetEnableRetries(args->channel_args)),
      owning_stack_(args->channel_stack),
      client_channel_factory_(
          ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
      channelz_node_(GetChannelzNode(args->channel_args)),
      interested_parties_(grpc_pollset_set_create()),
      work_serializer_(std::make_shared<WorkSerializer>()),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(args->channel_args)),
      disconnect_error_(GRPC_ERROR_NONE) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get server name to resolve, using proxy mapper if needed.
  const char* server_uri =
      grpc_channel_args_find_string(args->channel_args, GRPC_ARG_SERVER_URI);
  if (server_uri == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  // Get default service config. If none is specified via the client API,
  // we use an empty config.
  const char* service_config_json = grpc_channel_args_find_string(
      args->channel_args, GRPC_ARG_SERVICE_CONFIG);
  if (service_config_json == nullptr) service_config_json = "{}";
  *error = GRPC_ERROR_NONE;
  default_service_config_ =
      ServiceConfig::Create(args->channel_args, service_config_json, error);
  if (*error != GRPC_ERROR_NONE) {
    default_service_config_.reset();
    return;
  }
  // Derive the server name (URI path minus leading slash) for later use.
  absl::StatusOr<URI> uri = URI::Parse(server_uri);
  if (uri.ok() && !uri->path().empty()) {
    server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
  }
  // Apply the proxy mapper; if it rewrites the target, it returns a new
  // name (which we take ownership of) and possibly new channel args.
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  ProxyMapperRegistry::MapName(server_uri, args->channel_args, &proxy_name,
                               &new_args);
  target_uri_.reset(proxy_name != nullptr ? proxy_name
                                          : gpr_strdup(server_uri));
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  const char* arg_to_remove = GRPC_ARG_SERVICE_CONFIG;
  channel_args_ = grpc_channel_args_copy_and_remove(
      new_args != nullptr ? new_args : args->channel_args, &arg_to_remove, 1);
  grpc_channel_args_destroy(new_args);
  // Cache the configured keepalive time (-1 means unset).
  keepalive_time_ = grpc_channel_args_find_integer(
      channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS,
      {-1 /* default value, unset */, 1, INT_MAX});
  if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
    std::string error_message =
        absl::StrCat("the target uri is not valid: ", target_uri_.get());
    *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
    return;
  }
  *error = GRPC_ERROR_NONE;
}
1158
// Tears down the resolver/LB policy, releases channel args and errors,
// and stops backup polling.  Mirrors the setup done in the constructor.
ClientChannel::~ClientChannel() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  DestroyResolverAndLbPolicyLocked();
  grpc_channel_args_destroy(channel_args_);
  GRPC_ERROR_UNREF(resolver_transient_failure_error_);
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
  GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
}
1171
// Creates a LoadBalancedCall for this channel.  The object is allocated
// on the call's arena, so its storage is reclaimed with the arena.
RefCountedPtr<ClientChannel::LoadBalancedCall>
ClientChannel::CreateLoadBalancedCall(
    const grpc_call_element_args& args, grpc_polling_entity* pollent,
    grpc_closure* on_call_destruction_complete) {
  return args.arena->New<LoadBalancedCall>(this, args, pollent,
                                           on_call_destruction_complete);
}
1179
1180 namespace {
1181
// Determines the LB policy config to use for this resolution result, in
// order of preference: the loadBalancingConfig field from the service
// config, the deprecated loadBalancingPolicy field, the
// GRPC_ARG_LB_POLICY_NAME channel arg, and finally pick_first.
RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
    const Resolver::Result& resolver_result,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
  // Prefer the LB policy config found in the service config.
  if (parsed_service_config->parsed_lb_config() != nullptr) {
    return parsed_service_config->parsed_lb_config();
  }
  // Try the deprecated LB policy name from the service config.
  // If not, try the setting from channel args.
  const char* policy_name = nullptr;
  if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
    policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
  } else {
    policy_name = grpc_channel_args_find_string(resolver_result.args,
                                                GRPC_ARG_LB_POLICY_NAME);
  }
  // Default to pick_first if nothing was specified.
  if (policy_name == nullptr) policy_name = "pick_first";
  // Now that we have the policy name, construct an empty config for it.
  Json config_json = Json::Array{Json::Object{
      {policy_name, Json::Object{}},
  }};
  grpc_error_handle parse_error = GRPC_ERROR_NONE;
  auto lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
      config_json, &parse_error);
  // The policy name came from one of three places:
  // - The deprecated loadBalancingPolicy field in the service config,
  //   in which case the code in ClientChannelServiceConfigParser
  //   already verified that the policy does not require a config.
  // - One of the hard-coded values here, all of which are known to not
  //   require a config.
  // - A channel arg, in which case the application did something that
  //   is a misuse of our API.
  // In the first two cases, these assertions will always be true. In
  // the last case, this is probably fine for now.
  // TODO(roth): If the last case becomes a problem, add better error
  // handling here.
  GPR_ASSERT(lb_policy_config != nullptr);
  GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
  return lb_policy_config;
}
1224
1225 } // namespace
1226
// Handles a new resolution result: selects the service config (new,
// previous, or default), applies service config / ConfigSelector changes,
// creates or updates the LB policy, and records a channelz trace event
// describing what changed.  Runs in the WorkSerializer.
void ClientChannel::OnResolverResultChangedLocked(Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
  }
  // We only want to trace the address resolution in the follow cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  absl::InlinedVector<const char*, 3> trace_strings;
  if (result.addresses.empty() && previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (!result.addresses.empty() &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = !result.addresses.empty();
  // Storage must outlive trace_strings, which holds a pointer into it.
  std::string service_config_error_string_storage;
  if (result.service_config_error != GRPC_ERROR_NONE) {
    service_config_error_string_storage =
        grpc_error_std_string(result.service_config_error);
    trace_strings.push_back(service_config_error_string_storage.c_str());
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (result.service_config_error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
              this, grpc_error_std_string(result.service_config_error).c_str());
    }
    // If the service config was invalid, then fallback to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                this);
      }
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received an invalid service config and we don't have a
      // previous service config to fall back to. Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(GRPC_ERROR_REF(result.service_config_error));
      trace_strings.push_back("no valid service config");
    }
  } else if (result.service_config == nullptr) {
    // Resolver did not return any service config.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned no service config. Using default "
              "service config for channel.",
              this);
    }
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = result.service_config;
    config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
  }
  // service_config is null only on the invalid-config-with-no-fallback
  // path above, in which case the channel is already in TRANSIENT_FAILURE.
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                internal::ClientChannelServiceConfigParser::ParserIndex()));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          parsed_service_config, lb_policy_config->name());
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
    }
    // Create or update LB policy, as needed.
    CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
                                 std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked();
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
}
1345
OnResolverErrorLocked(grpc_error_handle error)1346 void ClientChannel::OnResolverErrorLocked(grpc_error_handle error) {
1347 if (resolver_ == nullptr) {
1348 GRPC_ERROR_UNREF(error);
1349 return;
1350 }
1351 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1352 gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
1353 grpc_error_std_string(error).c_str());
1354 }
1355 // If we already have an LB policy from a previous resolution
1356 // result, then we continue to let it set the connectivity state.
1357 // Otherwise, we go into TRANSIENT_FAILURE.
1358 if (lb_policy_ == nullptr) {
1359 grpc_error_handle state_error =
1360 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
1361 "Resolver transient failure", &error, 1);
1362 {
1363 MutexLock lock(&resolution_mu_);
1364 // Update resolver transient failure.
1365 GRPC_ERROR_UNREF(resolver_transient_failure_error_);
1366 resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error);
1367 // Process calls that were queued waiting for the resolver result.
1368 for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
1369 call = call->next) {
1370 grpc_call_element* elem = call->elem;
1371 CallData* calld = static_cast<CallData*>(elem->call_data);
1372 grpc_error_handle error = GRPC_ERROR_NONE;
1373 if (calld->CheckResolutionLocked(elem, &error)) {
1374 calld->AsyncResolutionDone(elem, error);
1375 }
1376 }
1377 }
1378 // Update connectivity state.
1379 UpdateStateAndPickerLocked(
1380 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
1381 "resolver failure",
1382 absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
1383 state_error));
1384 }
1385 GRPC_ERROR_UNREF(error);
1386 }
1387
// Builds an update from the resolution result and delivers it to the LB
// policy, creating the policy first if this is the initial resolution.
void ClientChannel::CreateOrUpdateLbPolicyLocked(
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
    Resolver::Result result) {
  // Construct update.
  LoadBalancingPolicy::UpdateArgs update_args;
  update_args.addresses = std::move(result.addresses);
  update_args.config = std::move(lb_policy_config);
  // Remove the config selector from channel args so that we're not holding
  // unnecessary refs that cause it to be destroyed somewhere other than in the
  // WorkSerializer.
  const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
  update_args.args =
      grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
  // Create policy if needed.
  if (lb_policy_ == nullptr) {
    lb_policy_ = CreateLbPolicyLocked(*update_args.args);
  }
  // Update the policy.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
            lb_policy_.get());
  }
  lb_policy_->UpdateLocked(std::move(update_args));
}
1412
1413 // Creates a new LB policy.
// Creates a new LB policy.  The policy is wrapped in a ChildPolicyHandler
// so that policy changes across updates are handled transparently, and its
// pollset set is hooked up to the channel's interested parties.
OrphanablePtr<LoadBalancingPolicy> ClientChannel::CreateLbPolicyLocked(
    const grpc_channel_args& args) {
  LoadBalancingPolicy::Args lb_policy_args;
  lb_policy_args.work_serializer = work_serializer_;
  lb_policy_args.channel_control_helper =
      absl::make_unique<ClientChannelControlHelper>(this);
  lb_policy_args.args = &args;
  OrphanablePtr<LoadBalancingPolicy> lb_policy =
      MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
                                         &grpc_client_channel_routing_trace);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
            lb_policy.get());
  }
  grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
                                   interested_parties_);
  return lb_policy;
}
1432
// Prepends a call to the singly-linked list of calls queued waiting for a
// resolver result, and registers its pollent with the channel's pollers.
void ClientChannel::AddResolverQueuedCall(ResolverQueuedCall* call,
                                          grpc_polling_entity* pollent) {
  // Add call to queued calls list (push at head).
  call->next = resolver_queued_calls_;
  resolver_queued_calls_ = call;
  // Add call's pollent to channel's interested_parties, so that I/O
  // can be done under the call's CQ.
  grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
}
1442
RemoveResolverQueuedCall(ResolverQueuedCall * to_remove,grpc_polling_entity * pollent)1443 void ClientChannel::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
1444 grpc_polling_entity* pollent) {
1445 // Remove call's pollent from channel's interested_parties.
1446 grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1447 // Remove from queued calls list.
1448 for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
1449 call = &(*call)->next) {
1450 if (*call == to_remove) {
1451 *call = to_remove->next;
1452 return;
1453 }
1454 }
1455 }
1456
// Records a newly returned service config / config selector in the
// control plane and refreshes the state derived from them:
// - the saved_* fields later read by UpdateServiceConfigInDataPlaneLocked()
// - the health check service name pushed to existing subchannel wrappers
// - the snapshot strings returned by GetChannelInfo() (guarded by info_mu_)
void ClientChannel::UpdateServiceConfigInControlPlaneLocked(
    RefCountedPtr<ServiceConfig> service_config,
    RefCountedPtr<ConfigSelector> config_selector,
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
    const char* lb_policy_name) {
  // Copy the JSON out first: service_config is moved from below, and the
  // copy is what gets stored for GetChannelInfo().
  UniquePtr<char> service_config_json(
      gpr_strdup(service_config->json_string().c_str()));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p: resolver returned updated service config: \"%s\"", this,
            service_config_json.get());
  }
  // Save service config.
  saved_service_config_ = std::move(service_config);
  // Update health check service name if needed.
  if (health_check_service_name_ !=
      parsed_service_config->health_check_service_name()) {
    health_check_service_name_ =
        parsed_service_config->health_check_service_name();
    // Update health check service name used by existing subchannel wrappers.
    for (auto* subchannel_wrapper : subchannel_wrappers_) {
      subchannel_wrapper->UpdateHealthCheckServiceName(
          health_check_service_name_);
    }
  }
  // Swap out the data used by GetChannelInfo().
  UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
  {
    // Keep the critical section small: only the two pointer moves happen
    // under info_mu_.
    MutexLock lock(&info_mu_);
    info_lb_policy_name_ = std::move(lb_policy_name_owned);
    info_service_config_json_ = std::move(service_config_json);
  }
  // Save config selector.
  saved_config_selector_ = std::move(config_selector);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
            saved_config_selector_.get());
  }
}
1496
UpdateServiceConfigInDataPlaneLocked()1497 void ClientChannel::UpdateServiceConfigInDataPlaneLocked() {
1498 // Grab ref to service config.
1499 RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
1500 // Grab ref to config selector. Use default if resolver didn't supply one.
1501 RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
1502 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1503 gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
1504 saved_config_selector_.get());
1505 }
1506 if (config_selector == nullptr) {
1507 config_selector =
1508 MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
1509 }
1510 // Construct dynamic filter stack.
1511 std::vector<const grpc_channel_filter*> filters =
1512 config_selector->GetFilters();
1513 if (enable_retries_) {
1514 filters.push_back(&kRetryFilterVtable);
1515 } else {
1516 filters.push_back(&DynamicTerminationFilter::kFilterVtable);
1517 }
1518 absl::InlinedVector<grpc_arg, 2> args_to_add = {
1519 grpc_channel_arg_pointer_create(
1520 const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL), this,
1521 &kClientChannelArgPointerVtable),
1522 grpc_channel_arg_pointer_create(
1523 const_cast<char*>(GRPC_ARG_SERVICE_CONFIG_OBJ), service_config.get(),
1524 &kServiceConfigObjArgPointerVtable),
1525 };
1526 grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
1527 channel_args_, args_to_add.data(), args_to_add.size());
1528 new_args = config_selector->ModifyChannelArgs(new_args);
1529 RefCountedPtr<DynamicFilters> dynamic_filters =
1530 DynamicFilters::Create(new_args, std::move(filters));
1531 GPR_ASSERT(dynamic_filters != nullptr);
1532 grpc_channel_args_destroy(new_args);
1533 // Grab data plane lock to update service config.
1534 //
1535 // We defer unreffing the old values (and deallocating memory) until
1536 // after releasing the lock to keep the critical section small.
1537 std::set<grpc_call_element*> calls_pending_resolver_result;
1538 {
1539 MutexLock lock(&resolution_mu_);
1540 GRPC_ERROR_UNREF(resolver_transient_failure_error_);
1541 resolver_transient_failure_error_ = GRPC_ERROR_NONE;
1542 // Update service config.
1543 received_service_config_data_ = true;
1544 // Old values will be unreffed after lock is released.
1545 service_config_.swap(service_config);
1546 config_selector_.swap(config_selector);
1547 dynamic_filters_.swap(dynamic_filters);
1548 // Process calls that were queued waiting for the resolver result.
1549 for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
1550 call = call->next) {
1551 grpc_call_element* elem = call->elem;
1552 CallData* calld = static_cast<CallData*>(elem->call_data);
1553 grpc_error_handle error = GRPC_ERROR_NONE;
1554 if (calld->CheckResolutionLocked(elem, &error)) {
1555 calld->AsyncResolutionDone(elem, error);
1556 }
1557 }
1558 }
1559 // Old values will be unreffed after lock is released when they go out
1560 // of scope.
1561 }
1562
// Creates and starts the resolver for this channel's target, moving the
// channel into CONNECTING while the initial resolution runs.
void ClientChannel::CreateResolverLocked() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
  }
  resolver_ = ResolverRegistry::CreateResolver(
      target_uri_.get(), channel_args_, interested_parties_, work_serializer_,
      absl::make_unique<ResolverResultHandler>(this));
  // Since the validity of the args was checked when the channel was created,
  // CreateResolver() must return a non-null result.
  GPR_ASSERT(resolver_ != nullptr);
  // Publish CONNECTING with a QueuePicker *before* starting the resolver,
  // so that picks arriving before the first result go to the queueing
  // picker rather than to a stale one.
  UpdateStateAndPickerLocked(
      GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
      absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
  resolver_->StartLocked();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
  }
}
1581
// Shuts down the resolver and, if one was present, the LB policy as well.
// Note that the lb_policy_ teardown is nested inside the resolver_ check;
// nothing is torn down when no resolver exists.
void ClientChannel::DestroyResolverAndLbPolicyLocked() {
  if (resolver_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
              resolver_.get());
    }
    resolver_.reset();
    if (lb_policy_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
                lb_policy_.get());
      }
      // Stop driving the policy's I/O through the channel's pollers
      // before destroying it.
      grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
                                       interested_parties_);
      lb_policy_.reset();
    }
  }
}
1600
// Sets the channel's connectivity state and swaps in a new picker,
// re-processing queued LB picks under it.  A null picker (used when
// returning to IDLE, see StartTransportOpLocked()) or a SHUTDOWN
// transition additionally discards all service-config-derived state.
void ClientChannel::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
  // Special case for IDLE and SHUTDOWN states.
  if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
    saved_service_config_.reset();
    saved_config_selector_.reset();
    // Acquire resolution lock to update config selector and associated state.
    // To minimize lock contention, we wait to unref these objects until
    // after we release the lock.
    RefCountedPtr<ServiceConfig> service_config_to_unref;
    RefCountedPtr<ConfigSelector> config_selector_to_unref;
    RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
    {
      MutexLock lock(&resolution_mu_);
      received_service_config_data_ = false;
      service_config_to_unref = std::move(service_config_);
      config_selector_to_unref = std::move(config_selector_);
      dynamic_filters_to_unref = std::move(dynamic_filters_);
    }
  }
  // Update connectivity state.
  state_tracker_.SetState(state, status, reason);
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
  // Grab data plane lock to do subchannel updates and update the picker.
  //
  // Note that we want to minimize the work done while holding the data
  // plane lock, to keep the critical section small. So, for all of the
  // objects that we might wind up unreffing here, we actually hold onto
  // the refs until after we release the lock, and then unref them at
  // that point. This includes the following:
  // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
  // - ownership of the existing picker in picker_
  {
    MutexLock lock(&data_plane_mu_);
    // Handle subchannel updates.
    for (auto& p : pending_subchannel_updates_) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: updating subchannel wrapper %p data plane "
                "connected_subchannel to %p",
                this, p.first.get(), p.second.get());
      }
      // Note: We do not remove the entry from pending_subchannel_updates_
      // here, since this would unref the subchannel wrapper; instead,
      // we wait until we've released the lock to clear the map.
      p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
    }
    // Swap out the picker.
    // Note: Original value will be destroyed after the lock is released.
    picker_.swap(picker);
    // Re-process queued picks.
    for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
         call = call->next) {
      grpc_error_handle error = GRPC_ERROR_NONE;
      if (call->lb_call->PickSubchannelLocked(&error)) {
        call->lb_call->AsyncPickDone(error);
      }
    }
  }
  // Clear the pending update map after releasing the lock, to keep the
  // critical section small.
  pending_subchannel_updates_.clear();
}
1673
DoPingLocked(grpc_transport_op * op)1674 grpc_error_handle ClientChannel::DoPingLocked(grpc_transport_op* op) {
1675 if (state_tracker_.state() != GRPC_CHANNEL_READY) {
1676 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
1677 }
1678 LoadBalancingPolicy::PickResult result;
1679 {
1680 MutexLock lock(&data_plane_mu_);
1681 result = picker_->Pick(LoadBalancingPolicy::PickArgs());
1682 }
1683 ConnectedSubchannel* connected_subchannel = nullptr;
1684 if (result.subchannel != nullptr) {
1685 SubchannelWrapper* subchannel =
1686 static_cast<SubchannelWrapper*>(result.subchannel.get());
1687 connected_subchannel = subchannel->connected_subchannel();
1688 }
1689 if (connected_subchannel != nullptr) {
1690 connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
1691 } else {
1692 if (result.error == GRPC_ERROR_NONE) {
1693 result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1694 "LB policy dropped call on ping");
1695 }
1696 }
1697 return result.error;
1698 }
1699
// Executes a transport op on the control-plane work_serializer:
// connectivity watches, pings, connect-backoff reset, and disconnection
// (including the special "enter IDLE" form).  Releases the channel stack
// ref taken in StartTransportOp() and schedules op->on_consumed.
void ClientChannel::StartTransportOpLocked(grpc_transport_op* op) {
  // Connectivity watch.
  if (op->start_connectivity_watch != nullptr) {
    state_tracker_.AddWatcher(op->start_connectivity_watch_state,
                              std::move(op->start_connectivity_watch));
  }
  if (op->stop_connectivity_watch != nullptr) {
    state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
  }
  // Ping.
  if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
    grpc_error_handle error = DoPingLocked(op);
    if (error != GRPC_ERROR_NONE) {
      // on_initiate gets its own ref of the error; on_ack takes over the
      // original.
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
                   GRPC_ERROR_REF(error));
      ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
    }
    op->bind_pollset = nullptr;
    op->send_ping.on_initiate = nullptr;
    op->send_ping.on_ack = nullptr;
  }
  // Reset backoff.
  if (op->reset_connect_backoff) {
    if (lb_policy_ != nullptr) {
      lb_policy_->ResetBackoffLocked();
    }
  }
  // Disconnect or enter IDLE.
  if (op->disconnect_with_error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
              grpc_error_std_string(op->disconnect_with_error).c_str());
    }
    DestroyResolverAndLbPolicyLocked();
    intptr_t value;
    // An error tagged with connectivity state IDLE requests a transition
    // to IDLE rather than a terminal shutdown.
    if (grpc_error_get_int(op->disconnect_with_error,
                           GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
        static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
      if (disconnect_error() == GRPC_ERROR_NONE) {
        // Enter IDLE state.
        UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
                                   "channel entering IDLE", nullptr);
      }
      GRPC_ERROR_UNREF(op->disconnect_with_error);
    } else {
      // Disconnect.  disconnect_error_ may be set at most once; the
      // TransientFailurePicker takes a ref of the error for failing picks.
      GPR_ASSERT(disconnect_error_.Load(MemoryOrder::RELAXED) ==
                 GRPC_ERROR_NONE);
      disconnect_error_.Store(op->disconnect_with_error, MemoryOrder::RELEASE);
      UpdateStateAndPickerLocked(
          GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
          absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
              GRPC_ERROR_REF(op->disconnect_with_error)));
    }
  }
  GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
  ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
}
1758
// Channel-filter entry point for transport ops.  Handles pollset binding
// inline, then hops into the work_serializer for everything else,
// holding a channel stack ref across the hop (released in
// StartTransportOpLocked()).
void ClientChannel::StartTransportOp(grpc_channel_element* elem,
                                     grpc_transport_op* op) {
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  // set_accept_stream is a server-side op; it is never expected here.
  GPR_ASSERT(op->set_accept_stream == false);
  // Handle bind_pollset.
  if (op->bind_pollset != nullptr) {
    grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
  }
  // Pop into control plane work_serializer for remaining ops.
  GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
  chand->work_serializer_->Run(
      [chand, op]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand->work_serializer_) {
        chand->StartTransportOpLocked(op);
      },
      DEBUG_LOCATION);
}
1775
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)1776 void ClientChannel::GetChannelInfo(grpc_channel_element* elem,
1777 const grpc_channel_info* info) {
1778 ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1779 MutexLock lock(&chand->info_mu_);
1780 if (info->lb_policy_name != nullptr) {
1781 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
1782 }
1783 if (info->service_config_json != nullptr) {
1784 *info->service_config_json =
1785 gpr_strdup(chand->info_service_config_json_.get());
1786 }
1787 }
1788
AddLbQueuedCall(LbQueuedCall * call,grpc_polling_entity * pollent)1789 void ClientChannel::AddLbQueuedCall(LbQueuedCall* call,
1790 grpc_polling_entity* pollent) {
1791 // Add call to queued picks list.
1792 call->next = lb_queued_calls_;
1793 lb_queued_calls_ = call;
1794 // Add call's pollent to channel's interested_parties, so that I/O
1795 // can be done under the call's CQ.
1796 grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
1797 }
1798
RemoveLbQueuedCall(LbQueuedCall * to_remove,grpc_polling_entity * pollent)1799 void ClientChannel::RemoveLbQueuedCall(LbQueuedCall* to_remove,
1800 grpc_polling_entity* pollent) {
1801 // Remove call's pollent from channel's interested_parties.
1802 grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
1803 // Remove from queued picks list.
1804 for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
1805 call = &(*call)->next) {
1806 if (*call == to_remove) {
1807 *call = to_remove->next;
1808 return;
1809 }
1810 }
1811 }
1812
1813 RefCountedPtr<ConnectedSubchannel>
GetConnectedSubchannelInDataPlane(SubchannelInterface * subchannel) const1814 ClientChannel::GetConnectedSubchannelInDataPlane(
1815 SubchannelInterface* subchannel) const {
1816 SubchannelWrapper* subchannel_wrapper =
1817 static_cast<SubchannelWrapper*>(subchannel);
1818 ConnectedSubchannel* connected_subchannel =
1819 subchannel_wrapper->connected_subchannel_in_data_plane();
1820 if (connected_subchannel == nullptr) return nullptr;
1821 return connected_subchannel->Ref();
1822 }
1823
TryToConnectLocked()1824 void ClientChannel::TryToConnectLocked() {
1825 if (lb_policy_ != nullptr) {
1826 lb_policy_->ExitIdleLocked();
1827 } else if (resolver_ == nullptr) {
1828 CreateResolverLocked();
1829 }
1830 GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
1831 }
1832
// Returns the channel's current connectivity state.  If try_to_connect
// is set and the channel is IDLE, schedules a connection attempt on the
// work_serializer, taking a stack ref that TryToConnectLocked() releases.
grpc_connectivity_state ClientChannel::CheckConnectivityState(
    bool try_to_connect) {
  // state_tracker_ is guarded by work_serializer_, which we're not
  // holding here. But the one method of state_tracker_ that *is*
  // thread-safe to call without external synchronization is the state()
  // method, so we can disable thread-safety analysis for this one read.
  grpc_connectivity_state out = ABSL_TS_UNCHECKED_READ(state_tracker_).state();
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
    work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
                              work_serializer_) { TryToConnectLocked(); },
                          DEBUG_LOCATION);
  }
  return out;
}
1848
// Registers an async connectivity watcher.  The ConnectivityWatcherAdder
// allocated here is intentionally not stored: it appears to manage its
// own lifetime (NOTE(review): confirm in its definition that it deletes
// itself once the watcher is registered).
void ClientChannel::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}
1854
// Unregisters a connectivity watcher.  As with AddConnectivityWatcher(),
// the ConnectivityWatcherRemover allocated here is not stored and appears
// to manage its own lifetime (NOTE(review): confirm it self-deletes).
void ClientChannel::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  new ConnectivityWatcherRemover(this, watcher);
}
1859
1860 //
1861 // CallData implementation
1862 //
1863
// Constructs the per-call data for a new call on this channel.
// When deadline checking is enabled on the channel, the deadline state
// is initialized with the call's actual deadline; otherwise it gets
// GRPC_MILLIS_INF_FUTURE.
ClientChannel::CallData::CallData(grpc_call_element* elem,
                                  const ClientChannel& chand,
                                  const grpc_call_element_args& args)
    : deadline_state_(elem, args,
                      GPR_LIKELY(chand.deadline_checking_enabled_)
                          ? args.deadline
                          : GRPC_MILLIS_INF_FUTURE),
      path_(grpc_slice_ref_internal(args.path)),  // unreffed in ~CallData()
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", &chand, this);
  }
}
1882
~CallData()1883 ClientChannel::CallData::~CallData() {
1884 grpc_slice_unref_internal(path_);
1885 GRPC_ERROR_UNREF(cancel_error_);
1886 // Make sure there are no remaining pending batches.
1887 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
1888 GPR_ASSERT(pending_batches_[i] == nullptr);
1889 }
1890 }
1891
Init(grpc_call_element * elem,const grpc_call_element_args * args)1892 grpc_error_handle ClientChannel::CallData::Init(
1893 grpc_call_element* elem, const grpc_call_element_args* args) {
1894 ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
1895 new (elem->call_data) CallData(elem, *chand, *args);
1896 return GRPC_ERROR_NONE;
1897 }
1898
// Filter vtable destroy hook.  The dynamic call (if any) is moved out
// *before* running ~CallData() so it survives the destructor;
// then_schedule_closure is either handed to the dynamic call to run
// after its call stack is destroyed, or scheduled immediately when no
// dynamic call exists.
void ClientChannel::CallData::Destroy(
    grpc_call_element* elem, const grpc_call_final_info* /*final_info*/,
    grpc_closure* then_schedule_closure) {
  CallData* calld = static_cast<CallData*>(elem->call_data);
  RefCountedPtr<DynamicFilters::Call> dynamic_call =
      std::move(calld->dynamic_call_);
  calld->~CallData();
  if (GPR_LIKELY(dynamic_call != nullptr)) {
    dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
  } else {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
  }
}
1913
// Entry point for transport stream op batches on the client channel.
// Handles deadline bookkeeping, previously-recorded cancellation,
// incoming cancellation, and queueing of batches until name resolution
// completes and a dynamic call is created.
void ClientChannel::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled_)) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // Intercept recv_initial_metadata for config selector on-committed callback.
  if (batch->recv_initial_metadata) {
    calld->InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld,
              grpc_error_std_string(calld->cancel_error_).c_str());
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error_);
    calld->cancel_error_ =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_std_string(calld->cancel_error_).c_str());
    }
    // If we do not have a dynamic call (i.e., name resolution has not
    // yet completed), fail all pending batches. Otherwise, send the
    // cancellation down to the dynamic call.
    if (calld->dynamic_call_ == nullptr) {
      calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
                                NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    } else {
      // Note: This will release the call combiner.
      calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(elem, batch);
  // Check if we've already created a dynamic call.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
              chand, calld, calld->dynamic_call_.get());
    }
    calld->PendingBatchesResume(elem);
    return;
  }
  // We do not yet have a dynamic call.
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing resolution mutex to apply service "
              "config",
              chand, calld);
    }
    CheckResolution(elem, GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}
2003
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2004 void ClientChannel::CallData::SetPollent(grpc_call_element* elem,
2005 grpc_polling_entity* pollent) {
2006 CallData* calld = static_cast<CallData*>(elem->call_data);
2007 calld->pollent_ = pollent;
2008 }
2009
2010 //
2011 // pending_batches management
2012 //
2013
// Maps a batch to its slot in pending_batches_, keyed by the first op
// flag present (checked in a fixed order).
size_t ClientChannel::CallData::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {
  // Note: It is important the send_initial_metadata be the first entry
  // here, since the code in ApplyServiceConfigToCallLocked() and
  // CheckResolutionLocked() assumes it will be.
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}
2027
2028 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)2029 void ClientChannel::CallData::PendingBatchesAdd(
2030 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
2031 ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2032 const size_t idx = GetBatchIndex(batch);
2033 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2034 gpr_log(GPR_INFO,
2035 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
2036 this, idx);
2037 }
2038 grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
2039 GPR_ASSERT(pending == nullptr);
2040 pending = batch;
2041 }
2042
2043 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2044 void ClientChannel::CallData::FailPendingBatchInCallCombiner(
2045 void* arg, grpc_error_handle error) {
2046 grpc_transport_stream_op_batch* batch =
2047 static_cast<grpc_transport_stream_op_batch*>(arg);
2048 CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2049 // Note: This will release the call combiner.
2050 grpc_transport_stream_op_batch_finish_with_failure(
2051 batch, GRPC_ERROR_REF(error), calld->call_combiner_);
2052 }
2053
// This is called via the call combiner, so access to calld is synchronized.
// Fails every pending batch with the given error, scheduling the failure
// callbacks through the call combiner.  yield_call_combiner_predicate
// decides (based on the resulting closure list) whether this call yields
// the call combiner.  Takes ownership of error.
void ClientChannel::CallData::PendingBatchesFail(
    grpc_call_element* elem, grpc_error_handle error,
    YieldCallCombinerPredicate yield_call_combiner_predicate) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            elem->channel_data, this, num_batches,
            grpc_error_std_string(error).c_str());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      // Each closure gets its own ref of the error.
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      batch = nullptr;
    }
  }
  if (yield_call_combiner_predicate(closures)) {
    closures.RunClosures(call_combiner_);
  } else {
    closures.RunClosuresWithoutYielding(call_combiner_);
  }
  // Release our ownership of the original error.
  GRPC_ERROR_UNREF(error);
}
2089
2090 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2091 void ClientChannel::CallData::ResumePendingBatchInCallCombiner(
2092 void* arg, grpc_error_handle /*ignored*/) {
2093 grpc_transport_stream_op_batch* batch =
2094 static_cast<grpc_transport_stream_op_batch*>(arg);
2095 auto* elem =
2096 static_cast<grpc_call_element*>(batch->handler_private.extra_arg);
2097 auto* calld = static_cast<CallData*>(elem->call_data);
2098 // Note: This will release the call combiner.
2099 calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2100 }
2101
// This is called via the call combiner, so access to calld is synchronized.
// Sends every pending batch down the dynamic call via the call combiner,
// clearing the pending slots as it goes.
void ClientChannel::CallData::PendingBatchesResume(grpc_call_element* elem) {
  ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
  // Retries not enabled; send down batches as-is.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i] != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " pending batches on dynamic_call=%p",
            chand, this, num_batches, dynamic_call_.get());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    grpc_transport_stream_op_batch*& batch = pending_batches_[i];
    if (batch != nullptr) {
      batch->handler_private.extra_arg = elem;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        ResumePendingBatchInCallCombiner, batch, nullptr);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                   "PendingBatchesResume");
      batch = nullptr;
    }
  }
  // Note: This will release the call combiner.
  closures.RunClosures(call_combiner_);
}
2131
2132 //
2133 // name resolution
2134 //
2135
2136 // A class to handle the call combiner cancellation callback for a
2137 // queued pick.
2138 class ClientChannel::CallData::ResolverQueuedCallCanceller {
2139 public:
ResolverQueuedCallCanceller(grpc_call_element * elem)2140 explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) {
2141 auto* calld = static_cast<CallData*>(elem->call_data);
2142 GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller");
2143 GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2144 grpc_schedule_on_exec_ctx);
2145 calld->call_combiner_->SetNotifyOnCancel(&closure_);
2146 }
2147
2148 private:
CancelLocked(void * arg,grpc_error_handle error)2149 static void CancelLocked(void* arg, grpc_error_handle error) {
2150 auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2151 auto* chand = static_cast<ClientChannel*>(self->elem_->channel_data);
2152 auto* calld = static_cast<CallData*>(self->elem_->call_data);
2153 {
2154 MutexLock lock(&chand->resolution_mu_);
2155 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2156 gpr_log(GPR_INFO,
2157 "chand=%p calld=%p: cancelling resolver queued pick: "
2158 "error=%s self=%p calld->resolver_pick_canceller=%p",
2159 chand, calld, grpc_error_std_string(error).c_str(), self,
2160 calld->resolver_call_canceller_);
2161 }
2162 if (calld->resolver_call_canceller_ == self && error != GRPC_ERROR_NONE) {
2163 // Remove pick from list of queued picks.
2164 calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
2165 // Fail pending batches on the call.
2166 calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
2167 YieldCallCombinerIfPendingBatchesFound);
2168 }
2169 }
2170 GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolvingQueuedCallCanceller");
2171 delete self;
2172 }
2173
2174 grpc_call_element* elem_;
2175 grpc_closure closure_;
2176 };
2177
MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element * elem)2178 void ClientChannel::CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
2179 grpc_call_element* elem) {
2180 if (!queued_pending_resolver_result_) return;
2181 auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2182 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2183 gpr_log(GPR_INFO,
2184 "chand=%p calld=%p: removing from resolver queued picks list",
2185 chand, this);
2186 }
2187 chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
2188 queued_pending_resolver_result_ = false;
2189 // Lame the call combiner canceller.
2190 resolver_call_canceller_ = nullptr;
2191 }
2192
MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element * elem)2193 void ClientChannel::CallData::MaybeAddCallToResolverQueuedCallsLocked(
2194 grpc_call_element* elem) {
2195 if (queued_pending_resolver_result_) return;
2196 auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2197 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2198 gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
2199 chand, this);
2200 }
2201 queued_pending_resolver_result_ = true;
2202 resolver_queued_call_.elem = elem;
2203 chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
2204 // Register call combiner cancellation callback.
2205 resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
2206 }
2207
ApplyServiceConfigToCallLocked(grpc_call_element * elem,grpc_metadata_batch * initial_metadata)2208 grpc_error_handle ClientChannel::CallData::ApplyServiceConfigToCallLocked(
2209 grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
2210 ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2211 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2212 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2213 chand, this);
2214 }
2215 ConfigSelector* config_selector = chand->config_selector_.get();
2216 if (config_selector != nullptr) {
2217 // Use the ConfigSelector to determine the config for the call.
2218 ConfigSelector::CallConfig call_config =
2219 config_selector->GetCallConfig({&path_, initial_metadata, arena_});
2220 if (call_config.error != GRPC_ERROR_NONE) return call_config.error;
2221 on_call_committed_ = std::move(call_config.on_call_committed);
2222 // Create a ServiceConfigCallData for the call. This stores a ref to the
2223 // ServiceConfig and caches the right set of parsed configs to use for
2224 // the call. The MethodConfig will store itself in the call context,
2225 // so that it can be accessed by filters in the subchannel, and it
2226 // will be cleaned up when the call ends.
2227 auto* service_config_call_data = arena_->New<ServiceConfigCallData>(
2228 std::move(call_config.service_config), call_config.method_configs,
2229 std::move(call_config.call_attributes), call_context_);
2230 // Apply our own method params to the call.
2231 auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
2232 service_config_call_data->GetMethodParsedConfig(
2233 internal::ClientChannelServiceConfigParser::ParserIndex()));
2234 if (method_params != nullptr) {
2235 // If the deadline from the service config is shorter than the one
2236 // from the client API, reset the deadline timer.
2237 if (chand->deadline_checking_enabled_ && method_params->timeout() != 0) {
2238 const grpc_millis per_method_deadline =
2239 grpc_cycle_counter_to_millis_round_up(call_start_time_) +
2240 method_params->timeout();
2241 if (per_method_deadline < deadline_) {
2242 deadline_ = per_method_deadline;
2243 grpc_deadline_state_reset(elem, deadline_);
2244 }
2245 }
2246 // If the service config set wait_for_ready and the application
2247 // did not explicitly set it, use the value from the service config.
2248 uint32_t* send_initial_metadata_flags =
2249 &pending_batches_[0]
2250 ->payload->send_initial_metadata.send_initial_metadata_flags;
2251 if (method_params->wait_for_ready().has_value() &&
2252 !(*send_initial_metadata_flags &
2253 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
2254 if (method_params->wait_for_ready().value()) {
2255 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2256 } else {
2257 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
2258 }
2259 }
2260 }
2261 // Set the dynamic filter stack.
2262 dynamic_filters_ = chand->dynamic_filters_;
2263 }
2264 return GRPC_ERROR_NONE;
2265 }
2266
2267 void ClientChannel::CallData::
RecvInitialMetadataReadyForConfigSelectorCommitCallback(void * arg,grpc_error_handle error)2268 RecvInitialMetadataReadyForConfigSelectorCommitCallback(
2269 void* arg, grpc_error_handle error) {
2270 auto* self = static_cast<CallData*>(arg);
2271 if (self->on_call_committed_ != nullptr) {
2272 self->on_call_committed_();
2273 self->on_call_committed_ = nullptr;
2274 }
2275 // Chain to original callback.
2276 Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
2277 GRPC_ERROR_REF(error));
2278 }
2279
// TODO(roth): Consider not intercepting this callback unless we
// actually need to, if this causes a performance problem.
// Splices our own closure into the batch's recv_initial_metadata_ready
// slot, saving the original callback so that
// RecvInitialMetadataReadyForConfigSelectorCommitCallback() can chain
// to it after running the ConfigSelector commit callback.
void ClientChannel::CallData::
    InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
        grpc_transport_stream_op_batch* batch) {
  // Save the original callback before overwriting the slot.
  original_recv_initial_metadata_ready_ =
      batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
  GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_,
                    RecvInitialMetadataReadyForConfigSelectorCommitCallback,
                    this, nullptr);
  batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      &recv_initial_metadata_ready_;
}
2293
AsyncResolutionDone(grpc_call_element * elem,grpc_error_handle error)2294 void ClientChannel::CallData::AsyncResolutionDone(grpc_call_element* elem,
2295 grpc_error_handle error) {
2296 GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr);
2297 ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
2298 }
2299
ResolutionDone(void * arg,grpc_error_handle error)2300 void ClientChannel::CallData::ResolutionDone(void* arg,
2301 grpc_error_handle error) {
2302 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2303 ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2304 CallData* calld = static_cast<CallData*>(elem->call_data);
2305 if (error != GRPC_ERROR_NONE) {
2306 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2307 gpr_log(GPR_INFO,
2308 "chand=%p calld=%p: error applying config to call: error=%s",
2309 chand, calld, grpc_error_std_string(error).c_str());
2310 }
2311 calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
2312 return;
2313 }
2314 calld->CreateDynamicCall(elem);
2315 }
2316
CheckResolution(void * arg,grpc_error_handle error)2317 void ClientChannel::CallData::CheckResolution(void* arg,
2318 grpc_error_handle error) {
2319 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
2320 CallData* calld = static_cast<CallData*>(elem->call_data);
2321 ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2322 bool resolution_complete;
2323 {
2324 MutexLock lock(&chand->resolution_mu_);
2325 resolution_complete = calld->CheckResolutionLocked(elem, &error);
2326 }
2327 if (resolution_complete) {
2328 ResolutionDone(elem, error);
2329 GRPC_ERROR_UNREF(error);
2330 }
2331 }
2332
CheckResolutionLocked(grpc_call_element * elem,grpc_error_handle * error)2333 bool ClientChannel::CallData::CheckResolutionLocked(grpc_call_element* elem,
2334 grpc_error_handle* error) {
2335 ClientChannel* chand = static_cast<ClientChannel*>(elem->channel_data);
2336 // If we're still in IDLE, we need to start resolving.
2337 if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
2338 // Bounce into the control plane work serializer to start resolving,
2339 // in case we are still in IDLE state. Since we are holding on to the
2340 // resolution mutex here, we offload it on the ExecCtx so that we don't
2341 // deadlock with ourselves.
2342 GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "CheckResolutionLocked");
2343 ExecCtx::Run(
2344 DEBUG_LOCATION,
2345 GRPC_CLOSURE_CREATE(
2346 [](void* arg, grpc_error_handle /*error*/) {
2347 auto* chand = static_cast<ClientChannel*>(arg);
2348 chand->work_serializer_->Run(
2349 [chand]()
2350 ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand->work_serializer_) {
2351 chand->CheckConnectivityState(/*try_to_connect=*/true);
2352 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack_,
2353 "CheckResolutionLocked");
2354 },
2355 DEBUG_LOCATION);
2356 },
2357 chand, nullptr),
2358 GRPC_ERROR_NONE);
2359 }
2360 // Get send_initial_metadata batch and flags.
2361 auto& send_initial_metadata =
2362 pending_batches_[0]->payload->send_initial_metadata;
2363 grpc_metadata_batch* initial_metadata_batch =
2364 send_initial_metadata.send_initial_metadata;
2365 const uint32_t send_initial_metadata_flags =
2366 send_initial_metadata.send_initial_metadata_flags;
2367 // If we don't yet have a resolver result, we need to queue the call
2368 // until we get one.
2369 if (GPR_UNLIKELY(!chand->received_service_config_data_)) {
2370 // If the resolver returned transient failure before returning the
2371 // first service config, fail any non-wait_for_ready calls.
2372 grpc_error_handle resolver_error = chand->resolver_transient_failure_error_;
2373 if (resolver_error != GRPC_ERROR_NONE &&
2374 (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
2375 0) {
2376 MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
2377 *error = GRPC_ERROR_REF(resolver_error);
2378 return true;
2379 }
2380 // Either the resolver has not yet returned a result, or it has
2381 // returned transient failure but the call is wait_for_ready. In
2382 // either case, queue the call.
2383 MaybeAddCallToResolverQueuedCallsLocked(elem);
2384 return false;
2385 }
2386 // Apply service config to call if not yet applied.
2387 if (GPR_LIKELY(!service_config_applied_)) {
2388 service_config_applied_ = true;
2389 *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
2390 }
2391 MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
2392 return true;
2393 }
2394
CreateDynamicCall(grpc_call_element * elem)2395 void ClientChannel::CallData::CreateDynamicCall(grpc_call_element* elem) {
2396 auto* chand = static_cast<ClientChannel*>(elem->channel_data);
2397 DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
2398 pollent_,
2399 path_,
2400 call_start_time_,
2401 deadline_,
2402 arena_,
2403 call_context_,
2404 call_combiner_};
2405 grpc_error_handle error = GRPC_ERROR_NONE;
2406 DynamicFilters* channel_stack = args.channel_stack.get();
2407 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2408 gpr_log(
2409 GPR_INFO,
2410 "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
2411 chand, this, channel_stack);
2412 }
2413 dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
2414 if (error != GRPC_ERROR_NONE) {
2415 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2416 gpr_log(GPR_INFO,
2417 "chand=%p calld=%p: failed to create dynamic call: error=%s",
2418 chand, this, grpc_error_std_string(error).c_str());
2419 }
2420 PendingBatchesFail(elem, error, YieldCallCombiner);
2421 return;
2422 }
2423 PendingBatchesResume(elem);
2424 }
2425
2426 //
2427 // ClientChannel::LoadBalancedCall::Metadata
2428 //
2429
// Adapter exposing a grpc_metadata_batch to the LB policy API as a
// LoadBalancingPolicy::MetadataInterface.  Iterator handles encode a
// grpc_linked_mdelem* as an intptr_t (see the static_asserts below);
// a handle of 0 is the end iterator.  Entries are allocated from the
// call's arena, so they live as long as the call.
class ClientChannel::LoadBalancedCall::Metadata
    : public LoadBalancingPolicy::MetadataInterface {
 public:
  Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch)
      : lb_call_(lb_call), batch_(batch) {}

  // Appends a key/value pair to the batch.  The mdelem storage comes
  // from the call arena; the slices view the caller's key/value bytes
  // (ExternallyManagedSlice), so those must remain valid -- TODO
  // confirm lifetime expectations with callers.
  void Add(absl::string_view key, absl::string_view value) override {
    grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
        lb_call_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
    linked_mdelem->md = grpc_mdelem_from_slices(
        ExternallyManagedSlice(key.data(), key.size()),
        ExternallyManagedSlice(value.data(), value.size()));
    GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
               GRPC_ERROR_NONE);
  }

  // Begins at the batch head, skipping the ":path" entry if it is first.
  iterator begin() const override {
    static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
                  "iterator size too large");
    return iterator(
        this, reinterpret_cast<intptr_t>(MaybeSkipEntry(batch_->list.head)));
  }
  iterator end() const override {
    static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
                  "iterator size too large");
    return iterator(this, 0);
  }

  // Unlinks the entry from the batch and returns an iterator to the
  // next entry.  The next-handle is captured before removal since
  // removal may alter the links.
  iterator erase(iterator it) override {
    grpc_linked_mdelem* linked_mdelem =
        reinterpret_cast<grpc_linked_mdelem*>(GetIteratorHandle(it));
    intptr_t handle = reinterpret_cast<intptr_t>(linked_mdelem->next);
    grpc_metadata_batch_remove(batch_, linked_mdelem);
    return iterator(this, handle);
  }

 private:
  // Hides the ":path" metadata entry from iteration by skipping over it.
  grpc_linked_mdelem* MaybeSkipEntry(grpc_linked_mdelem* entry) const {
    if (entry != nullptr && batch_->idx.named.path == entry) {
      return entry->next;
    }
    return entry;
  }

  // Advances an iterator handle to the next (non-":path") entry.
  intptr_t IteratorHandleNext(intptr_t handle) const override {
    grpc_linked_mdelem* linked_mdelem =
        reinterpret_cast<grpc_linked_mdelem*>(handle);
    return reinterpret_cast<intptr_t>(MaybeSkipEntry(linked_mdelem->next));
  }

  // Dereferences an iterator handle into (key, value) string views.
  std::pair<absl::string_view, absl::string_view> IteratorHandleGet(
      intptr_t handle) const override {
    grpc_linked_mdelem* linked_mdelem =
        reinterpret_cast<grpc_linked_mdelem*>(handle);
    return std::make_pair(StringViewFromSlice(GRPC_MDKEY(linked_mdelem->md)),
                          StringViewFromSlice(GRPC_MDVALUE(linked_mdelem->md)));
  }

  // Borrowed; the LB call owns the arena used by Add().
  LoadBalancedCall* lb_call_;
  // Borrowed; the underlying metadata batch being adapted.
  grpc_metadata_batch* batch_;
};
2491
2492 //
2493 // ClientChannel::LoadBalancedCall::LbCallState
2494 //
2495
// Per-call state exposed to the LB policy during pick and
// recv_trailing_metadata callbacks.
class ClientChannel::LoadBalancedCall::LbCallState
    : public LoadBalancingPolicy::CallState {
 public:
  explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}

  // Allocates memory from the call's arena (freed when the call ends).
  void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }

  // Lazily parses the x-endpoint-load-metrics-bin entry from the call's
  // trailing metadata, caching the result on the LB call.  Returns
  // nullptr if the header is absent or parsing was not attempted.
  const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
      override {
    if (lb_call_->backend_metric_data_ == nullptr) {
      grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->idx.named
                                   .x_endpoint_load_metrics_bin;
      if (md != nullptr) {
        lb_call_->backend_metric_data_ =
            ParseBackendMetricData(GRPC_MDVALUE(md->md), lb_call_->arena_);
      }
    }
    return lb_call_->backend_metric_data_;
  }

  // Looks up a call attribute (set by the ConfigSelector) in the
  // ServiceConfigCallData stored in the call context.  Returns an empty
  // view if the key is not present.
  absl::string_view ExperimentalGetCallAttribute(const char* key) override {
    auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
        lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
    auto& call_attributes = service_config_call_data->call_attributes();
    auto it = call_attributes.find(key);
    if (it == call_attributes.end()) return absl::string_view();
    return it->second;
  }

 private:
  // Borrowed; outlives this object (constructed on the stack around LB
  // policy callbacks).
  LoadBalancedCall* lb_call_;
};
2528
2529 //
2530 // LoadBalancedCall
2531 //
2532
LoadBalancedCall(ClientChannel * chand,const grpc_call_element_args & args,grpc_polling_entity * pollent,grpc_closure * on_call_destruction_complete)2533 ClientChannel::LoadBalancedCall::LoadBalancedCall(
2534 ClientChannel* chand, const grpc_call_element_args& args,
2535 grpc_polling_entity* pollent, grpc_closure* on_call_destruction_complete)
2536 : RefCounted(GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
2537 ? "LoadBalancedCall"
2538 : nullptr),
2539 chand_(chand),
2540 path_(grpc_slice_ref_internal(args.path)),
2541 call_start_time_(args.start_time),
2542 deadline_(args.deadline),
2543 arena_(args.arena),
2544 owning_call_(args.call_stack),
2545 call_combiner_(args.call_combiner),
2546 call_context_(args.context),
2547 pollent_(pollent),
2548 on_call_destruction_complete_(on_call_destruction_complete) {}
2549
~LoadBalancedCall()2550 ClientChannel::LoadBalancedCall::~LoadBalancedCall() {
2551 grpc_slice_unref_internal(path_);
2552 GRPC_ERROR_UNREF(cancel_error_);
2553 GRPC_ERROR_UNREF(failure_error_);
2554 if (backend_metric_data_ != nullptr) {
2555 backend_metric_data_
2556 ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
2557 }
2558 // Make sure there are no remaining pending batches.
2559 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2560 GPR_ASSERT(pending_batches_[i] == nullptr);
2561 }
2562 if (on_call_destruction_complete_ != nullptr) {
2563 ExecCtx::Run(DEBUG_LOCATION, on_call_destruction_complete_,
2564 GRPC_ERROR_NONE);
2565 }
2566 }
2567
GetBatchIndex(grpc_transport_stream_op_batch * batch)2568 size_t ClientChannel::LoadBalancedCall::GetBatchIndex(
2569 grpc_transport_stream_op_batch* batch) {
2570 // Note: It is important the send_initial_metadata be the first entry
2571 // here, since the code in PickSubchannelLocked() assumes it will be.
2572 if (batch->send_initial_metadata) return 0;
2573 if (batch->send_message) return 1;
2574 if (batch->send_trailing_metadata) return 2;
2575 if (batch->recv_initial_metadata) return 3;
2576 if (batch->recv_message) return 4;
2577 if (batch->recv_trailing_metadata) return 5;
2578 GPR_UNREACHABLE_CODE(return (size_t)-1);
2579 }
2580
2581 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)2582 void ClientChannel::LoadBalancedCall::PendingBatchesAdd(
2583 grpc_transport_stream_op_batch* batch) {
2584 const size_t idx = GetBatchIndex(batch);
2585 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2586 gpr_log(GPR_INFO,
2587 "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
2588 chand_, this, idx);
2589 }
2590 GPR_ASSERT(pending_batches_[idx] == nullptr);
2591 pending_batches_[idx] = batch;
2592 }
2593
2594 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error_handle error)2595 void ClientChannel::LoadBalancedCall::FailPendingBatchInCallCombiner(
2596 void* arg, grpc_error_handle error) {
2597 grpc_transport_stream_op_batch* batch =
2598 static_cast<grpc_transport_stream_op_batch*>(arg);
2599 auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
2600 // Note: This will release the call combiner.
2601 grpc_transport_stream_op_batch_finish_with_failure(
2602 batch, GRPC_ERROR_REF(error), self->call_combiner_);
2603 }
2604
2605 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error_handle error,YieldCallCombinerPredicate yield_call_combiner_predicate)2606 void ClientChannel::LoadBalancedCall::PendingBatchesFail(
2607 grpc_error_handle error,
2608 YieldCallCombinerPredicate yield_call_combiner_predicate) {
2609 GPR_ASSERT(error != GRPC_ERROR_NONE);
2610 GRPC_ERROR_UNREF(failure_error_);
2611 failure_error_ = error;
2612 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2613 size_t num_batches = 0;
2614 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2615 if (pending_batches_[i] != nullptr) ++num_batches;
2616 }
2617 gpr_log(GPR_INFO,
2618 "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
2619 chand_, this, num_batches, grpc_error_std_string(error).c_str());
2620 }
2621 CallCombinerClosureList closures;
2622 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2623 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2624 if (batch != nullptr) {
2625 batch->handler_private.extra_arg = this;
2626 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2627 FailPendingBatchInCallCombiner, batch,
2628 grpc_schedule_on_exec_ctx);
2629 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2630 "PendingBatchesFail");
2631 batch = nullptr;
2632 }
2633 }
2634 if (yield_call_combiner_predicate(closures)) {
2635 closures.RunClosures(call_combiner_);
2636 } else {
2637 closures.RunClosuresWithoutYielding(call_combiner_);
2638 }
2639 }
2640
2641 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error_handle)2642 void ClientChannel::LoadBalancedCall::ResumePendingBatchInCallCombiner(
2643 void* arg, grpc_error_handle /*ignored*/) {
2644 grpc_transport_stream_op_batch* batch =
2645 static_cast<grpc_transport_stream_op_batch*>(arg);
2646 SubchannelCall* subchannel_call =
2647 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
2648 // Note: This will release the call combiner.
2649 subchannel_call->StartTransportStreamOpBatch(batch);
2650 }
2651
2652 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()2653 void ClientChannel::LoadBalancedCall::PendingBatchesResume() {
2654 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2655 size_t num_batches = 0;
2656 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2657 if (pending_batches_[i] != nullptr) ++num_batches;
2658 }
2659 gpr_log(GPR_INFO,
2660 "chand=%p lb_call=%p: starting %" PRIuPTR
2661 " pending batches on subchannel_call=%p",
2662 chand_, this, num_batches, subchannel_call_.get());
2663 }
2664 CallCombinerClosureList closures;
2665 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2666 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2667 if (batch != nullptr) {
2668 batch->handler_private.extra_arg = subchannel_call_.get();
2669 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2670 ResumePendingBatchInCallCombiner, batch,
2671 grpc_schedule_on_exec_ctx);
2672 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2673 "PendingBatchesResume");
2674 batch = nullptr;
2675 }
2676 }
2677 // Note: This will release the call combiner.
2678 closures.RunClosures(call_combiner_);
2679 }
2680
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)2681 void ClientChannel::LoadBalancedCall::StartTransportStreamOpBatch(
2682 grpc_transport_stream_op_batch* batch) {
2683 // Intercept recv_trailing_metadata_ready for LB callback.
2684 if (batch->recv_trailing_metadata) {
2685 InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
2686 }
2687 // If we've previously been cancelled, immediately fail any new batches.
2688 if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
2689 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2690 gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
2691 chand_, this, grpc_error_std_string(cancel_error_).c_str());
2692 }
2693 // Note: This will release the call combiner.
2694 grpc_transport_stream_op_batch_finish_with_failure(
2695 batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
2696 return;
2697 }
2698 // Handle cancellation.
2699 if (GPR_UNLIKELY(batch->cancel_stream)) {
2700 // Stash a copy of cancel_error in our call data, so that we can use
2701 // it for subsequent operations. This ensures that if the call is
2702 // cancelled before any batches are passed down (e.g., if the deadline
2703 // is in the past when the call starts), we can return the right
2704 // error to the caller when the first batch does get passed down.
2705 GRPC_ERROR_UNREF(cancel_error_);
2706 cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
2707 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2708 gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
2709 chand_, this, grpc_error_std_string(cancel_error_).c_str());
2710 }
2711 // If we do not have a subchannel call (i.e., a pick has not yet
2712 // been started), fail all pending batches. Otherwise, send the
2713 // cancellation down to the subchannel call.
2714 if (subchannel_call_ == nullptr) {
2715 PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
2716 // Note: This will release the call combiner.
2717 grpc_transport_stream_op_batch_finish_with_failure(
2718 batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
2719 } else {
2720 // Note: This will release the call combiner.
2721 subchannel_call_->StartTransportStreamOpBatch(batch);
2722 }
2723 return;
2724 }
2725 // Add the batch to the pending list.
2726 PendingBatchesAdd(batch);
2727 // Check if we've already gotten a subchannel call.
2728 // Note that once we have picked a subchannel, we do not need to acquire
2729 // the channel's data plane mutex, which is more efficient (especially for
2730 // streaming calls).
2731 if (subchannel_call_ != nullptr) {
2732 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2733 gpr_log(GPR_INFO,
2734 "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
2735 chand_, this, subchannel_call_.get());
2736 }
2737 PendingBatchesResume();
2738 return;
2739 }
2740 // We do not yet have a subchannel call.
2741 // For batches containing a send_initial_metadata op, acquire the
2742 // channel's data plane mutex to pick a subchannel.
2743 if (GPR_LIKELY(batch->send_initial_metadata)) {
2744 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2745 gpr_log(GPR_INFO,
2746 "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
2747 chand_, this);
2748 }
2749 PickSubchannel(this, GRPC_ERROR_NONE);
2750 } else {
2751 // For all other batches, release the call combiner.
2752 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2753 gpr_log(GPR_INFO,
2754 "chand=%p lb_call=%p: saved batch, yielding call combiner",
2755 chand_, this);
2756 }
2757 GRPC_CALL_COMBINER_STOP(call_combiner_,
2758 "batch does not include send_initial_metadata");
2759 }
2760 }
2761
// Intercepted recv_trailing_metadata_ready callback.  If the LB policy
// registered a recv_trailing_metadata callback, synthesizes an error
// for it (either the transport error or one derived from the
// grpc-status/grpc-message trailing metadata), invokes it with the
// trailing metadata and call state, then chains to the original
// callback -- handing off ownership of failure_error_ if one was
// recorded by PendingBatchesFail().
void ClientChannel::LoadBalancedCall::
    RecvTrailingMetadataReadyForLoadBalancingPolicy(void* arg,
                                                    grpc_error_handle error) {
  auto* self = static_cast<LoadBalancedCall*>(arg);
  if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
    // Set error if call did not succeed.
    grpc_error_handle error_for_lb = GRPC_ERROR_NONE;
    if (error != GRPC_ERROR_NONE) {
      // Borrow the transport error; not unreffed below in this case.
      error_for_lb = error;
    } else {
      const auto& fields = self->recv_trailing_metadata_->idx.named;
      GPR_ASSERT(fields.grpc_status != nullptr);
      grpc_status_code status =
          grpc_get_status_code_from_metadata(fields.grpc_status->md);
      std::string msg;
      if (status != GRPC_STATUS_OK) {
        // Build an error carrying the status (and message, if present)
        // from the trailing metadata.
        error_for_lb = grpc_error_set_int(
            GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
            GRPC_ERROR_INT_GRPC_STATUS, status);
        if (fields.grpc_message != nullptr) {
          error_for_lb = grpc_error_set_str(
              error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
              grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
        }
      }
    }
    // Invoke callback to LB policy.
    Metadata trailing_metadata(self, self->recv_trailing_metadata_);
    LbCallState lb_call_state(self);
    self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
                                           &lb_call_state);
    // Only unref if we created error_for_lb ourselves above.
    if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
  }
  // Chain to original callback.
  if (self->failure_error_ != GRPC_ERROR_NONE) {
    // Transfer ownership of the stashed failure error to the callback.
    error = self->failure_error_;
    self->failure_error_ = GRPC_ERROR_NONE;
  } else {
    error = GRPC_ERROR_REF(error);
  }
  Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
               error);
}
2805
// Splices our own closure into the batch's recv_trailing_metadata_ready
// slot, saving both the trailing metadata pointer and the original
// callback so RecvTrailingMetadataReadyForLoadBalancingPolicy() can
// inspect the metadata and then chain onward.
void ClientChannel::LoadBalancedCall::
    InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
        grpc_transport_stream_op_batch* batch) {
  recv_trailing_metadata_ =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata;
  original_recv_trailing_metadata_ready_ =
      batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
  GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                    RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
                    grpc_schedule_on_exec_ctx);
  batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &recv_trailing_metadata_ready_;
}
2819
CreateSubchannelCall()2820 void ClientChannel::LoadBalancedCall::CreateSubchannelCall() {
2821 SubchannelCall::Args call_args = {
2822 std::move(connected_subchannel_), pollent_, path_, call_start_time_,
2823 deadline_, arena_,
2824 // TODO(roth): When we implement hedging support, we will probably
2825 // need to use a separate call context for each subchannel call.
2826 call_context_, call_combiner_};
2827 grpc_error_handle error = GRPC_ERROR_NONE;
2828 subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
2829 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2830 gpr_log(GPR_INFO,
2831 "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
2832 this, subchannel_call_.get(), grpc_error_std_string(error).c_str());
2833 }
2834 if (on_call_destruction_complete_ != nullptr) {
2835 subchannel_call_->SetAfterCallStackDestroy(on_call_destruction_complete_);
2836 on_call_destruction_complete_ = nullptr;
2837 }
2838 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
2839 PendingBatchesFail(error, YieldCallCombiner);
2840 } else {
2841 PendingBatchesResume();
2842 }
2843 }
2844
// A class to handle the call combiner cancellation callback for a
// queued pick.
// TODO(roth): When we implement hedging support, we won't be able to
// register a call combiner cancellation closure for each LB pick,
// because there may be multiple LB picks happening in parallel.
// Instead, we will probably need to maintain a list in the CallData
// object of pending LB picks to be cancelled when the closure runs.
class ClientChannel::LoadBalancedCall::LbQueuedCallCanceller {
 public:
  // Holds a ref to the LB call and a call stack ref for the lifetime of
  // the canceller; registers CancelLocked() with the call combiner.
  explicit LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  // Invoked by the call combiner on cancellation (or replacement).
  // Acts only if this object is still the LB call's active canceller
  // (it may have been "lamed" by MaybeRemoveCallFromLbQueuedCallsLocked)
  // AND the call was actually cancelled: removes the call from the
  // queued-picks list and fails its pending batches.
  static void CancelLocked(void* arg, grpc_error_handle error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand_;
    {
      MutexLock lock(&chand->data_plane_mu_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: cancelling queued pick: "
                "error=%s self=%p calld->pick_canceller=%p",
                chand, lb_call, grpc_error_std_string(error).c_str(), self,
                lb_call->lb_call_canceller_);
      }
      if (lb_call->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
        // Remove pick from list of queued picks.
        lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(GRPC_ERROR_REF(error),
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    // Release the call stack ref taken in the constructor.
    GRPC_CALL_STACK_UNREF(lb_call->owning_call_, "LbQueuedCallCanceller");
    delete self;
  }

  // Ref keeps the LB call alive until the cancellation callback runs.
  RefCountedPtr<LoadBalancedCall> lb_call_;
  // Closure registered with the call combiner's cancel notification.
  grpc_closure closure_;
};
2890
MaybeRemoveCallFromLbQueuedCallsLocked()2891 void ClientChannel::LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
2892 if (!queued_pending_lb_pick_) return;
2893 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2894 gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
2895 chand_, this);
2896 }
2897 chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
2898 queued_pending_lb_pick_ = false;
2899 // Lame the call combiner canceller.
2900 lb_call_canceller_ = nullptr;
2901 }
2902
MaybeAddCallToLbQueuedCallsLocked()2903 void ClientChannel::LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
2904 if (queued_pending_lb_pick_) return;
2905 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2906 gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
2907 chand_, this);
2908 }
2909 queued_pending_lb_pick_ = true;
2910 queued_call_.lb_call = this;
2911 chand_->AddLbQueuedCall(&queued_call_, pollent_);
2912 // Register call combiner cancellation callback.
2913 lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
2914 }
2915
AsyncPickDone(grpc_error_handle error)2916 void ClientChannel::LoadBalancedCall::AsyncPickDone(grpc_error_handle error) {
2917 GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
2918 ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
2919 }
2920
PickDone(void * arg,grpc_error_handle error)2921 void ClientChannel::LoadBalancedCall::PickDone(void* arg,
2922 grpc_error_handle error) {
2923 auto* self = static_cast<LoadBalancedCall*>(arg);
2924 if (error != GRPC_ERROR_NONE) {
2925 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2926 gpr_log(GPR_INFO,
2927 "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
2928 self->chand_, self, grpc_error_std_string(error).c_str());
2929 }
2930 self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
2931 return;
2932 }
2933 self->CreateSubchannelCall();
2934 }
2935
2936 namespace {
2937
PickResultTypeName(LoadBalancingPolicy::PickResult::ResultType type)2938 const char* PickResultTypeName(
2939 LoadBalancingPolicy::PickResult::ResultType type) {
2940 switch (type) {
2941 case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
2942 return "COMPLETE";
2943 case LoadBalancingPolicy::PickResult::PICK_QUEUE:
2944 return "QUEUE";
2945 case LoadBalancingPolicy::PickResult::PICK_FAILED:
2946 return "FAILED";
2947 }
2948 GPR_UNREACHABLE_CODE(return "UNKNOWN");
2949 }
2950
2951 } // namespace
2952
PickSubchannel(void * arg,grpc_error_handle error)2953 void ClientChannel::LoadBalancedCall::PickSubchannel(void* arg,
2954 grpc_error_handle error) {
2955 auto* self = static_cast<LoadBalancedCall*>(arg);
2956 bool pick_complete;
2957 {
2958 MutexLock lock(&self->chand_->data_plane_mu_);
2959 pick_complete = self->PickSubchannelLocked(&error);
2960 }
2961 if (pick_complete) {
2962 PickDone(self, error);
2963 GRPC_ERROR_UNREF(error);
2964 }
2965 }
2966
// Performs an LB pick for this call.  Must be invoked while holding
// data_plane_mu_.  Returns true if the pick resolved (completed or
// failed), in which case *error is set (caller takes ownership);
// returns false if the pick was queued to await a new picker.
bool ClientChannel::LoadBalancedCall::PickSubchannelLocked(
    grpc_error_handle* error) {
  GPR_ASSERT(connected_subchannel_ == nullptr);
  GPR_ASSERT(subchannel_call_ == nullptr);
  // Grab initial metadata.
  // NOTE(review): this reads pending_batches_[0], which presumably is
  // guaranteed to hold the batch containing send_initial_metadata at
  // this point — confirm against the batch-queuing logic.
  auto& send_initial_metadata =
      pending_batches_[0]->payload->send_initial_metadata;
  grpc_metadata_batch* initial_metadata_batch =
      send_initial_metadata.send_initial_metadata;
  const uint32_t send_initial_metadata_flags =
      send_initial_metadata.send_initial_metadata_flags;
  // Perform LB pick.
  LoadBalancingPolicy::PickArgs pick_args;
  pick_args.path = StringViewFromSlice(path_);
  LbCallState lb_call_state(this);
  pick_args.call_state = &lb_call_state;
  Metadata initial_metadata(this, initial_metadata_batch);
  pick_args.initial_metadata = &initial_metadata;
  auto result = chand_->picker_->Pick(pick_args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(
        GPR_INFO,
        "chand=%p lb_call=%p: LB pick returned %s (subchannel=%p, error=%s)",
        chand_, this, PickResultTypeName(result.type), result.subchannel.get(),
        grpc_error_std_string(result.error).c_str());
  }
  switch (result.type) {
    case LoadBalancingPolicy::PickResult::PICK_FAILED: {
      // If we're shutting down, fail all RPCs.
      grpc_error_handle disconnect_error = chand_->disconnect_error();
      if (disconnect_error != GRPC_ERROR_NONE) {
        // Drop the picker's error; report the disconnect error instead.
        GRPC_ERROR_UNREF(result.error);
        MaybeRemoveCallFromLbQueuedCallsLocked();
        *error = GRPC_ERROR_REF(disconnect_error);
        return true;
      }
      // If wait_for_ready is false, then the error indicates the RPC
      // attempt's final status.
      if ((send_initial_metadata_flags &
           GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
        // Wrap the pick error; the REFERENCING ctor takes its own ref,
        // so release ours.
        grpc_error_handle new_error =
            GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                "Failed to pick subchannel", &result.error, 1);
        GRPC_ERROR_UNREF(result.error);
        *error = new_error;
        MaybeRemoveCallFromLbQueuedCallsLocked();
        return true;
      }
      // If wait_for_ready is true, then queue to retry when we get a new
      // picker.
      GRPC_ERROR_UNREF(result.error);
    }
    // Fallthrough
    case LoadBalancingPolicy::PickResult::PICK_QUEUE:
      MaybeAddCallToLbQueuedCallsLocked();
      return false;
    default:  // PICK_COMPLETE
      MaybeRemoveCallFromLbQueuedCallsLocked();
      // Handle drops.
      if (GPR_UNLIKELY(result.subchannel == nullptr)) {
        // LB policy dropped the call: report UNAVAILABLE and tag the
        // error so retry code knows not to retry.
        result.error = grpc_error_set_int(
            grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                   "Call dropped by load balancing policy"),
                               GRPC_ERROR_INT_GRPC_STATUS,
                               GRPC_STATUS_UNAVAILABLE),
            GRPC_ERROR_INT_LB_POLICY_DROP, 1);
      } else {
        // Grab a ref to the connected subchannel while we're still
        // holding the data plane mutex.
        connected_subchannel_ =
            chand_->GetConnectedSubchannelInDataPlane(result.subchannel.get());
        GPR_ASSERT(connected_subchannel_ != nullptr);
      }
      lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
      // Ownership of result.error (GRPC_ERROR_NONE on success) passes
      // to the caller.
      *error = result.error;
      return true;
  }
}
3045
3046 } // namespace grpc_core
3047