1 //
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include "src/core/ext/filters/client_channel/client_channel.h"
20
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <stdbool.h>
24 #include <stdio.h>
25 #include <string.h>
26
27 #include <set>
28
29 #include "absl/strings/numbers.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/strings/str_join.h"
32 #include "absl/strings/string_view.h"
33
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/string_util.h>
37 #include <grpc/support/sync.h>
38
39 #include "absl/container/inlined_vector.h"
40 #include "absl/types/optional.h"
41
42 #include "src/core/ext/filters/client_channel/backend_metric.h"
43 #include "src/core/ext/filters/client_channel/backup_poller.h"
44 #include "src/core/ext/filters/client_channel/config_selector.h"
45 #include "src/core/ext/filters/client_channel/dynamic_filters.h"
46 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
47 #include "src/core/ext/filters/client_channel/http_connect_handshaker.h"
48 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
49 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
50 #include "src/core/ext/filters/client_channel/local_subchannel_pool.h"
51 #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
52 #include "src/core/ext/filters/client_channel/resolver_registry.h"
53 #include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
54 #include "src/core/ext/filters/client_channel/retry_throttle.h"
55 #include "src/core/ext/filters/client_channel/service_config.h"
56 #include "src/core/ext/filters/client_channel/service_config_call_data.h"
57 #include "src/core/ext/filters/client_channel/subchannel.h"
58 #include "src/core/ext/filters/deadline/deadline_filter.h"
59 #include "src/core/lib/backoff/backoff.h"
60 #include "src/core/lib/channel/channel_args.h"
61 #include "src/core/lib/channel/connected_channel.h"
62 #include "src/core/lib/channel/status_util.h"
63 #include "src/core/lib/gpr/string.h"
64 #include "src/core/lib/gprpp/manual_constructor.h"
65 #include "src/core/lib/gprpp/sync.h"
66 #include "src/core/lib/iomgr/iomgr.h"
67 #include "src/core/lib/iomgr/polling_entity.h"
68 #include "src/core/lib/iomgr/work_serializer.h"
69 #include "src/core/lib/profiling/timers.h"
70 #include "src/core/lib/slice/slice_internal.h"
71 #include "src/core/lib/slice/slice_string_helpers.h"
72 #include "src/core/lib/surface/channel.h"
73 #include "src/core/lib/transport/connectivity_state.h"
74 #include "src/core/lib/transport/error_utils.h"
75 #include "src/core/lib/transport/metadata.h"
76 #include "src/core/lib/transport/metadata_batch.h"
77 #include "src/core/lib/transport/static_metadata.h"
78 #include "src/core/lib/transport/status_metadata.h"
79
80 //
81 // Client channel filter
82 //
83
// Amount of data buffered per RPC so that the RPC can be retried, unless
// overridden: 256 KiB (256 * 1024 bytes).
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 * 1024)

// Retry backoff jitter. The value is arbitrary; change it whenever there is
// an even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2

// Upper bound on the number of batches pending on a call at the same time:
// one slot per op kind, namely
//   send_initial_metadata / recv_initial_metadata,
//   send_message / recv_message,
//   send_trailing_metadata / recv_trailing_metadata.
#define MAX_PENDING_BATCHES 6

// Channel-arg key whose value is a pointer to the ChannelData object.
#define GRPC_ARG_CLIENT_CHANNEL_DATA "grpc.internal.client_channel_data"

// Channel-arg key whose value is a pointer to the RetryThrottleData object.
#define GRPC_ARG_RETRY_THROTTLE_DATA "grpc.internal.retry_throttle_data"
107
108 namespace grpc_core {
109
110 using internal::ClientChannelGlobalParsedConfig;
111 using internal::ClientChannelMethodParsedConfig;
112 using internal::ClientChannelServiceConfigParser;
113 using internal::ServerRetryThrottleData;
114
115 TraceFlag grpc_client_channel_call_trace(false, "client_channel_call");
116 TraceFlag grpc_client_channel_routing_trace(false, "client_channel_routing");
117
118 namespace {
119
120 //
121 // ChannelData definition
122 //
123
124 class LoadBalancedCall;
125
126 class ChannelData {
127 public:
128 struct ResolverQueuedCall {
129 grpc_call_element* elem;
130 ResolverQueuedCall* next = nullptr;
131 };
132 struct LbQueuedCall {
133 LoadBalancedCall* lb_call;
134 LbQueuedCall* next = nullptr;
135 };
136
137 static grpc_error* Init(grpc_channel_element* elem,
138 grpc_channel_element_args* args);
139 static void Destroy(grpc_channel_element* elem);
140 static void StartTransportOp(grpc_channel_element* elem,
141 grpc_transport_op* op);
142 static void GetChannelInfo(grpc_channel_element* elem,
143 const grpc_channel_info* info);
144
deadline_checking_enabled() const145 bool deadline_checking_enabled() const { return deadline_checking_enabled_; }
enable_retries() const146 bool enable_retries() const { return enable_retries_; }
per_rpc_retry_buffer_size() const147 size_t per_rpc_retry_buffer_size() const {
148 return per_rpc_retry_buffer_size_;
149 }
owning_stack() const150 grpc_channel_stack* owning_stack() const { return owning_stack_; }
151
152 // Note: Does NOT return a new ref.
disconnect_error() const153 grpc_error* disconnect_error() const {
154 return disconnect_error_.Load(MemoryOrder::ACQUIRE);
155 }
156
resolution_mu() const157 Mutex* resolution_mu() const { return &resolution_mu_; }
158 // These methods all require holding resolution_mu_.
159 void AddResolverQueuedCall(ResolverQueuedCall* call,
160 grpc_polling_entity* pollent);
161 void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
162 grpc_polling_entity* pollent);
received_service_config_data() const163 bool received_service_config_data() const {
164 return received_service_config_data_;
165 }
resolver_transient_failure_error() const166 grpc_error* resolver_transient_failure_error() const {
167 return resolver_transient_failure_error_;
168 }
service_config() const169 RefCountedPtr<ServiceConfig> service_config() const {
170 return service_config_;
171 }
config_selector() const172 ConfigSelector* config_selector() const { return config_selector_.get(); }
dynamic_filters() const173 RefCountedPtr<DynamicFilters> dynamic_filters() const {
174 return dynamic_filters_;
175 }
176
data_plane_mu() const177 Mutex* data_plane_mu() const { return &data_plane_mu_; }
178 // These methods all require holding data_plane_mu_.
picker() const179 LoadBalancingPolicy::SubchannelPicker* picker() const {
180 return picker_.get();
181 }
182 void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent);
183 void RemoveLbQueuedCall(LbQueuedCall* to_remove,
184 grpc_polling_entity* pollent);
185 RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
186 SubchannelInterface* subchannel) const;
187
work_serializer() const188 WorkSerializer* work_serializer() const { return work_serializer_.get(); }
189
190 grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
191
AddExternalConnectivityWatcher(grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)192 void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
193 grpc_connectivity_state* state,
194 grpc_closure* on_complete,
195 grpc_closure* watcher_timer_init) {
196 new ExternalConnectivityWatcher(this, pollent, state, on_complete,
197 watcher_timer_init);
198 }
199
RemoveExternalConnectivityWatcher(grpc_closure * on_complete,bool cancel)200 void RemoveExternalConnectivityWatcher(grpc_closure* on_complete,
201 bool cancel) {
202 ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
203 this, on_complete, cancel);
204 }
205
NumExternalConnectivityWatchers() const206 int NumExternalConnectivityWatchers() const {
207 MutexLock lock(&external_watchers_mu_);
208 return static_cast<int>(external_watchers_.size());
209 }
210
211 void AddConnectivityWatcher(
212 grpc_connectivity_state initial_state,
213 OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
214 void RemoveConnectivityWatcher(
215 AsyncConnectivityStateWatcherInterface* watcher);
216
217 private:
218 class SubchannelWrapper;
219 class ClientChannelControlHelper;
220 class ConnectivityWatcherAdder;
221 class ConnectivityWatcherRemover;
222
223 // Represents a pending connectivity callback from an external caller
224 // via grpc_client_channel_watch_connectivity_state().
225 class ExternalConnectivityWatcher : public ConnectivityStateWatcherInterface {
226 public:
227 ExternalConnectivityWatcher(ChannelData* chand, grpc_polling_entity pollent,
228 grpc_connectivity_state* state,
229 grpc_closure* on_complete,
230 grpc_closure* watcher_timer_init);
231
232 ~ExternalConnectivityWatcher() override;
233
234 // Removes the watcher from the external_watchers_ map.
235 static void RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
236 grpc_closure* on_complete,
237 bool cancel);
238
239 void Notify(grpc_connectivity_state state,
240 const absl::Status& /* status */) override;
241
242 void Cancel();
243
244 private:
245 // Adds the watcher to state_tracker_. Consumes the ref that is passed to it
246 // from Start().
247 void AddWatcherLocked();
248 void RemoveWatcherLocked();
249
250 ChannelData* chand_;
251 grpc_polling_entity pollent_;
252 grpc_connectivity_state initial_state_;
253 grpc_connectivity_state* state_;
254 grpc_closure* on_complete_;
255 grpc_closure* watcher_timer_init_;
256 Atomic<bool> done_{false};
257 };
258
259 class ResolverResultHandler : public Resolver::ResultHandler {
260 public:
ResolverResultHandler(ChannelData * chand)261 explicit ResolverResultHandler(ChannelData* chand) : chand_(chand) {
262 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ResolverResultHandler");
263 }
264
~ResolverResultHandler()265 ~ResolverResultHandler() override {
266 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
267 gpr_log(GPR_INFO, "chand=%p: resolver shutdown complete", chand_);
268 }
269 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ResolverResultHandler");
270 }
271
ReturnResult(Resolver::Result result)272 void ReturnResult(Resolver::Result result) override {
273 chand_->OnResolverResultChangedLocked(std::move(result));
274 }
275
ReturnError(grpc_error * error)276 void ReturnError(grpc_error* error) override {
277 chand_->OnResolverErrorLocked(error);
278 }
279
280 private:
281 ChannelData* chand_;
282 };
283
284 ChannelData(grpc_channel_element_args* args, grpc_error** error);
285 ~ChannelData();
286
287 // Note: All methods with "Locked" suffix must be invoked from within
288 // work_serializer_.
289
290 void OnResolverResultChangedLocked(Resolver::Result result);
291 void OnResolverErrorLocked(grpc_error* error);
292
293 void CreateOrUpdateLbPolicyLocked(
294 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
295 Resolver::Result result);
296 OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
297 const grpc_channel_args& args);
298
299 void UpdateStateAndPickerLocked(
300 grpc_connectivity_state state, const absl::Status& status,
301 const char* reason,
302 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker);
303
304 void UpdateServiceConfigInControlPlaneLocked(
305 RefCountedPtr<ServiceConfig> service_config,
306 RefCountedPtr<ConfigSelector> config_selector,
307 const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
308 const char* lb_policy_name);
309
310 void UpdateServiceConfigInDataPlaneLocked();
311
312 void CreateResolverLocked();
313 void DestroyResolverAndLbPolicyLocked();
314
315 grpc_error* DoPingLocked(grpc_transport_op* op);
316
317 void StartTransportOpLocked(grpc_transport_op* op);
318
319 void TryToConnectLocked();
320
321 //
322 // Fields set at construction and never modified.
323 //
324 const bool deadline_checking_enabled_;
325 const bool enable_retries_;
326 const size_t per_rpc_retry_buffer_size_;
327 grpc_channel_stack* owning_stack_;
328 ClientChannelFactory* client_channel_factory_;
329 const grpc_channel_args* channel_args_;
330 RefCountedPtr<ServiceConfig> default_service_config_;
331 std::string server_name_;
332 UniquePtr<char> target_uri_;
333 channelz::ChannelNode* channelz_node_;
334
335 //
336 // Fields related to name resolution. Guarded by resolution_mu_.
337 //
338 mutable Mutex resolution_mu_;
339 // Linked list of calls queued waiting for resolver result.
340 ResolverQueuedCall* resolver_queued_calls_ = nullptr;
341 // Data from service config.
342 grpc_error* resolver_transient_failure_error_ = GRPC_ERROR_NONE;
343 bool received_service_config_data_ = false;
344 RefCountedPtr<ServiceConfig> service_config_;
345 RefCountedPtr<ConfigSelector> config_selector_;
346 RefCountedPtr<DynamicFilters> dynamic_filters_;
347
348 //
349 // Fields used in the data plane. Guarded by data_plane_mu_.
350 //
351 mutable Mutex data_plane_mu_;
352 std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_;
353 // Linked list of calls queued waiting for LB pick.
354 LbQueuedCall* lb_queued_calls_ = nullptr;
355
356 //
357 // Fields used in the control plane. Guarded by work_serializer.
358 //
359 std::shared_ptr<WorkSerializer> work_serializer_;
360 grpc_pollset_set* interested_parties_;
361 ConnectivityStateTracker state_tracker_;
362 OrphanablePtr<Resolver> resolver_;
363 bool previous_resolution_contained_addresses_ = false;
364 RefCountedPtr<ServiceConfig> saved_service_config_;
365 RefCountedPtr<ConfigSelector> saved_config_selector_;
366 absl::optional<std::string> health_check_service_name_;
367 OrphanablePtr<LoadBalancingPolicy> lb_policy_;
368 RefCountedPtr<SubchannelPoolInterface> subchannel_pool_;
369 // The number of SubchannelWrapper instances referencing a given Subchannel.
370 std::map<Subchannel*, int> subchannel_refcount_map_;
371 // The set of SubchannelWrappers that currently exist.
372 // No need to hold a ref, since the map is updated in the control-plane
373 // work_serializer when the SubchannelWrappers are created and destroyed.
374 std::set<SubchannelWrapper*> subchannel_wrappers_;
375 // Pending ConnectedSubchannel updates for each SubchannelWrapper.
376 // Updates are queued here in the control plane work_serializer and then
377 // applied in the data plane mutex when the picker is updated.
378 std::map<RefCountedPtr<SubchannelWrapper>, RefCountedPtr<ConnectedSubchannel>>
379 pending_subchannel_updates_;
380 int keepalive_time_ = -1;
381
382 //
383 // Fields accessed from both data plane mutex and control plane
384 // work_serializer.
385 //
386 Atomic<grpc_error*> disconnect_error_;
387
388 //
389 // Fields guarded by a mutex, since they need to be accessed
390 // synchronously via get_channel_info().
391 //
392 Mutex info_mu_;
393 UniquePtr<char> info_lb_policy_name_;
394 UniquePtr<char> info_service_config_json_;
395
396 //
397 // Fields guarded by a mutex, since they need to be accessed
398 // synchronously via grpc_channel_num_external_connectivity_watchers().
399 //
400 mutable Mutex external_watchers_mu_;
401 std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
402 external_watchers_;
403 };
404
405 //
406 // CallData definition
407 //
408
409 class CallData {
410 public:
411 static grpc_error* Init(grpc_call_element* elem,
412 const grpc_call_element_args* args);
413 static void Destroy(grpc_call_element* elem,
414 const grpc_call_final_info* final_info,
415 grpc_closure* then_schedule_closure);
416 static void StartTransportStreamOpBatch(
417 grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
418 static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);
419
420 // Invoked by channel for queued calls when name resolution is completed.
421 static void CheckResolution(void* arg, grpc_error* error);
422 // Helper function for applying the service config to a call while
423 // holding ChannelData::resolution_mu_.
424 // Returns true if the service config has been applied to the call, in which
425 // case the caller must invoke ResolutionDone() or AsyncResolutionDone()
426 // with the returned error.
427 bool CheckResolutionLocked(grpc_call_element* elem, grpc_error** error);
428 // Schedules a callback to continue processing the call once
429 // resolution is complete. The callback will not run until after this
430 // method returns.
431 void AsyncResolutionDone(grpc_call_element* elem, grpc_error* error);
432
433 private:
434 class ResolverQueuedCallCanceller;
435
436 CallData(grpc_call_element* elem, const ChannelData& chand,
437 const grpc_call_element_args& args);
438 ~CallData();
439
440 // Returns the index into pending_batches_ to be used for batch.
441 static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
442 void PendingBatchesAdd(grpc_call_element* elem,
443 grpc_transport_stream_op_batch* batch);
444 static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
445 // A predicate type and some useful implementations for PendingBatchesFail().
446 typedef bool (*YieldCallCombinerPredicate)(
447 const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList &)448 static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
449 return true;
450 }
NoYieldCallCombiner(const CallCombinerClosureList &)451 static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
452 return false;
453 }
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)454 static bool YieldCallCombinerIfPendingBatchesFound(
455 const CallCombinerClosureList& closures) {
456 return closures.size() > 0;
457 }
458 // Fails all pending batches.
459 // If yield_call_combiner_predicate returns true, assumes responsibility for
460 // yielding the call combiner.
461 void PendingBatchesFail(
462 grpc_call_element* elem, grpc_error* error,
463 YieldCallCombinerPredicate yield_call_combiner_predicate);
464 static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
465 // Resumes all pending batches on lb_call_.
466 void PendingBatchesResume(grpc_call_element* elem);
467
468 // Applies service config to the call. Must be invoked once we know
469 // that the resolver has returned results to the channel.
470 // If an error is returned, the error indicates the status with which
471 // the call should be failed.
472 grpc_error* ApplyServiceConfigToCallLocked(
473 grpc_call_element* elem, grpc_metadata_batch* initial_metadata);
474 // Invoked when the resolver result is applied to the caller, on both
475 // success or failure.
476 static void ResolutionDone(void* arg, grpc_error* error);
477 // Removes the call (if present) from the channel's list of calls queued
478 // for name resolution.
479 void MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element* elem);
480 // Adds the call (if not already present) to the channel's list of
481 // calls queued for name resolution.
482 void MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element* elem);
483
484 static void RecvInitialMetadataReadyForConfigSelectorCommitCallback(
485 void* arg, grpc_error* error);
486 void InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
487 grpc_transport_stream_op_batch* batch);
488
489 void CreateDynamicCall(grpc_call_element* elem);
490
491 // State for handling deadlines.
492 // The code in deadline_filter.c requires this to be the first field.
493 // TODO(roth): This is slightly sub-optimal in that grpc_deadline_state
494 // and this struct both independently store pointers to the call stack
495 // and call combiner. If/when we have time, find a way to avoid this
496 // without breaking the grpc_deadline_state abstraction.
497 grpc_deadline_state deadline_state_;
498
499 grpc_slice path_; // Request path.
500 gpr_cycle_counter call_start_time_;
501 grpc_millis deadline_;
502 Arena* arena_;
503 grpc_call_stack* owning_call_;
504 CallCombiner* call_combiner_;
505 grpc_call_context_element* call_context_;
506
507 grpc_polling_entity* pollent_ = nullptr;
508
509 grpc_closure pick_closure_;
510
511 // Accessed while holding ChannelData::resolution_mu_.
512 bool service_config_applied_ = false;
513 bool queued_pending_resolver_result_ = false;
514 ChannelData::ResolverQueuedCall resolver_queued_call_;
515 ResolverQueuedCallCanceller* resolver_call_canceller_ = nullptr;
516
517 std::function<void()> on_call_committed_;
518
519 grpc_closure* original_recv_initial_metadata_ready_ = nullptr;
520 grpc_closure recv_initial_metadata_ready_;
521
522 RefCountedPtr<DynamicFilters> dynamic_filters_;
523 RefCountedPtr<DynamicFilters::Call> dynamic_call_;
524
525 // Batches are added to this list when received from above.
526 // They are removed when we are done handling the batch (i.e., when
527 // either we have invoked all of the batch's callbacks or we have
528 // passed the batch down to the LB call and are not intercepting any of
529 // its callbacks).
530 grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
531
532 // Set when we get a cancel_stream op.
533 grpc_error* cancel_error_ = GRPC_ERROR_NONE;
534 };
535
536 //
537 // RetryingCall definition
538 //
539
540 class RetryingCall {
541 public:
542 RetryingCall(
543 ChannelData* chand, const grpc_call_element_args& args,
544 grpc_polling_entity* pollent,
545 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
546 const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy);
547 ~RetryingCall();
548
549 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
550
551 RefCountedPtr<SubchannelCall> subchannel_call() const;
552
553 private:
554 // State used for starting a retryable batch on a subchannel call.
555 // This provides its own grpc_transport_stream_op_batch and other data
556 // structures needed to populate the ops in the batch.
557 // We allocate one struct on the arena for each attempt at starting a
558 // batch on a given subchannel call.
559 struct SubchannelCallBatchData {
560 // Creates a SubchannelCallBatchData object on the call's arena with the
561 // specified refcount. If set_on_complete is true, the batch's
562 // on_complete callback will be set to point to on_complete();
563 // otherwise, the batch's on_complete callback will be null.
564 static SubchannelCallBatchData* Create(RetryingCall* call, int refcount,
565 bool set_on_complete);
566
Unrefgrpc_core::__anonee6b702a0111::RetryingCall::SubchannelCallBatchData567 void Unref() {
568 if (gpr_unref(&refs)) Destroy();
569 }
570
571 SubchannelCallBatchData(RetryingCall* call, int refcount,
572 bool set_on_complete);
573 // All dtor code must be added in `Destroy()`. This is because we may
574 // call closures in `SubchannelCallBatchData` after they are unrefed by
575 // `Unref()`, and msan would complain about accessing this class
576 // after calling dtor. As a result we cannot call the `dtor` in `Unref()`.
577 // TODO(soheil): We should try to call the dtor in `Unref()`.
~SubchannelCallBatchDatagrpc_core::__anonee6b702a0111::RetryingCall::SubchannelCallBatchData578 ~SubchannelCallBatchData() { Destroy(); }
579 void Destroy();
580
581 gpr_refcount refs;
582 grpc_call_element* elem;
583 RetryingCall* call;
584 RefCountedPtr<LoadBalancedCall> lb_call;
585 // The batch to use in the subchannel call.
586 // Its payload field points to SubchannelCallRetryState::batch_payload.
587 grpc_transport_stream_op_batch batch;
588 // For intercepting on_complete.
589 grpc_closure on_complete;
590 };
591
592 // Retry state associated with a subchannel call.
593 // Stored in the parent_data of the subchannel call object.
594 struct SubchannelCallRetryState {
SubchannelCallRetryStategrpc_core::__anonee6b702a0111::RetryingCall::SubchannelCallRetryState595 explicit SubchannelCallRetryState(grpc_call_context_element* context)
596 : batch_payload(context),
597 started_send_initial_metadata(false),
598 completed_send_initial_metadata(false),
599 started_send_trailing_metadata(false),
600 completed_send_trailing_metadata(false),
601 started_recv_initial_metadata(false),
602 completed_recv_initial_metadata(false),
603 started_recv_trailing_metadata(false),
604 completed_recv_trailing_metadata(false),
605 retry_dispatched(false) {}
606
607 // SubchannelCallBatchData.batch.payload points to this.
608 grpc_transport_stream_op_batch_payload batch_payload;
609 // For send_initial_metadata.
610 // Note that we need to make a copy of the initial metadata for each
611 // subchannel call instead of just referring to the copy in call_data,
612 // because filters in the subchannel stack will probably add entries,
613 // so we need to start in a pristine state for each attempt of the call.
614 grpc_linked_mdelem* send_initial_metadata_storage;
615 grpc_metadata_batch send_initial_metadata;
616 // For send_message.
617 // TODO(roth): Restructure this to eliminate use of ManualConstructor.
618 ManualConstructor<ByteStreamCache::CachingByteStream> send_message;
619 // For send_trailing_metadata.
620 grpc_linked_mdelem* send_trailing_metadata_storage;
621 grpc_metadata_batch send_trailing_metadata;
622 // For intercepting recv_initial_metadata.
623 grpc_metadata_batch recv_initial_metadata;
624 grpc_closure recv_initial_metadata_ready;
625 bool trailing_metadata_available = false;
626 // For intercepting recv_message.
627 grpc_closure recv_message_ready;
628 OrphanablePtr<ByteStream> recv_message;
629 // For intercepting recv_trailing_metadata.
630 grpc_metadata_batch recv_trailing_metadata;
631 grpc_transport_stream_stats collect_stats;
632 grpc_closure recv_trailing_metadata_ready;
633 // These fields indicate which ops have been started and completed on
634 // this subchannel call.
635 size_t started_send_message_count = 0;
636 size_t completed_send_message_count = 0;
637 size_t started_recv_message_count = 0;
638 size_t completed_recv_message_count = 0;
639 bool started_send_initial_metadata : 1;
640 bool completed_send_initial_metadata : 1;
641 bool started_send_trailing_metadata : 1;
642 bool completed_send_trailing_metadata : 1;
643 bool started_recv_initial_metadata : 1;
644 bool completed_recv_initial_metadata : 1;
645 bool started_recv_trailing_metadata : 1;
646 bool completed_recv_trailing_metadata : 1;
647 // State for callback processing.
648 SubchannelCallBatchData* recv_initial_metadata_ready_deferred_batch =
649 nullptr;
650 grpc_error* recv_initial_metadata_error = GRPC_ERROR_NONE;
651 SubchannelCallBatchData* recv_message_ready_deferred_batch = nullptr;
652 grpc_error* recv_message_error = GRPC_ERROR_NONE;
653 SubchannelCallBatchData* recv_trailing_metadata_internal_batch = nullptr;
654 // NOTE: Do not move this next to the metadata bitfields above. That would
655 // save space but will also result in a data race because compiler
656 // will generate a 2 byte store which overwrites the meta-data
657 // fields upon setting this field.
658 bool retry_dispatched : 1;
659 };
660
661 // Pending batches stored in call data.
662 struct PendingBatch {
663 // The pending batch. If nullptr, this slot is empty.
664 grpc_transport_stream_op_batch* batch = nullptr;
665 // Indicates whether payload for send ops has been cached in CallData.
666 bool send_ops_cached = false;
667 };
668
669 // Caches data for send ops so that it can be retried later, if not
670 // already cached.
671 void MaybeCacheSendOpsForBatch(PendingBatch* pending);
672 void FreeCachedSendInitialMetadata();
673 // Frees cached send_message at index idx.
674 void FreeCachedSendMessage(size_t idx);
675 void FreeCachedSendTrailingMetadata();
676 // Frees cached send ops that have already been completed after
677 // committing the call.
678 void FreeCachedSendOpDataAfterCommit(SubchannelCallRetryState* retry_state);
679 // Frees cached send ops that were completed by the completed batch in
680 // batch_data. Used when batches are completed after the call is committed.
681 void FreeCachedSendOpDataForCompletedBatch(
682 SubchannelCallBatchData* batch_data,
683 SubchannelCallRetryState* retry_state);
684
685 // Returns the index into pending_batches_ to be used for batch.
686 static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
687 void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
688 void PendingBatchClear(PendingBatch* pending);
689 void MaybeClearPendingBatch(PendingBatch* pending);
690 static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
691 // A predicate type and some useful implementations for PendingBatchesFail().
692 typedef bool (*YieldCallCombinerPredicate)(
693 const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList &)694 static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
695 return true;
696 }
NoYieldCallCombiner(const CallCombinerClosureList &)697 static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
698 return false;
699 }
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)700 static bool YieldCallCombinerIfPendingBatchesFound(
701 const CallCombinerClosureList& closures) {
702 return closures.size() > 0;
703 }
704 // Fails all pending batches.
705 // If yield_call_combiner_predicate returns true, assumes responsibility for
706 // yielding the call combiner.
707 void PendingBatchesFail(
708 grpc_error* error,
709 YieldCallCombinerPredicate yield_call_combiner_predicate);
710 static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
711 // Resumes all pending batches on lb_call_.
712 void PendingBatchesResume();
713 // Returns a pointer to the first pending batch for which predicate(batch)
714 // returns true, or null if not found.
715 template <typename Predicate>
716 PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);
717
718 // Commits the call so that no further retry attempts will be performed.
719 void RetryCommit(SubchannelCallRetryState* retry_state);
720 // Starts a retry after appropriate back-off.
721 void DoRetry(SubchannelCallRetryState* retry_state,
722 grpc_millis server_pushback_ms);
723 // Returns true if the call is being retried.
724 bool MaybeRetry(SubchannelCallBatchData* batch_data, grpc_status_code status,
725 grpc_mdelem* server_pushback_md);
726
727 // Invokes recv_initial_metadata_ready for a subchannel batch.
728 static void InvokeRecvInitialMetadataCallback(void* arg, grpc_error* error);
729 // Intercepts recv_initial_metadata_ready callback for retries.
730 // Commits the call and returns the initial metadata up the stack.
731 static void RecvInitialMetadataReady(void* arg, grpc_error* error);
732
733 // Invokes recv_message_ready for a subchannel batch.
734 static void InvokeRecvMessageCallback(void* arg, grpc_error* error);
735 // Intercepts recv_message_ready callback for retries.
736 // Commits the call and returns the message up the stack.
737 static void RecvMessageReady(void* arg, grpc_error* error);
738
739 // Sets *status and *server_pushback_md based on md_batch and error.
740 // Only sets *server_pushback_md if server_pushback_md != nullptr.
741 void GetCallStatus(grpc_metadata_batch* md_batch, grpc_error* error,
742 grpc_status_code* status,
743 grpc_mdelem** server_pushback_md);
744 // Adds recv_trailing_metadata_ready closure to closures.
745 void AddClosureForRecvTrailingMetadataReady(
746 SubchannelCallBatchData* batch_data, grpc_error* error,
747 CallCombinerClosureList* closures);
748 // Adds any necessary closures for deferred recv_initial_metadata and
749 // recv_message callbacks to closures.
750 static void AddClosuresForDeferredRecvCallbacks(
751 SubchannelCallBatchData* batch_data,
752 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
753 // Returns true if any op in the batch was not yet started.
754 // Only looks at send ops, since recv ops are always started immediately.
755 bool PendingBatchIsUnstarted(PendingBatch* pending,
756 SubchannelCallRetryState* retry_state);
757 // For any pending batch containing an op that has not yet been started,
758 // adds the pending batch's completion closures to closures.
759 void AddClosuresToFailUnstartedPendingBatches(
760 SubchannelCallRetryState* retry_state, grpc_error* error,
761 CallCombinerClosureList* closures);
762 // Runs necessary closures upon completion of a call attempt.
763 void RunClosuresForCompletedCall(SubchannelCallBatchData* batch_data,
764 grpc_error* error);
765 // Intercepts recv_trailing_metadata_ready callback for retries.
766 // Commits the call and returns the trailing metadata up the stack.
767 static void RecvTrailingMetadataReady(void* arg, grpc_error* error);
768
769 // Adds the on_complete closure for the pending batch completed in
770 // batch_data to closures.
771 void AddClosuresForCompletedPendingBatch(SubchannelCallBatchData* batch_data,
772 grpc_error* error,
773 CallCombinerClosureList* closures);
774
775 // If there are any cached ops to replay or pending ops to start on the
776 // subchannel call, adds a closure to closures to invoke
777 // StartRetriableSubchannelBatches().
778 void AddClosuresForReplayOrPendingSendOps(
779 SubchannelCallBatchData* batch_data,
780 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
781
782 // Callback used to intercept on_complete from subchannel calls.
783 // Called only when retries are enabled.
784 static void OnComplete(void* arg, grpc_error* error);
785
786 static void StartBatchInCallCombiner(void* arg, grpc_error* ignored);
787 // Adds a closure to closures that will execute batch in the call combiner.
788 void AddClosureForSubchannelBatch(grpc_transport_stream_op_batch* batch,
789 CallCombinerClosureList* closures);
790 // Adds retriable send_initial_metadata op to batch_data.
791 void AddRetriableSendInitialMetadataOp(SubchannelCallRetryState* retry_state,
792 SubchannelCallBatchData* batch_data);
793 // Adds retriable send_message op to batch_data.
794 void AddRetriableSendMessageOp(SubchannelCallRetryState* retry_state,
795 SubchannelCallBatchData* batch_data);
796 // Adds retriable send_trailing_metadata op to batch_data.
797 void AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState* retry_state,
798 SubchannelCallBatchData* batch_data);
799 // Adds retriable recv_initial_metadata op to batch_data.
800 void AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState* retry_state,
801 SubchannelCallBatchData* batch_data);
802 // Adds retriable recv_message op to batch_data.
803 void AddRetriableRecvMessageOp(SubchannelCallRetryState* retry_state,
804 SubchannelCallBatchData* batch_data);
805 // Adds retriable recv_trailing_metadata op to batch_data.
806 void AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState* retry_state,
807 SubchannelCallBatchData* batch_data);
808 // Helper function used to start a recv_trailing_metadata batch. This
809 // is used in the case where a recv_initial_metadata or recv_message
810 // op fails in a way that we know the call is over but when the application
811 // has not yet started its own recv_trailing_metadata op.
812 void StartInternalRecvTrailingMetadata();
813 // If there are any cached send ops that need to be replayed on the
814 // current subchannel call, creates and returns a new subchannel batch
815 // to replay those ops. Otherwise, returns nullptr.
816 SubchannelCallBatchData* MaybeCreateSubchannelBatchForReplay(
817 SubchannelCallRetryState* retry_state);
818 // Adds subchannel batches for pending batches to closures.
819 void AddSubchannelBatchesForPendingBatches(
820 SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures);
821 // Constructs and starts whatever subchannel batches are needed on the
822 // subchannel call.
823 static void StartRetriableSubchannelBatches(void* arg, grpc_error* ignored);
824
825 static void CreateLbCall(void* arg, grpc_error* error);
826
827 ChannelData* chand_;
828 grpc_polling_entity* pollent_;
829 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
830 const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy_ = nullptr;
831 BackOff retry_backoff_;
832
833 grpc_slice path_; // Request path.
834 gpr_cycle_counter call_start_time_;
835 grpc_millis deadline_;
836 Arena* arena_;
837 grpc_call_stack* owning_call_;
838 CallCombiner* call_combiner_;
839 grpc_call_context_element* call_context_;
840
841 grpc_closure retry_closure_;
842
843 RefCountedPtr<LoadBalancedCall> lb_call_;
844
845 // Batches are added to this list when received from above.
846 // They are removed when we are done handling the batch (i.e., when
847 // either we have invoked all of the batch's callbacks or we have
848 // passed the batch down to the LB call and are not intercepting any of
849 // its callbacks).
850 // TODO(roth): Now that the retry code is split out into its own call
851 // object, revamp this to work in a cleaner way, since we no longer need
852 // for batches to ever wait for name resolution or LB picks.
853 PendingBatch pending_batches_[MAX_PENDING_BATCHES];
854 bool pending_send_initial_metadata_ : 1;
855 bool pending_send_message_ : 1;
856 bool pending_send_trailing_metadata_ : 1;
857
858 // Set when we get a cancel_stream op.
859 grpc_error* cancel_error_ = GRPC_ERROR_NONE;
860
861 // Retry state.
862 bool enable_retries_ : 1;
863 bool retry_committed_ : 1;
864 bool last_attempt_got_server_pushback_ : 1;
865 int num_attempts_completed_ = 0;
866 size_t bytes_buffered_for_retry_ = 0;
867 grpc_timer retry_timer_;
868
869 // The number of pending retriable subchannel batches containing send ops.
870 // We hold a ref to the call stack while this is non-zero, since replay
871 // batches may not complete until after all callbacks have been returned
872 // to the surface, and we need to make sure that the call is not destroyed
873 // until all of these batches have completed.
874 // Note that we actually only need to track replay batches, but it's
875 // easier to track all batches with send ops.
876 int num_pending_retriable_subchannel_send_batches_ = 0;
877
878 // Cached data for retrying send ops.
879 // send_initial_metadata
880 bool seen_send_initial_metadata_ = false;
881 grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
882 grpc_metadata_batch send_initial_metadata_;
883 uint32_t send_initial_metadata_flags_;
884 gpr_atm* peer_string_;
885 // send_message
886 // When we get a send_message op, we replace the original byte stream
887 // with a CachingByteStream that caches the slices to a local buffer for
888 // use in retries.
889 // Note: We inline the cache for the first 3 send_message ops and use
890 // dynamic allocation after that. This number was essentially picked
891 // at random; it could be changed in the future to tune performance.
892 absl::InlinedVector<ByteStreamCache*, 3> send_messages_;
893 // send_trailing_metadata
894 bool seen_send_trailing_metadata_ = false;
895 grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
896 grpc_metadata_batch send_trailing_metadata_;
897 };
898
899 //
900 // LoadBalancedCall definition
901 //
902
903 // This object is ref-counted, but it cannot inherit from RefCounted<>,
904 // because it is allocated on the arena and can't free its memory when
905 // its refcount goes to zero. So instead, it manually implements the
906 // same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
907 class LoadBalancedCall {
908 public:
909 static RefCountedPtr<LoadBalancedCall> Create(
910 ChannelData* chand, const grpc_call_element_args& args,
911 grpc_polling_entity* pollent, size_t parent_data_size);
912
913 LoadBalancedCall(ChannelData* chand, const grpc_call_element_args& args,
914 grpc_polling_entity* pollent);
915 ~LoadBalancedCall();
916
917 // Interface of RefCounted<>.
918 RefCountedPtr<LoadBalancedCall> Ref() GRPC_MUST_USE_RESULT;
919 RefCountedPtr<LoadBalancedCall> Ref(const DebugLocation& location,
920 const char* reason) GRPC_MUST_USE_RESULT;
921 // When refcount drops to 0, destroys itself and the associated call stack,
922 // but does NOT free the memory because it's in the call arena.
923 void Unref();
924 void Unref(const DebugLocation& location, const char* reason);
925
926 void* GetParentData();
927
928 void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);
929
930 // Invoked by channel for queued LB picks when the picker is updated.
931 static void PickSubchannel(void* arg, grpc_error* error);
932 // Helper function for performing an LB pick while holding the data plane
933 // mutex. Returns true if the pick is complete, in which case the caller
934 // must invoke PickDone() or AsyncPickDone() with the returned error.
935 bool PickSubchannelLocked(grpc_error** error);
936 // Schedules a callback to process the completed pick. The callback
937 // will not run until after this method returns.
938 void AsyncPickDone(grpc_error* error);
939
subchannel_call() const940 RefCountedPtr<SubchannelCall> subchannel_call() const {
941 return subchannel_call_;
942 }
943
944 private:
945 // Allow RefCountedPtr<> to access IncrementRefCount().
946 template <typename T>
947 friend class ::grpc_core::RefCountedPtr;
948
949 class LbQueuedCallCanceller;
950 class Metadata;
951 class LbCallState;
952
953 // Interface of RefCounted<>.
954 void IncrementRefCount();
955 void IncrementRefCount(const DebugLocation& location, const char* reason);
956
957 // Returns the index into pending_batches_ to be used for batch.
958 static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
959 void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
960 static void FailPendingBatchInCallCombiner(void* arg, grpc_error* error);
961 // A predicate type and some useful implementations for PendingBatchesFail().
962 typedef bool (*YieldCallCombinerPredicate)(
963 const CallCombinerClosureList& closures);
YieldCallCombiner(const CallCombinerClosureList &)964 static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
965 return true;
966 }
NoYieldCallCombiner(const CallCombinerClosureList &)967 static bool NoYieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
968 return false;
969 }
YieldCallCombinerIfPendingBatchesFound(const CallCombinerClosureList & closures)970 static bool YieldCallCombinerIfPendingBatchesFound(
971 const CallCombinerClosureList& closures) {
972 return closures.size() > 0;
973 }
974 // Fails all pending batches.
975 // If yield_call_combiner_predicate returns true, assumes responsibility for
976 // yielding the call combiner.
977 void PendingBatchesFail(
978 grpc_error* error,
979 YieldCallCombinerPredicate yield_call_combiner_predicate);
980 static void ResumePendingBatchInCallCombiner(void* arg, grpc_error* ignored);
981 // Resumes all pending batches on subchannel_call_.
982 void PendingBatchesResume();
983
984 static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
985 void* arg, grpc_error* error);
986 void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
987 grpc_transport_stream_op_batch* batch);
988
989 void CreateSubchannelCall();
990 // Invoked when a pick is completed, on both success or failure.
991 static void PickDone(void* arg, grpc_error* error);
992 // Removes the call from the channel's list of queued picks if present.
993 void MaybeRemoveCallFromLbQueuedCallsLocked();
994 // Adds the call to the channel's list of queued picks if not already present.
995 void MaybeAddCallToLbQueuedCallsLocked();
996
997 RefCount refs_;
998
999 ChannelData* chand_;
1000
1001 // TODO(roth): Instead of duplicating these fields in every filter
1002 // that uses any one of them, we should store them in the call
1003 // context. This will save per-call memory overhead.
1004 grpc_slice path_; // Request path.
1005 gpr_cycle_counter call_start_time_;
1006 grpc_millis deadline_;
1007 Arena* arena_;
1008 grpc_call_stack* owning_call_;
1009 CallCombiner* call_combiner_;
1010 grpc_call_context_element* call_context_;
1011
1012 // Set when we get a cancel_stream op.
1013 grpc_error* cancel_error_ = GRPC_ERROR_NONE;
1014
1015 grpc_polling_entity* pollent_ = nullptr;
1016
1017 grpc_closure pick_closure_;
1018
1019 // Accessed while holding ChannelData::data_plane_mu_.
1020 ChannelData::LbQueuedCall queued_call_;
1021 bool queued_pending_lb_pick_ = false;
1022 const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
1023 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
1024 std::function<void(grpc_error*, LoadBalancingPolicy::MetadataInterface*,
1025 LoadBalancingPolicy::CallState*)>
1026 lb_recv_trailing_metadata_ready_;
1027 LbQueuedCallCanceller* lb_call_canceller_ = nullptr;
1028
1029 RefCountedPtr<SubchannelCall> subchannel_call_;
1030
1031 // For intercepting recv_trailing_metadata_ready for the LB policy.
1032 grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
1033 grpc_closure recv_trailing_metadata_ready_;
1034 grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;
1035
1036 // Batches are added to this list when received from above.
1037 // They are removed when we are done handling the batch (i.e., when
1038 // either we have invoked all of the batch's callbacks or we have
1039 // passed the batch down to the subchannel call and are not
1040 // intercepting any of its callbacks).
1041 grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
1042 };
1043
1044 //
1045 // dynamic termination filter
1046 //
1047
1048 // Channel arg pointer vtable for GRPC_ARG_CLIENT_CHANNEL_DATA.
ChannelDataArgCopy(void * p)1049 void* ChannelDataArgCopy(void* p) { return p; }
ChannelDataArgDestroy(void *)1050 void ChannelDataArgDestroy(void* /*p*/) {}
ChannelDataArgCmp(void * p,void * q)1051 int ChannelDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
1052 const grpc_arg_pointer_vtable kChannelDataArgPointerVtable = {
1053 ChannelDataArgCopy, ChannelDataArgDestroy, ChannelDataArgCmp};
1054
1055 // Channel arg pointer vtable for GRPC_ARG_RETRY_THROTTLE_DATA.
RetryThrottleDataArgCopy(void * p)1056 void* RetryThrottleDataArgCopy(void* p) {
1057 auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
1058 retry_throttle_data->Ref().release();
1059 return p;
1060 }
RetryThrottleDataArgDestroy(void * p)1061 void RetryThrottleDataArgDestroy(void* p) {
1062 auto* retry_throttle_data = static_cast<ServerRetryThrottleData*>(p);
1063 retry_throttle_data->Unref();
1064 }
RetryThrottleDataArgCmp(void * p,void * q)1065 int RetryThrottleDataArgCmp(void* p, void* q) { return GPR_ICMP(p, q); }
1066 const grpc_arg_pointer_vtable kRetryThrottleDataArgPointerVtable = {
1067 RetryThrottleDataArgCopy, RetryThrottleDataArgDestroy,
1068 RetryThrottleDataArgCmp};
1069
1070 class DynamicTerminationFilterChannelData {
1071 public:
1072 static grpc_error* Init(grpc_channel_element* elem,
1073 grpc_channel_element_args* args);
1074
Destroy(grpc_channel_element * elem)1075 static void Destroy(grpc_channel_element* elem) {
1076 auto* chand =
1077 static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
1078 chand->~DynamicTerminationFilterChannelData();
1079 }
1080
1081 // Will never be called.
StartTransportOp(grpc_channel_element *,grpc_transport_op *)1082 static void StartTransportOp(grpc_channel_element* /*elem*/,
1083 grpc_transport_op* /*op*/) {}
GetChannelInfo(grpc_channel_element *,const grpc_channel_info *)1084 static void GetChannelInfo(grpc_channel_element* /*elem*/,
1085 const grpc_channel_info* /*info*/) {}
1086
chand() const1087 ChannelData* chand() const { return chand_; }
retry_throttle_data() const1088 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data() const {
1089 return retry_throttle_data_;
1090 }
1091
1092 private:
GetRetryThrottleDataFromArgs(const grpc_channel_args * args)1093 static RefCountedPtr<ServerRetryThrottleData> GetRetryThrottleDataFromArgs(
1094 const grpc_channel_args* args) {
1095 auto* retry_throttle_data =
1096 grpc_channel_args_find_pointer<ServerRetryThrottleData>(
1097 args, GRPC_ARG_RETRY_THROTTLE_DATA);
1098 if (retry_throttle_data == nullptr) return nullptr;
1099 return retry_throttle_data->Ref();
1100 }
1101
DynamicTerminationFilterChannelData(const grpc_channel_args * args)1102 explicit DynamicTerminationFilterChannelData(const grpc_channel_args* args)
1103 : chand_(grpc_channel_args_find_pointer<ChannelData>(
1104 args, GRPC_ARG_CLIENT_CHANNEL_DATA)),
1105 retry_throttle_data_(GetRetryThrottleDataFromArgs(args)) {}
1106
1107 ChannelData* chand_;
1108 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
1109 };
1110
1111 class DynamicTerminationFilterCallData {
1112 public:
Init(grpc_call_element * elem,const grpc_call_element_args * args)1113 static grpc_error* Init(grpc_call_element* elem,
1114 const grpc_call_element_args* args) {
1115 new (elem->call_data) DynamicTerminationFilterCallData(*args);
1116 return GRPC_ERROR_NONE;
1117 }
1118
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)1119 static void Destroy(grpc_call_element* elem,
1120 const grpc_call_final_info* /*final_info*/,
1121 grpc_closure* then_schedule_closure) {
1122 auto* calld =
1123 static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
1124 auto* chand =
1125 static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
1126 RefCountedPtr<SubchannelCall> subchannel_call;
1127 if (chand->chand()->enable_retries()) {
1128 if (GPR_LIKELY(calld->retrying_call_ != nullptr)) {
1129 subchannel_call = calld->retrying_call_->subchannel_call();
1130 calld->retrying_call_->~RetryingCall();
1131 }
1132 } else {
1133 if (GPR_LIKELY(calld->lb_call_ != nullptr)) {
1134 subchannel_call = calld->lb_call_->subchannel_call();
1135 }
1136 }
1137 calld->~DynamicTerminationFilterCallData();
1138 if (GPR_LIKELY(subchannel_call != nullptr)) {
1139 subchannel_call->SetAfterCallStackDestroy(then_schedule_closure);
1140 } else {
1141 // TODO(yashkt) : This can potentially be a Closure::Run
1142 ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
1143 }
1144 }
1145
StartTransportStreamOpBatch(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)1146 static void StartTransportStreamOpBatch(
1147 grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
1148 auto* calld =
1149 static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
1150 auto* chand =
1151 static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
1152 if (chand->chand()->enable_retries()) {
1153 calld->retrying_call_->StartTransportStreamOpBatch(batch);
1154 } else {
1155 calld->lb_call_->StartTransportStreamOpBatch(batch);
1156 }
1157 }
1158
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)1159 static void SetPollent(grpc_call_element* elem,
1160 grpc_polling_entity* pollent) {
1161 auto* calld =
1162 static_cast<DynamicTerminationFilterCallData*>(elem->call_data);
1163 auto* chand =
1164 static_cast<DynamicTerminationFilterChannelData*>(elem->channel_data);
1165 ChannelData* client_channel = chand->chand();
1166 grpc_call_element_args args = {
1167 calld->owning_call_, nullptr,
1168 calld->call_context_, calld->path_,
1169 calld->call_start_time_, calld->deadline_,
1170 calld->arena_, calld->call_combiner_};
1171 if (client_channel->enable_retries()) {
1172 // Get retry settings from service config.
1173 auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
1174 calld->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
1175 GPR_ASSERT(svc_cfg_call_data != nullptr);
1176 auto* method_config = static_cast<const ClientChannelMethodParsedConfig*>(
1177 svc_cfg_call_data->GetMethodParsedConfig(
1178 ClientChannelServiceConfigParser::ParserIndex()));
1179 // Create retrying call.
1180 calld->retrying_call_ = calld->arena_->New<RetryingCall>(
1181 client_channel, args, pollent, chand->retry_throttle_data(),
1182 method_config == nullptr ? nullptr : method_config->retry_policy());
1183 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1184 gpr_log(
1185 GPR_INFO,
1186 "chand=%p dymamic_termination_calld=%p: create retrying_call=%p",
1187 client_channel, calld, calld->retrying_call_);
1188 }
1189 } else {
1190 calld->lb_call_ =
1191 LoadBalancedCall::Create(client_channel, args, pollent, 0);
1192 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1193 gpr_log(GPR_INFO,
1194 "chand=%p dynamic_termination_calld=%p: create lb_call=%p",
1195 chand, client_channel, calld->lb_call_.get());
1196 }
1197 }
1198 }
1199
1200 private:
DynamicTerminationFilterCallData(const grpc_call_element_args & args)1201 explicit DynamicTerminationFilterCallData(const grpc_call_element_args& args)
1202 : path_(grpc_slice_ref_internal(args.path)),
1203 call_start_time_(args.start_time),
1204 deadline_(args.deadline),
1205 arena_(args.arena),
1206 owning_call_(args.call_stack),
1207 call_combiner_(args.call_combiner),
1208 call_context_(args.context) {}
1209
~DynamicTerminationFilterCallData()1210 ~DynamicTerminationFilterCallData() { grpc_slice_unref_internal(path_); }
1211
1212 grpc_slice path_; // Request path.
1213 gpr_cycle_counter call_start_time_;
1214 grpc_millis deadline_;
1215 Arena* arena_;
1216 grpc_call_stack* owning_call_;
1217 CallCombiner* call_combiner_;
1218 grpc_call_context_element* call_context_;
1219
1220 RetryingCall* retrying_call_ = nullptr;
1221 RefCountedPtr<LoadBalancedCall> lb_call_;
1222 };
1223
// Filter vtable for the dynamic termination filter. Its channel Init() asserts
// args->is_last, so this filter must be the last element of its stack.
// Entries are positional; each is annotated with the hook it supplies.
const grpc_channel_filter kDynamicTerminationFilterVtable = {
    DynamicTerminationFilterCallData::StartTransportStreamOpBatch,  // batches
    DynamicTerminationFilterChannelData::StartTransportOp,  // never called
    sizeof(DynamicTerminationFilterCallData),               // call data size
    DynamicTerminationFilterCallData::Init,                 // call init
    DynamicTerminationFilterCallData::SetPollent,           // set pollent
    DynamicTerminationFilterCallData::Destroy,              // call destroy
    sizeof(DynamicTerminationFilterChannelData),            // channel data size
    DynamicTerminationFilterChannelData::Init,              // channel init
    DynamicTerminationFilterChannelData::Destroy,           // channel destroy
    DynamicTerminationFilterChannelData::GetChannelInfo,    // no-op
    "dynamic_filter_termination",                           // filter name
};
1237
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1238 grpc_error* DynamicTerminationFilterChannelData::Init(
1239 grpc_channel_element* elem, grpc_channel_element_args* args) {
1240 GPR_ASSERT(args->is_last);
1241 GPR_ASSERT(elem->filter == &kDynamicTerminationFilterVtable);
1242 new (elem->channel_data)
1243 DynamicTerminationFilterChannelData(args->channel_args);
1244 return GRPC_ERROR_NONE;
1245 }
1246
1247 //
1248 // ChannelData::SubchannelWrapper
1249 //
1250
1251 // This class is a wrapper for Subchannel that hides details of the
1252 // channel's implementation (such as the health check service name and
1253 // connected subchannel) from the LB policy API.
1254 //
1255 // Note that no synchronization is needed here, because even if the
1256 // underlying subchannel is shared between channels, this wrapper will only
1257 // be used within one channel, so it will always be synchronized by the
1258 // control plane work_serializer.
1259 class ChannelData::SubchannelWrapper : public SubchannelInterface {
1260 public:
// Wraps subchannel for use by the LB policy. Takes a ref on the channel
// stack, bumps the per-subchannel channelz child count (registering the
// subchannel as a channelz child on first use), and records this wrapper in
// chand->subchannel_wrappers_.
SubchannelWrapper(ChannelData* chand, Subchannel* subchannel,
                  absl::optional<std::string> health_check_service_name)
    : SubchannelInterface(
          GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
              ? "SubchannelWrapper"
              : nullptr),
      chand_(chand),
      subchannel_(subchannel),
      health_check_service_name_(std::move(health_check_service_name)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p: creating subchannel wrapper %p for subchannel %p",
            chand, this, subchannel_);
  }
  GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "SubchannelWrapper");
  auto* channelz_subchannel = subchannel_->channelz_node();
  if (channelz_subchannel != nullptr) {
    auto& refcounts = chand_->subchannel_refcount_map_;
    auto entry = refcounts.find(subchannel_);
    if (entry == refcounts.end()) {
      // First wrapper for this subchannel: register it with channelz.
      chand_->channelz_node_->AddChildSubchannel(channelz_subchannel->uuid());
      entry = refcounts.emplace(subchannel_, 0).first;
    }
    ++entry->second;
  }
  chand_->subchannel_wrappers_.insert(this);
}
1287
// Reverses the constructor. It removes this wrapper from
// chand_->subchannel_wrappers_ and decrements the per-subchannel channelz
// count, dropping the channelz child entry when the count reaches zero. It
// then unrefs the subchannel and finally the channel stack.
~SubchannelWrapper() override {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p: destroying subchannel wrapper %p for subchannel %p",
            chand_, this, subchannel_);
  }
  chand_->subchannel_wrappers_.erase(this);
  auto* channelz_subchannel = subchannel_->channelz_node();
  if (channelz_subchannel != nullptr) {
    auto& refcounts = chand_->subchannel_refcount_map_;
    auto entry = refcounts.find(subchannel_);
    GPR_ASSERT(entry != refcounts.end());
    if (--entry->second == 0) {
      // Last wrapper for this subchannel: unregister it from channelz.
      chand_->channelz_node_->RemoveChildSubchannel(channelz_subchannel->uuid());
      refcounts.erase(entry);
    }
  }
  GRPC_SUBCHANNEL_UNREF(subchannel_, "unref from LB");
  // Done last: the channel stack ref keeps chand_ usable until this point.
  GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "SubchannelWrapper");
}
1308
CheckConnectivityState()1309 grpc_connectivity_state CheckConnectivityState() override {
1310 RefCountedPtr<ConnectedSubchannel> connected_subchannel;
1311 grpc_connectivity_state connectivity_state =
1312 subchannel_->CheckConnectivityState(health_check_service_name_,
1313 &connected_subchannel);
1314 MaybeUpdateConnectedSubchannel(std::move(connected_subchannel));
1315 return connectivity_state;
1316 }
1317
WatchConnectivityState(grpc_connectivity_state initial_state,std::unique_ptr<ConnectivityStateWatcherInterface> watcher)1318 void WatchConnectivityState(
1319 grpc_connectivity_state initial_state,
1320 std::unique_ptr<ConnectivityStateWatcherInterface> watcher) override {
1321 auto& watcher_wrapper = watcher_map_[watcher.get()];
1322 GPR_ASSERT(watcher_wrapper == nullptr);
1323 watcher_wrapper = new WatcherWrapper(std::move(watcher),
1324 Ref(DEBUG_LOCATION, "WatcherWrapper"),
1325 initial_state);
1326 subchannel_->WatchConnectivityState(
1327 initial_state, health_check_service_name_,
1328 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
1329 watcher_wrapper));
1330 }
1331
CancelConnectivityStateWatch(ConnectivityStateWatcherInterface * watcher)1332 void CancelConnectivityStateWatch(
1333 ConnectivityStateWatcherInterface* watcher) override {
1334 auto it = watcher_map_.find(watcher);
1335 GPR_ASSERT(it != watcher_map_.end());
1336 subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
1337 it->second);
1338 watcher_map_.erase(it);
1339 }
1340
AttemptToConnect()1341 void AttemptToConnect() override { subchannel_->AttemptToConnect(); }
1342
ResetBackoff()1343 void ResetBackoff() override { subchannel_->ResetBackoff(); }
1344
channel_args()1345 const grpc_channel_args* channel_args() override {
1346 return subchannel_->channel_args();
1347 }
1348
ThrottleKeepaliveTime(int new_keepalive_time)1349 void ThrottleKeepaliveTime(int new_keepalive_time) {
1350 subchannel_->ThrottleKeepaliveTime(new_keepalive_time);
1351 }
1352
UpdateHealthCheckServiceName(absl::optional<std::string> health_check_service_name)1353 void UpdateHealthCheckServiceName(
1354 absl::optional<std::string> health_check_service_name) {
1355 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1356 gpr_log(GPR_INFO,
1357 "chand=%p: subchannel wrapper %p: updating health check service "
1358 "name from \"%s\" to \"%s\"",
1359 chand_, this, health_check_service_name_->c_str(),
1360 health_check_service_name->c_str());
1361 }
1362 for (auto& p : watcher_map_) {
1363 WatcherWrapper*& watcher_wrapper = p.second;
1364 // Cancel the current watcher and create a new one using the new
1365 // health check service name.
1366 // TODO(roth): If there is not already an existing health watch
1367 // call for the new name, then the watcher will initially report
1368 // state CONNECTING. If the LB policy is currently reporting
1369 // state READY, this may cause it to switch to CONNECTING before
1370 // switching back to READY. This could cause a small delay for
1371 // RPCs being started on the channel. If/when this becomes a
1372 // problem, we may be able to handle it by waiting for the new
1373 // watcher to report READY before we use it to replace the old one.
1374 WatcherWrapper* replacement = watcher_wrapper->MakeReplacement();
1375 subchannel_->CancelConnectivityStateWatch(health_check_service_name_,
1376 watcher_wrapper);
1377 watcher_wrapper = replacement;
1378 subchannel_->WatchConnectivityState(
1379 replacement->last_seen_state(), health_check_service_name,
1380 RefCountedPtr<Subchannel::ConnectivityStateWatcherInterface>(
1381 replacement));
1382 }
1383 // Save the new health check service name.
1384 health_check_service_name_ = std::move(health_check_service_name);
1385 }
1386
1387 // Caller must be holding the control-plane work_serializer.
connected_subchannel() const1388 ConnectedSubchannel* connected_subchannel() const {
1389 return connected_subchannel_.get();
1390 }
1391
1392 // Caller must be holding the data-plane mutex.
connected_subchannel_in_data_plane() const1393 ConnectedSubchannel* connected_subchannel_in_data_plane() const {
1394 return connected_subchannel_in_data_plane_.get();
1395 }
set_connected_subchannel_in_data_plane(RefCountedPtr<ConnectedSubchannel> connected_subchannel)1396 void set_connected_subchannel_in_data_plane(
1397 RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
1398 connected_subchannel_in_data_plane_ = std::move(connected_subchannel);
1399 }
1400
1401 private:
1402 // Subchannel and SubchannelInterface have different interfaces for
1403 // their respective ConnectivityStateWatcherInterface classes.
1404 // The one in Subchannel updates the ConnectedSubchannel along with
1405 // the state, whereas the one in SubchannelInterface does not expose
1406 // the ConnectedSubchannel.
1407 //
1408 // This wrapper provides a bridge between the two. It implements
1409 // Subchannel::ConnectivityStateWatcherInterface and wraps
1410 // the instance of SubchannelInterface::ConnectivityStateWatcherInterface
1411 // that was passed in by the LB policy. We pass an instance of this
1412 // class to the underlying Subchannel, and when we get updates from
1413 // the subchannel, we pass those on to the wrapped watcher to return
1414 // the update to the LB policy. This allows us to set the connected
1415 // subchannel before passing the result back to the LB policy.
1416 class WatcherWrapper : public Subchannel::ConnectivityStateWatcherInterface {
1417 public:
// Takes ownership of the LB policy's watcher and of a ref to the parent
// SubchannelWrapper, and starts out with initial_state as the last-seen state.
WatcherWrapper(
    std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
        watcher,
    RefCountedPtr<SubchannelWrapper> parent,
    grpc_connectivity_state initial_state)
    : watcher_(std::move(watcher)),
      parent_(std::move(parent)),
      last_seen_state_(initial_state) {
}
1426
// Hands this wrapper's ref on the parent SubchannelWrapper to a closure run
// on the channel's work_serializer. The final unref therefore happens there,
// presumably so that any resulting SubchannelWrapper destruction runs under
// the control-plane serializer.
~WatcherWrapper() override {
  SubchannelWrapper* owner = parent_.release();  // ref now owned by lambda
  owner->chand_->work_serializer_->Run(
      [owner]() { owner->Unref(DEBUG_LOCATION, "WatcherWrapper"); },
      DEBUG_LOCATION);
}
1433
OnConnectivityStateChange()1434 void OnConnectivityStateChange() override {
1435 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1436 gpr_log(GPR_INFO,
1437 "chand=%p: connectivity change for subchannel wrapper %p "
1438 "subchannel %p; hopping into work_serializer",
1439 parent_->chand_, parent_.get(), parent_->subchannel_);
1440 }
1441 Ref().release(); // ref owned by lambda
1442 parent_->chand_->work_serializer_->Run(
1443 [this]() {
1444 ApplyUpdateInControlPlaneWorkSerializer();
1445 Unref();
1446 },
1447 DEBUG_LOCATION);
1448 }
1449
interested_parties()1450 grpc_pollset_set* interested_parties() override {
1451 SubchannelInterface::ConnectivityStateWatcherInterface* watcher =
1452 watcher_.get();
1453 if (watcher_ == nullptr) watcher = replacement_->watcher_.get();
1454 return watcher->interested_parties();
1455 }
1456
MakeReplacement()1457 WatcherWrapper* MakeReplacement() {
1458 auto* replacement =
1459 new WatcherWrapper(std::move(watcher_), parent_, last_seen_state_);
1460 replacement_ = replacement;
1461 return replacement;
1462 }
1463
last_seen_state() const1464 grpc_connectivity_state last_seen_state() const { return last_seen_state_; }
1465
1466 private:
ApplyUpdateInControlPlaneWorkSerializer()1467 void ApplyUpdateInControlPlaneWorkSerializer() {
1468 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1469 gpr_log(GPR_INFO,
1470 "chand=%p: processing connectivity change in work serializer "
1471 "for subchannel wrapper %p subchannel %p "
1472 "watcher=%p",
1473 parent_->chand_, parent_.get(), parent_->subchannel_,
1474 watcher_.get());
1475 }
1476 ConnectivityStateChange state_change = PopConnectivityStateChange();
1477 absl::optional<absl::Cord> keepalive_throttling =
1478 state_change.status.GetPayload(kKeepaliveThrottlingKey);
1479 if (keepalive_throttling.has_value()) {
1480 int new_keepalive_time = -1;
1481 if (absl::SimpleAtoi(std::string(keepalive_throttling.value()),
1482 &new_keepalive_time)) {
1483 if (new_keepalive_time > parent_->chand_->keepalive_time_) {
1484 parent_->chand_->keepalive_time_ = new_keepalive_time;
1485 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
1486 gpr_log(GPR_INFO, "chand=%p: throttling keepalive time to %d",
1487 parent_->chand_, parent_->chand_->keepalive_time_);
1488 }
1489 // Propagate the new keepalive time to all subchannels. This is so
1490 // that new transports created by any subchannel (and not just the
1491 // subchannel that received the GOAWAY), use the new keepalive time.
1492 for (auto* subchannel_wrapper :
1493 parent_->chand_->subchannel_wrappers_) {
1494 subchannel_wrapper->ThrottleKeepaliveTime(new_keepalive_time);
1495 }
1496 }
1497 } else {
1498 gpr_log(GPR_ERROR, "chand=%p: Illegal keepalive throttling value %s",
1499 parent_->chand_,
1500 std::string(keepalive_throttling.value()).c_str());
1501 }
1502 }
1503 // Ignore update if the parent WatcherWrapper has been replaced
1504 // since this callback was scheduled.
1505 if (watcher_ != nullptr) {
1506 last_seen_state_ = state_change.state;
1507 parent_->MaybeUpdateConnectedSubchannel(
1508 std::move(state_change.connected_subchannel));
1509 watcher_->OnConnectivityStateChange(state_change.state);
1510 }
1511 }
1512
1513 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>
1514 watcher_;
1515 RefCountedPtr<SubchannelWrapper> parent_;
1516 grpc_connectivity_state last_seen_state_;
1517 WatcherWrapper* replacement_ = nullptr;
1518 };
1519
MaybeUpdateConnectedSubchannel(RefCountedPtr<ConnectedSubchannel> connected_subchannel)1520 void MaybeUpdateConnectedSubchannel(
1521 RefCountedPtr<ConnectedSubchannel> connected_subchannel) {
1522 // Update the connected subchannel only if the channel is not shutting
1523 // down. This is because once the channel is shutting down, we
1524 // ignore picker updates from the LB policy, which means that
1525 // UpdateStateAndPickerLocked() will never process the entries
1526 // in chand_->pending_subchannel_updates_. So we don't want to add
1527 // entries there that will never be processed, since that would
1528 // leave dangling refs to the channel and prevent its destruction.
1529 grpc_error* disconnect_error = chand_->disconnect_error();
1530 if (disconnect_error != GRPC_ERROR_NONE) return;
1531 // Not shutting down, so do the update.
1532 if (connected_subchannel_ != connected_subchannel) {
1533 connected_subchannel_ = std::move(connected_subchannel);
1534 // Record the new connected subchannel so that it can be updated
1535 // in the data plane mutex the next time the picker is updated.
1536 chand_->pending_subchannel_updates_[Ref(
1537 DEBUG_LOCATION, "ConnectedSubchannelUpdate")] = connected_subchannel_;
1538 }
1539 }
1540
// Channel that created this wrapper.
ChannelData* chand_;
// The underlying subchannel being wrapped.
Subchannel* subchannel_;
// Health check service name passed in at construction (unset when health
// checking was inhibited for this subchannel).
absl::optional<std::string> health_check_service_name_;
// Maps from the address of the watcher passed to us by the LB policy
// to the address of the WatcherWrapper that we passed to the underlying
// subchannel. This is needed so that when the LB policy calls
// CancelConnectivityStateWatch() with its watcher, we know the
// corresponding WatcherWrapper to cancel on the underlying subchannel.
std::map<ConnectivityStateWatcherInterface*, WatcherWrapper*> watcher_map_;
// To be accessed only in the control plane work_serializer.
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
// To be accessed only in the data plane mutex.
RefCountedPtr<ConnectedSubchannel> connected_subchannel_in_data_plane_;
1554 };
1555
1556 //
1557 // ChannelData::ExternalConnectivityWatcher
1558 //
1559
ExternalConnectivityWatcher(ChannelData * chand,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)1560 ChannelData::ExternalConnectivityWatcher::ExternalConnectivityWatcher(
1561 ChannelData* chand, grpc_polling_entity pollent,
1562 grpc_connectivity_state* state, grpc_closure* on_complete,
1563 grpc_closure* watcher_timer_init)
1564 : chand_(chand),
1565 pollent_(pollent),
1566 initial_state_(*state),
1567 state_(state),
1568 on_complete_(on_complete),
1569 watcher_timer_init_(watcher_timer_init) {
1570 grpc_polling_entity_add_to_pollset_set(&pollent_,
1571 chand_->interested_parties_);
1572 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ExternalConnectivityWatcher");
1573 {
1574 MutexLock lock(&chand_->external_watchers_mu_);
1575 // Will be deleted when the watch is complete.
1576 GPR_ASSERT(chand->external_watchers_[on_complete] == nullptr);
1577 // Store a ref to the watcher in the external_watchers_ map.
1578 chand->external_watchers_[on_complete] =
1579 Ref(DEBUG_LOCATION, "AddWatcherToExternalWatchersMapLocked");
1580 }
1581 // Pass the ref from creating the object to Start().
1582 chand_->work_serializer_->Run(
1583 [this]() {
1584 // The ref is passed to AddWatcherLocked().
1585 AddWatcherLocked();
1586 },
1587 DEBUG_LOCATION);
1588 }
1589
~ExternalConnectivityWatcher()1590 ChannelData::ExternalConnectivityWatcher::~ExternalConnectivityWatcher() {
1591 grpc_polling_entity_del_from_pollset_set(&pollent_,
1592 chand_->interested_parties_);
1593 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1594 "ExternalConnectivityWatcher");
1595 }
1596
1597 void ChannelData::ExternalConnectivityWatcher::
RemoveWatcherFromExternalWatchersMap(ChannelData * chand,grpc_closure * on_complete,bool cancel)1598 RemoveWatcherFromExternalWatchersMap(ChannelData* chand,
1599 grpc_closure* on_complete,
1600 bool cancel) {
1601 RefCountedPtr<ExternalConnectivityWatcher> watcher;
1602 {
1603 MutexLock lock(&chand->external_watchers_mu_);
1604 auto it = chand->external_watchers_.find(on_complete);
1605 if (it != chand->external_watchers_.end()) {
1606 watcher = std::move(it->second);
1607 chand->external_watchers_.erase(it);
1608 }
1609 }
1610 // watcher->Cancel() will hop into the WorkSerializer, so we have to unlock
1611 // the mutex before calling it.
1612 if (watcher != nullptr && cancel) watcher->Cancel();
1613 }
1614
Notify(grpc_connectivity_state state,const absl::Status &)1615 void ChannelData::ExternalConnectivityWatcher::Notify(
1616 grpc_connectivity_state state, const absl::Status& /* status */) {
1617 bool done = false;
1618 if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
1619 MemoryOrder::RELAXED)) {
1620 return; // Already done.
1621 }
1622 // Remove external watcher.
1623 chand_->RemoveExternalConnectivityWatcher(on_complete_, /*cancel=*/false);
1624 // Report new state to the user.
1625 *state_ = state;
1626 ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_NONE);
1627 // Hop back into the work_serializer to clean up.
1628 // Not needed in state SHUTDOWN, because the tracker will
1629 // automatically remove all watchers in that case.
1630 if (state != GRPC_CHANNEL_SHUTDOWN) {
1631 chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
1632 DEBUG_LOCATION);
1633 }
1634 }
1635
Cancel()1636 void ChannelData::ExternalConnectivityWatcher::Cancel() {
1637 bool done = false;
1638 if (!done_.CompareExchangeStrong(&done, true, MemoryOrder::RELAXED,
1639 MemoryOrder::RELAXED)) {
1640 return; // Already done.
1641 }
1642 ExecCtx::Run(DEBUG_LOCATION, on_complete_, GRPC_ERROR_CANCELLED);
1643 // Hop back into the work_serializer to clean up.
1644 chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
1645 DEBUG_LOCATION);
1646 }
1647
AddWatcherLocked()1648 void ChannelData::ExternalConnectivityWatcher::AddWatcherLocked() {
1649 Closure::Run(DEBUG_LOCATION, watcher_timer_init_, GRPC_ERROR_NONE);
1650 // Add new watcher. Pass the ref of the object from creation to OrphanablePtr.
1651 chand_->state_tracker_.AddWatcher(
1652 initial_state_, OrphanablePtr<ConnectivityStateWatcherInterface>(this));
1653 }
1654
RemoveWatcherLocked()1655 void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked() {
1656 chand_->state_tracker_.RemoveWatcher(this);
1657 }
1658
1659 //
1660 // ChannelData::ConnectivityWatcherAdder
1661 //
1662
1663 class ChannelData::ConnectivityWatcherAdder {
1664 public:
ConnectivityWatcherAdder(ChannelData * chand,grpc_connectivity_state initial_state,OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)1665 ConnectivityWatcherAdder(
1666 ChannelData* chand, grpc_connectivity_state initial_state,
1667 OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
1668 : chand_(chand),
1669 initial_state_(initial_state),
1670 watcher_(std::move(watcher)) {
1671 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
1672 chand_->work_serializer_->Run([this]() { AddWatcherLocked(); },
1673 DEBUG_LOCATION);
1674 }
1675
1676 private:
AddWatcherLocked()1677 void AddWatcherLocked() {
1678 chand_->state_tracker_.AddWatcher(initial_state_, std::move(watcher_));
1679 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_, "ConnectivityWatcherAdder");
1680 delete this;
1681 }
1682
1683 ChannelData* chand_;
1684 grpc_connectivity_state initial_state_;
1685 OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
1686 };
1687
1688 //
1689 // ChannelData::ConnectivityWatcherRemover
1690 //
1691
1692 class ChannelData::ConnectivityWatcherRemover {
1693 public:
ConnectivityWatcherRemover(ChannelData * chand,AsyncConnectivityStateWatcherInterface * watcher)1694 ConnectivityWatcherRemover(ChannelData* chand,
1695 AsyncConnectivityStateWatcherInterface* watcher)
1696 : chand_(chand), watcher_(watcher) {
1697 GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
1698 chand_->work_serializer_->Run([this]() { RemoveWatcherLocked(); },
1699 DEBUG_LOCATION);
1700 }
1701
1702 private:
RemoveWatcherLocked()1703 void RemoveWatcherLocked() {
1704 chand_->state_tracker_.RemoveWatcher(watcher_);
1705 GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
1706 "ConnectivityWatcherRemover");
1707 delete this;
1708 }
1709
1710 ChannelData* chand_;
1711 AsyncConnectivityStateWatcherInterface* watcher_;
1712 };
1713
1714 //
1715 // ChannelData::ClientChannelControlHelper
1716 //
1717
// ChannelControlHelper handed to the LB policy. Every operation first checks
// chand_->resolver_ and does nothing once it is null (channel shutting down).
// Holds a ref to the channel stack for its whole lifetime.
class ChannelData::ClientChannelControlHelper
    : public LoadBalancingPolicy::ChannelControlHelper {
 public:
  explicit ClientChannelControlHelper(ChannelData* chand) : chand_(chand) {
    GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ClientChannelControlHelper");
  }

  ~ClientChannelControlHelper() override {
    GRPC_CHANNEL_STACK_UNREF(chand_->owning_stack_,
                             "ClientChannelControlHelper");
  }

  // Creates a subchannel for `address` and returns it wrapped in a
  // SubchannelWrapper. Returns null if the channel is shutting down or the
  // client channel factory fails to create the subchannel.
  RefCountedPtr<SubchannelInterface> CreateSubchannel(
      ServerAddress address, const grpc_channel_args& args) override {
    if (chand_->resolver_ == nullptr) return nullptr;  // Shutting down.
    // Determine health check service name.
    bool inhibit_health_checking = grpc_channel_arg_get_bool(
        grpc_channel_args_find(&args, GRPC_ARG_INHIBIT_HEALTH_CHECKING), false);
    absl::optional<std::string> health_check_service_name;
    if (!inhibit_health_checking) {
      health_check_service_name = chand_->health_check_service_name_;
    }
    // Remove channel args that should not affect subchannel uniqueness.
    static const char* args_to_remove[] = {
        GRPC_ARG_INHIBIT_HEALTH_CHECKING,
        GRPC_ARG_CHANNELZ_CHANNEL_NODE,
    };
    // Add channel args needed for the subchannel.
    absl::InlinedVector<grpc_arg, 3> args_to_add = {
        Subchannel::CreateSubchannelAddressArg(&address.address()),
        SubchannelPoolInterface::CreateChannelArg(
            chand_->subchannel_pool_.get()),
    };
    // Per-address args are appended too.
    if (address.args() != nullptr) {
      for (size_t j = 0; j < address.args()->num_args; ++j) {
        args_to_add.emplace_back(address.args()->args[j]);
      }
    }
    grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
        &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove),
        args_to_add.data(), args_to_add.size());
    // args_to_add[0] is the address arg; its string value is freed once the
    // args have been copied into new_args.
    gpr_free(args_to_add[0].value.string);
    // Create subchannel.
    Subchannel* subchannel =
        chand_->client_channel_factory_->CreateSubchannel(new_args);
    grpc_channel_args_destroy(new_args);
    if (subchannel == nullptr) return nullptr;
    // Make sure the subchannel has updated keepalive time.
    subchannel->ThrottleKeepaliveTime(chand_->keepalive_time_);
    // Create and return wrapper for the subchannel.
    return MakeRefCounted<SubchannelWrapper>(
        chand_, subchannel, std::move(health_check_service_name));
  }

  // Forwards a state/picker update from the LB policy to the channel. The
  // update is traced but ignored once the channel has a disconnect error.
  void UpdateState(
      grpc_connectivity_state state, const absl::Status& status,
      std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    grpc_error* disconnect_error = chand_->disconnect_error();
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      const char* extra = disconnect_error == GRPC_ERROR_NONE
                              ? ""
                              : " (ignoring -- channel shutting down)";
      gpr_log(GPR_INFO, "chand=%p: update: state=%s status=(%s) picker=%p%s",
              chand_, ConnectivityStateName(state), status.ToString().c_str(),
              picker.get(), extra);
    }
    // Do update only if not shutting down.
    if (disconnect_error == GRPC_ERROR_NONE) {
      chand_->UpdateStateAndPickerLocked(state, status, "helper",
                                         std::move(picker));
    }
  }

  // Asks the channel's resolver to re-resolve.
  void RequestReresolution() override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: started name re-resolving", chand_);
    }
    chand_->resolver_->RequestReresolutionLocked();
  }

  // Records an LB policy trace event on the channel's channelz node, if the
  // channel has one.
  void AddTraceEvent(TraceSeverity severity,
                     absl::string_view message) override {
    if (chand_->resolver_ == nullptr) return;  // Shutting down.
    if (chand_->channelz_node_ != nullptr) {
      chand_->channelz_node_->AddTraceEvent(
          ConvertSeverityEnum(severity),
          grpc_slice_from_copied_buffer(message.data(), message.size()));
    }
  }

 private:
  // Maps an LB policy trace severity to the channelz severity. Anything
  // other than TRACE_INFO or TRACE_WARNING maps to Error.
  static channelz::ChannelTrace::Severity ConvertSeverityEnum(
      TraceSeverity severity) {
    if (severity == TRACE_INFO) return channelz::ChannelTrace::Info;
    if (severity == TRACE_WARNING) return channelz::ChannelTrace::Warning;
    return channelz::ChannelTrace::Error;
  }

  ChannelData* chand_;
};
1820
1821 //
1822 // ChannelData implementation
1823 //
1824
Init(grpc_channel_element * elem,grpc_channel_element_args * args)1825 grpc_error* ChannelData::Init(grpc_channel_element* elem,
1826 grpc_channel_element_args* args) {
1827 GPR_ASSERT(args->is_last);
1828 GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
1829 grpc_error* error = GRPC_ERROR_NONE;
1830 new (elem->channel_data) ChannelData(args, &error);
1831 return error;
1832 }
1833
Destroy(grpc_channel_element * elem)1834 void ChannelData::Destroy(grpc_channel_element* elem) {
1835 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
1836 chand->~ChannelData();
1837 }
1838
GetEnableRetries(const grpc_channel_args * args)1839 bool GetEnableRetries(const grpc_channel_args* args) {
1840 return grpc_channel_arg_get_bool(
1841 grpc_channel_args_find(args, GRPC_ARG_ENABLE_RETRIES), true);
1842 }
1843
GetMaxPerRpcRetryBufferSize(const grpc_channel_args * args)1844 size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
1845 return static_cast<size_t>(grpc_channel_arg_get_integer(
1846 grpc_channel_args_find(args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE),
1847 {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
1848 }
1849
GetSubchannelPool(const grpc_channel_args * args)1850 RefCountedPtr<SubchannelPoolInterface> GetSubchannelPool(
1851 const grpc_channel_args* args) {
1852 const bool use_local_subchannel_pool = grpc_channel_arg_get_bool(
1853 grpc_channel_args_find(args, GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL), false);
1854 if (use_local_subchannel_pool) {
1855 return MakeRefCounted<LocalSubchannelPool>();
1856 }
1857 return GlobalSubchannelPool::instance();
1858 }
1859
GetChannelzNode(const grpc_channel_args * args)1860 channelz::ChannelNode* GetChannelzNode(const grpc_channel_args* args) {
1861 const grpc_arg* arg =
1862 grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1863 if (arg != nullptr && arg->type == GRPC_ARG_POINTER) {
1864 return static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1865 }
1866 return nullptr;
1867 }
1868
// Builds the client channel state from the channel element args.
// On failure, sets *error and returns early, skipping the rest of the
// initialization. Failure cases:
//   - no client channel factory,
//   - a missing or non-string GRPC_ARG_SERVER_URI,
//   - an invalid default service config,
//   - a target URI that ResolverRegistry::IsValidTarget() rejects.
ChannelData::ChannelData(grpc_channel_element_args* args, grpc_error** error)
    : deadline_checking_enabled_(
          grpc_deadline_checking_enabled(args->channel_args)),
      enable_retries_(GetEnableRetries(args->channel_args)),
      per_rpc_retry_buffer_size_(
          GetMaxPerRpcRetryBufferSize(args->channel_args)),
      owning_stack_(args->channel_stack),
      client_channel_factory_(
          ClientChannelFactory::GetFromChannelArgs(args->channel_args)),
      channelz_node_(GetChannelzNode(args->channel_args)),
      work_serializer_(std::make_shared<WorkSerializer>()),
      interested_parties_(grpc_pollset_set_create()),
      state_tracker_("client_channel", GRPC_CHANNEL_IDLE),
      subchannel_pool_(GetSubchannelPool(args->channel_args)),
      disconnect_error_(GRPC_ERROR_NONE) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: creating client_channel for channel stack %p",
            this, owning_stack_);
  }
  // Start backup polling.
  grpc_client_channel_start_backup_polling(interested_parties_);
  // Check client channel factory.
  if (client_channel_factory_ == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "Missing client channel factory in args for client channel filter");
    return;
  }
  // Get the server URI to resolve (the proxy mapper is applied further below).
  const char* server_uri = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVER_URI));
  if (server_uri == nullptr) {
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "server URI channel arg missing or wrong type in client channel "
        "filter");
    return;
  }
  // Get default service config. If none is specified via the client API,
  // we use an empty config.
  const char* service_config_json = grpc_channel_arg_get_string(
      grpc_channel_args_find(args->channel_args, GRPC_ARG_SERVICE_CONFIG));
  if (service_config_json == nullptr) service_config_json = "{}";
  *error = GRPC_ERROR_NONE;
  default_service_config_ =
      ServiceConfig::Create(args->channel_args, service_config_json, error);
  if (*error != GRPC_ERROR_NONE) {
    default_service_config_.reset();
    return;
  }
  // server_name_ is the URI path with any leading "/" stripped; it is left
  // unset if the URI fails to parse or has an empty path.
  absl::StatusOr<URI> uri = URI::Parse(server_uri);
  if (uri.ok() && !uri->path().empty()) {
    server_name_ = std::string(absl::StripPrefix(uri->path(), "/"));
  }
  // Let the proxy mapper rewrite the target and/or the channel args. When it
  // provides no proxy name, the target is a copy of server_uri.
  char* proxy_name = nullptr;
  grpc_channel_args* new_args = nullptr;
  ProxyMapperRegistry::MapName(server_uri, args->channel_args, &proxy_name,
                               &new_args);
  target_uri_.reset(proxy_name != nullptr ? proxy_name
                                          : gpr_strdup(server_uri));
  // Strip out service config channel arg, so that it doesn't affect
  // subchannel uniqueness when the args flow down to that layer.
  const char* arg_to_remove = GRPC_ARG_SERVICE_CONFIG;
  channel_args_ = grpc_channel_args_copy_and_remove(
      new_args != nullptr ? new_args : args->channel_args, &arg_to_remove, 1);
  grpc_channel_args_destroy(new_args);
  // Initial keepalive time (-1 means unset). Keepalive throttling in
  // WatcherWrapper may raise it later.
  keepalive_time_ = grpc_channel_args_find_integer(
      channel_args_, GRPC_ARG_KEEPALIVE_TIME_MS,
      {-1 /* default value, unset */, 1, INT_MAX});
  if (!ResolverRegistry::IsValidTarget(target_uri_.get())) {
    std::string error_message =
        absl::StrCat("the target uri is not valid: ", target_uri_.get());
    *error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_message.c_str());
    return;
  }
  *error = GRPC_ERROR_NONE;
}
1944
// Tears down the channel in this order:
//   1. destroys the resolver and LB policy,
//   2. frees the channel args and the stored resolver error,
//   3. stops backup polling before destroying the pollset_set it was given,
//   4. drops the disconnect error.
ChannelData::~ChannelData() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: destroying channel", this);
  }
  DestroyResolverAndLbPolicyLocked();
  grpc_channel_args_destroy(channel_args_);
  GRPC_ERROR_UNREF(resolver_transient_failure_error_);
  // Stop backup polling.
  grpc_client_channel_stop_backup_polling(interested_parties_);
  grpc_pollset_set_destroy(interested_parties_);
  GRPC_ERROR_UNREF(disconnect_error_.Load(MemoryOrder::RELAXED));
}
1957
ChooseLbPolicy(const Resolver::Result & resolver_result,const internal::ClientChannelGlobalParsedConfig * parsed_service_config)1958 RefCountedPtr<LoadBalancingPolicy::Config> ChooseLbPolicy(
1959 const Resolver::Result& resolver_result,
1960 const internal::ClientChannelGlobalParsedConfig* parsed_service_config) {
1961 // Prefer the LB policy config found in the service config.
1962 if (parsed_service_config->parsed_lb_config() != nullptr) {
1963 return parsed_service_config->parsed_lb_config();
1964 }
1965 // Try the deprecated LB policy name from the service config.
1966 // If not, try the setting from channel args.
1967 const char* policy_name = nullptr;
1968 if (!parsed_service_config->parsed_deprecated_lb_policy().empty()) {
1969 policy_name = parsed_service_config->parsed_deprecated_lb_policy().c_str();
1970 } else {
1971 const grpc_arg* channel_arg =
1972 grpc_channel_args_find(resolver_result.args, GRPC_ARG_LB_POLICY_NAME);
1973 policy_name = grpc_channel_arg_get_string(channel_arg);
1974 }
1975 // Use pick_first if nothing was specified and we didn't select grpclb
1976 // above.
1977 if (policy_name == nullptr) policy_name = "pick_first";
1978 // Now that we have the policy name, construct an empty config for it.
1979 Json config_json = Json::Array{Json::Object{
1980 {policy_name, Json::Object{}},
1981 }};
1982 grpc_error* parse_error = GRPC_ERROR_NONE;
1983 auto lb_policy_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1984 config_json, &parse_error);
1985 // The policy name came from one of three places:
1986 // - The deprecated loadBalancingPolicy field in the service config,
1987 // in which case the code in ClientChannelServiceConfigParser
1988 // already verified that the policy does not require a config.
1989 // - One of the hard-coded values here, all of which are known to not
1990 // require a config.
1991 // - A channel arg, in which case the application did something that
1992 // is a misuse of our API.
1993 // In the first two cases, these assertions will always be true. In
1994 // the last case, this is probably fine for now.
1995 // TODO(roth): If the last case becomes a problem, add better error
1996 // handling here.
1997 GPR_ASSERT(lb_policy_config != nullptr);
1998 GPR_ASSERT(parse_error == GRPC_ERROR_NONE);
1999 return lb_policy_config;
2000 }
2001
// Handles a new result from the resolver (ignored if resolver_ is already
// null).
// 1. Choose a service config and ConfigSelector. On a service config error,
//    fall back to the previously saved ones; with nothing saved, report the
//    error via OnResolverErrorLocked().
// 2. If a service config was chosen:
//    - pick the LB policy config;
//    - apply the config to the control plane if it or the ConfigSelector
//      changed;
//    - create or update the LB policy;
//    - only after that, switch the data plane to the new config.
// 3. Record a channelz trace event summarizing notable changes.
void ChannelData::OnResolverResultChangedLocked(Resolver::Result result) {
  // Handle race conditions.
  if (resolver_ == nullptr) return;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
    gpr_log(GPR_INFO, "chand=%p: got resolver result", this);
  }
  // We only want to trace the address resolution in the following cases:
  // (a) Address resolution resulted in service config change.
  // (b) Address resolution that causes number of backends to go from
  //     zero to non-zero.
  // (c) Address resolution that causes number of backends to go from
  //     non-zero to zero.
  // (d) Address resolution that causes a new LB policy to be created.
  //
  // We track a list of strings to eventually be concatenated and traced.
  absl::InlinedVector<const char*, 3> trace_strings;
  if (result.addresses.empty() && previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became empty");
  } else if (!result.addresses.empty() &&
             !previous_resolution_contained_addresses_) {
    trace_strings.push_back("Address list became non-empty");
  }
  previous_resolution_contained_addresses_ = !result.addresses.empty();
  // The result of grpc_error_string() is owned by the error itself.
  // We're storing that string in trace_strings, so we need to make sure
  // that the error lives until we're done with the string.
  grpc_error* service_config_error =
      GRPC_ERROR_REF(result.service_config_error);
  if (service_config_error != GRPC_ERROR_NONE) {
    trace_strings.push_back(grpc_error_string(service_config_error));
  }
  // Choose the service config.
  RefCountedPtr<ServiceConfig> service_config;
  RefCountedPtr<ConfigSelector> config_selector;
  if (service_config_error != GRPC_ERROR_NONE) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: resolver returned service config error: %s",
              this, grpc_error_string(service_config_error));
    }
    // If the service config was invalid, then fallback to the
    // previously returned service config.
    if (saved_service_config_ != nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: resolver returned invalid service config. "
                "Continuing to use previous service config.",
                this);
      }
      service_config = saved_service_config_;
      config_selector = saved_config_selector_;
    } else {
      // We received an invalid service config and we don't have a
      // previous service config to fall back to. Put the channel into
      // TRANSIENT_FAILURE.
      OnResolverErrorLocked(GRPC_ERROR_REF(service_config_error));
      trace_strings.push_back("no valid service config");
    }
  } else if (result.service_config == nullptr) {
    // Resolver did not return any service config.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p: resolver returned no service config. Using default "
              "service config for channel.",
              this);
    }
    service_config = default_service_config_;
  } else {
    // Use ServiceConfig and ConfigSelector returned by resolver.
    service_config = result.service_config;
    config_selector = ConfigSelector::GetFromChannelArgs(*result.args);
  }
  // service_config stays null only in the "invalid config, nothing saved"
  // case above.
  if (service_config != nullptr) {
    // Extract global config for client channel.
    const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
        static_cast<const internal::ClientChannelGlobalParsedConfig*>(
            service_config->GetGlobalParsedConfig(
                internal::ClientChannelServiceConfigParser::ParserIndex()));
    // Choose LB policy config.
    RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config =
        ChooseLbPolicy(result, parsed_service_config);
    // Check if the ServiceConfig has changed.
    const bool service_config_changed =
        saved_service_config_ == nullptr ||
        service_config->json_string() != saved_service_config_->json_string();
    // Check if the ConfigSelector has changed.
    const bool config_selector_changed = !ConfigSelector::Equals(
        saved_config_selector_.get(), config_selector.get());
    // If either has changed, apply the global parameters now.
    if (service_config_changed || config_selector_changed) {
      // Update service config in control plane.
      UpdateServiceConfigInControlPlaneLocked(
          std::move(service_config), std::move(config_selector),
          parsed_service_config, lb_policy_config->name());
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
      gpr_log(GPR_INFO, "chand=%p: service config not changed", this);
    }
    // Create or update LB policy, as needed.
    CreateOrUpdateLbPolicyLocked(std::move(lb_policy_config),
                                 std::move(result));
    if (service_config_changed || config_selector_changed) {
      // Start using new service config for calls.
      // This needs to happen after the LB policy has been updated, since
      // the ConfigSelector may need the LB policy to know about new
      // destinations before it can send RPCs to those destinations.
      UpdateServiceConfigInDataPlaneLocked();
      // TODO(ncteisen): might be worth somehow including a snippet of the
      // config in the trace, at the risk of bloating the trace logs.
      trace_strings.push_back("Service config changed");
    }
  }
  // Add channel trace event.
  if (!trace_strings.empty()) {
    std::string message =
        absl::StrCat("Resolution event: ", absl::StrJoin(trace_strings, ", "));
    if (channelz_node_ != nullptr) {
      channelz_node_->AddTraceEvent(channelz::ChannelTrace::Severity::Info,
                                    grpc_slice_from_cpp_string(message));
    }
  }
  // Safe to release now: trace_strings may point into this error's string.
  GRPC_ERROR_UNREF(service_config_error);
}
2123
OnResolverErrorLocked(grpc_error * error)2124 void ChannelData::OnResolverErrorLocked(grpc_error* error) {
2125 if (resolver_ == nullptr) {
2126 GRPC_ERROR_UNREF(error);
2127 return;
2128 }
2129 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2130 gpr_log(GPR_INFO, "chand=%p: resolver transient failure: %s", this,
2131 grpc_error_string(error));
2132 }
2133 // If we already have an LB policy from a previous resolution
2134 // result, then we continue to let it set the connectivity state.
2135 // Otherwise, we go into TRANSIENT_FAILURE.
2136 if (lb_policy_ == nullptr) {
2137 grpc_error* state_error = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
2138 "Resolver transient failure", &error, 1);
2139 {
2140 MutexLock lock(&resolution_mu_);
2141 // Update resolver transient failure.
2142 GRPC_ERROR_UNREF(resolver_transient_failure_error_);
2143 resolver_transient_failure_error_ = GRPC_ERROR_REF(state_error);
2144 // Process calls that were queued waiting for the resolver result.
2145 for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
2146 call = call->next) {
2147 grpc_call_element* elem = call->elem;
2148 CallData* calld = static_cast<CallData*>(elem->call_data);
2149 grpc_error* error = GRPC_ERROR_NONE;
2150 if (calld->CheckResolutionLocked(elem, &error)) {
2151 calld->AsyncResolutionDone(elem, error);
2152 }
2153 }
2154 }
2155 // Update connectivity state.
2156 UpdateStateAndPickerLocked(
2157 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(state_error),
2158 "resolver failure",
2159 absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
2160 state_error));
2161 }
2162 GRPC_ERROR_UNREF(error);
2163 }
2164
CreateOrUpdateLbPolicyLocked(RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,Resolver::Result result)2165 void ChannelData::CreateOrUpdateLbPolicyLocked(
2166 RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
2167 Resolver::Result result) {
2168 // Construct update.
2169 LoadBalancingPolicy::UpdateArgs update_args;
2170 update_args.addresses = std::move(result.addresses);
2171 update_args.config = std::move(lb_policy_config);
2172 // Remove the config selector from channel args so that we're not holding
2173 // unnecessary refs that cause it to be destroyed somewhere other than in the
2174 // WorkSerializer.
2175 const char* arg_name = GRPC_ARG_CONFIG_SELECTOR;
2176 update_args.args =
2177 grpc_channel_args_copy_and_remove(result.args, &arg_name, 1);
2178 // Create policy if needed.
2179 if (lb_policy_ == nullptr) {
2180 lb_policy_ = CreateLbPolicyLocked(*update_args.args);
2181 }
2182 // Update the policy.
2183 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2184 gpr_log(GPR_INFO, "chand=%p: Updating child policy %p", this,
2185 lb_policy_.get());
2186 }
2187 lb_policy_->UpdateLocked(std::move(update_args));
2188 }
2189
2190 // Creates a new LB policy.
CreateLbPolicyLocked(const grpc_channel_args & args)2191 OrphanablePtr<LoadBalancingPolicy> ChannelData::CreateLbPolicyLocked(
2192 const grpc_channel_args& args) {
2193 LoadBalancingPolicy::Args lb_policy_args;
2194 lb_policy_args.work_serializer = work_serializer_;
2195 lb_policy_args.channel_control_helper =
2196 absl::make_unique<ClientChannelControlHelper>(this);
2197 lb_policy_args.args = &args;
2198 OrphanablePtr<LoadBalancingPolicy> lb_policy =
2199 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
2200 &grpc_client_channel_routing_trace);
2201 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2202 gpr_log(GPR_INFO, "chand=%p: created new LB policy %p", this,
2203 lb_policy.get());
2204 }
2205 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
2206 interested_parties_);
2207 return lb_policy;
2208 }
2209
AddResolverQueuedCall(ResolverQueuedCall * call,grpc_polling_entity * pollent)2210 void ChannelData::AddResolverQueuedCall(ResolverQueuedCall* call,
2211 grpc_polling_entity* pollent) {
2212 // Add call to queued calls list.
2213 call->next = resolver_queued_calls_;
2214 resolver_queued_calls_ = call;
2215 // Add call's pollent to channel's interested_parties, so that I/O
2216 // can be done under the call's CQ.
2217 grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
2218 }
2219
RemoveResolverQueuedCall(ResolverQueuedCall * to_remove,grpc_polling_entity * pollent)2220 void ChannelData::RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
2221 grpc_polling_entity* pollent) {
2222 // Remove call's pollent from channel's interested_parties.
2223 grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
2224 // Remove from queued calls list.
2225 for (ResolverQueuedCall** call = &resolver_queued_calls_; *call != nullptr;
2226 call = &(*call)->next) {
2227 if (*call == to_remove) {
2228 *call = to_remove->next;
2229 return;
2230 }
2231 }
2232 }
2233
UpdateServiceConfigInControlPlaneLocked(RefCountedPtr<ServiceConfig> service_config,RefCountedPtr<ConfigSelector> config_selector,const internal::ClientChannelGlobalParsedConfig * parsed_service_config,const char * lb_policy_name)2234 void ChannelData::UpdateServiceConfigInControlPlaneLocked(
2235 RefCountedPtr<ServiceConfig> service_config,
2236 RefCountedPtr<ConfigSelector> config_selector,
2237 const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
2238 const char* lb_policy_name) {
2239 UniquePtr<char> service_config_json(
2240 gpr_strdup(service_config->json_string().c_str()));
2241 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2242 gpr_log(GPR_INFO,
2243 "chand=%p: resolver returned updated service config: \"%s\"", this,
2244 service_config_json.get());
2245 }
2246 // Save service config.
2247 saved_service_config_ = std::move(service_config);
2248 // Update health check service name if needed.
2249 if (health_check_service_name_ !=
2250 parsed_service_config->health_check_service_name()) {
2251 health_check_service_name_ =
2252 parsed_service_config->health_check_service_name();
2253 // Update health check service name used by existing subchannel wrappers.
2254 for (auto* subchannel_wrapper : subchannel_wrappers_) {
2255 subchannel_wrapper->UpdateHealthCheckServiceName(
2256 health_check_service_name_);
2257 }
2258 }
2259 // Swap out the data used by GetChannelInfo().
2260 UniquePtr<char> lb_policy_name_owned(gpr_strdup(lb_policy_name));
2261 {
2262 MutexLock lock(&info_mu_);
2263 info_lb_policy_name_ = std::move(lb_policy_name_owned);
2264 info_service_config_json_ = std::move(service_config_json);
2265 }
2266 // Save config selector.
2267 saved_config_selector_ = std::move(config_selector);
2268 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2269 gpr_log(GPR_INFO, "chand=%p: using ConfigSelector %p", this,
2270 saved_config_selector_.get());
2271 }
2272 }
2273
UpdateServiceConfigInDataPlaneLocked()2274 void ChannelData::UpdateServiceConfigInDataPlaneLocked() {
2275 // Grab ref to service config.
2276 RefCountedPtr<ServiceConfig> service_config = saved_service_config_;
2277 // Grab ref to config selector. Use default if resolver didn't supply one.
2278 RefCountedPtr<ConfigSelector> config_selector = saved_config_selector_;
2279 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2280 gpr_log(GPR_INFO, "chand=%p: switching to ConfigSelector %p", this,
2281 saved_config_selector_.get());
2282 }
2283 if (config_selector == nullptr) {
2284 config_selector =
2285 MakeRefCounted<DefaultConfigSelector>(saved_service_config_);
2286 }
2287 // Get retry throttle data from service config.
2288 const internal::ClientChannelGlobalParsedConfig* parsed_service_config =
2289 static_cast<const internal::ClientChannelGlobalParsedConfig*>(
2290 saved_service_config_->GetGlobalParsedConfig(
2291 internal::ClientChannelServiceConfigParser::ParserIndex()));
2292 absl::optional<internal::ClientChannelGlobalParsedConfig::RetryThrottling>
2293 retry_throttle_config = parsed_service_config->retry_throttling();
2294 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data;
2295 if (retry_throttle_config.has_value()) {
2296 retry_throttle_data = internal::ServerRetryThrottleMap::GetDataForServer(
2297 server_name_, retry_throttle_config.value().max_milli_tokens,
2298 retry_throttle_config.value().milli_token_ratio);
2299 }
2300 // Construct per-LB filter stack.
2301 std::vector<const grpc_channel_filter*> filters =
2302 config_selector->GetFilters();
2303 filters.push_back(&kDynamicTerminationFilterVtable);
2304 absl::InlinedVector<grpc_arg, 2> args_to_add;
2305 args_to_add.push_back(grpc_channel_arg_pointer_create(
2306 const_cast<char*>(GRPC_ARG_CLIENT_CHANNEL_DATA), this,
2307 &kChannelDataArgPointerVtable));
2308 if (retry_throttle_data != nullptr) {
2309 args_to_add.push_back(grpc_channel_arg_pointer_create(
2310 const_cast<char*>(GRPC_ARG_RETRY_THROTTLE_DATA),
2311 retry_throttle_data.get(), &kRetryThrottleDataArgPointerVtable));
2312 }
2313 grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
2314 channel_args_, args_to_add.data(), args_to_add.size());
2315 RefCountedPtr<DynamicFilters> dynamic_filters =
2316 DynamicFilters::Create(new_args, std::move(filters));
2317 GPR_ASSERT(dynamic_filters != nullptr);
2318 grpc_channel_args_destroy(new_args);
2319 // Grab data plane lock to update service config.
2320 //
2321 // We defer unreffing the old values (and deallocating memory) until
2322 // after releasing the lock to keep the critical section small.
2323 std::set<grpc_call_element*> calls_pending_resolver_result;
2324 {
2325 MutexLock lock(&resolution_mu_);
2326 GRPC_ERROR_UNREF(resolver_transient_failure_error_);
2327 resolver_transient_failure_error_ = GRPC_ERROR_NONE;
2328 // Update service config.
2329 received_service_config_data_ = true;
2330 // Old values will be unreffed after lock is released.
2331 service_config_.swap(service_config);
2332 config_selector_.swap(config_selector);
2333 dynamic_filters_.swap(dynamic_filters);
2334 // Process calls that were queued waiting for the resolver result.
2335 for (ResolverQueuedCall* call = resolver_queued_calls_; call != nullptr;
2336 call = call->next) {
2337 grpc_call_element* elem = call->elem;
2338 CallData* calld = static_cast<CallData*>(elem->call_data);
2339 grpc_error* error = GRPC_ERROR_NONE;
2340 if (calld->CheckResolutionLocked(elem, &error)) {
2341 calld->AsyncResolutionDone(elem, error);
2342 }
2343 }
2344 }
2345 // Old values will be unreffed after lock is released when they go out
2346 // of scope.
2347 }
2348
CreateResolverLocked()2349 void ChannelData::CreateResolverLocked() {
2350 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2351 gpr_log(GPR_INFO, "chand=%p: starting name resolution", this);
2352 }
2353 resolver_ = ResolverRegistry::CreateResolver(
2354 target_uri_.get(), channel_args_, interested_parties_, work_serializer_,
2355 absl::make_unique<ResolverResultHandler>(this));
2356 // Since the validity of the args was checked when the channel was created,
2357 // CreateResolver() must return a non-null result.
2358 GPR_ASSERT(resolver_ != nullptr);
2359 UpdateStateAndPickerLocked(
2360 GRPC_CHANNEL_CONNECTING, absl::Status(), "started resolving",
2361 absl::make_unique<LoadBalancingPolicy::QueuePicker>(nullptr));
2362 resolver_->StartLocked();
2363 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2364 gpr_log(GPR_INFO, "chand=%p: created resolver=%p", this, resolver_.get());
2365 }
2366 }
2367
DestroyResolverAndLbPolicyLocked()2368 void ChannelData::DestroyResolverAndLbPolicyLocked() {
2369 if (resolver_ != nullptr) {
2370 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2371 gpr_log(GPR_INFO, "chand=%p: shutting down resolver=%p", this,
2372 resolver_.get());
2373 }
2374 resolver_.reset();
2375 if (lb_policy_ != nullptr) {
2376 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2377 gpr_log(GPR_INFO, "chand=%p: shutting down lb_policy=%p", this,
2378 lb_policy_.get());
2379 }
2380 grpc_pollset_set_del_pollset_set(lb_policy_->interested_parties(),
2381 interested_parties_);
2382 lb_policy_.reset();
2383 }
2384 }
2385 }
2386
// Publishes a new connectivity state and picker for the channel.
//
// If `picker` is null (the IDLE case) or `state` is SHUTDOWN, the saved
// control-plane service config and config selector are dropped. The
// data-plane service config, config selector, and dynamic filters are also
// cleared under resolution_mu_, and received_service_config_data_ is reset
// to false.
//
// The new state is then reported to state_tracker_ and, if present,
// channelz. Finally, under data_plane_mu_, pending subchannel updates are
// applied, `picker` replaces picker_, and every LB-queued call retries its
// pick. Objects released by this function are unreffed after the relevant
// lock is dropped.
void ChannelData::UpdateStateAndPickerLocked(
    grpc_connectivity_state state, const absl::Status& status,
    const char* reason,
    std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker) {
  // Special case for IDLE and SHUTDOWN states.
  if (picker == nullptr || state == GRPC_CHANNEL_SHUTDOWN) {
    saved_service_config_.reset();
    saved_config_selector_.reset();
    // Acquire resolution lock to update config selector and associated state.
    // To minimize lock contention, we wait to unref these objects until
    // after we release the lock.
    RefCountedPtr<ServiceConfig> service_config_to_unref;
    RefCountedPtr<ConfigSelector> config_selector_to_unref;
    RefCountedPtr<DynamicFilters> dynamic_filters_to_unref;
    {
      MutexLock lock(&resolution_mu_);
      received_service_config_data_ = false;
      service_config_to_unref = std::move(service_config_);
      config_selector_to_unref = std::move(config_selector_);
      dynamic_filters_to_unref = std::move(dynamic_filters_);
    }
    // The *_to_unref locals are released here, outside resolution_mu_.
  }
  // Update connectivity state.
  state_tracker_.SetState(state, status, reason);
  if (channelz_node_ != nullptr) {
    channelz_node_->SetConnectivityState(state);
    channelz_node_->AddTraceEvent(
        channelz::ChannelTrace::Severity::Info,
        grpc_slice_from_static_string(
            channelz::ChannelNode::GetChannelConnectivityStateChangeString(
                state)));
  }
  // Grab data plane lock to do subchannel updates and update the picker.
  //
  // Note that we want to minimize the work done while holding the data
  // plane lock, to keep the critical section small. So, for all of the
  // objects that we might wind up unreffing here, we actually hold onto
  // the refs until after we release the lock, and then unref them at
  // that point. This includes the following:
  // - refs to subchannel wrappers in the keys of pending_subchannel_updates_
  // - ownership of the existing picker in picker_
  {
    MutexLock lock(&data_plane_mu_);
    // Handle subchannel updates.
    for (auto& p : pending_subchannel_updates_) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p: updating subchannel wrapper %p data plane "
                "connected_subchannel to %p",
                this, p.first.get(), p.second.get());
      }
      // Note: We do not remove the entry from pending_subchannel_updates_
      // here, since this would unref the subchannel wrapper; instead,
      // we wait until we've released the lock to clear the map.
      p.first->set_connected_subchannel_in_data_plane(std::move(p.second));
    }
    // Swap out the picker.
    // Note: Original value will be destroyed after the lock is released.
    picker_.swap(picker);
    // Re-process queued picks.
    // NOTE(review): if PickSubchannelLocked() unlinks `call` from
    // lb_queued_calls_, advancing via call->next stays valid only because
    // RemoveLbQueuedCall() does not clear the removed node's `next`.
    for (LbQueuedCall* call = lb_queued_calls_; call != nullptr;
         call = call->next) {
      grpc_error* error = GRPC_ERROR_NONE;
      if (call->lb_call->PickSubchannelLocked(&error)) {
        call->lb_call->AsyncPickDone(error);
      }
    }
  }
  // Clear the pending update map after releasing the lock, to keep the
  // critical section small.
  pending_subchannel_updates_.clear();
}
2459
DoPingLocked(grpc_transport_op * op)2460 grpc_error* ChannelData::DoPingLocked(grpc_transport_op* op) {
2461 if (state_tracker_.state() != GRPC_CHANNEL_READY) {
2462 return GRPC_ERROR_CREATE_FROM_STATIC_STRING("channel not connected");
2463 }
2464 LoadBalancingPolicy::PickResult result =
2465 picker_->Pick(LoadBalancingPolicy::PickArgs());
2466 ConnectedSubchannel* connected_subchannel = nullptr;
2467 if (result.subchannel != nullptr) {
2468 SubchannelWrapper* subchannel =
2469 static_cast<SubchannelWrapper*>(result.subchannel.get());
2470 connected_subchannel = subchannel->connected_subchannel();
2471 }
2472 if (connected_subchannel != nullptr) {
2473 connected_subchannel->Ping(op->send_ping.on_initiate, op->send_ping.on_ack);
2474 } else {
2475 if (result.error == GRPC_ERROR_NONE) {
2476 result.error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
2477 "LB policy dropped call on ping");
2478 }
2479 }
2480 return result.error;
2481 }
2482
StartTransportOpLocked(grpc_transport_op * op)2483 void ChannelData::StartTransportOpLocked(grpc_transport_op* op) {
2484 // Connectivity watch.
2485 if (op->start_connectivity_watch != nullptr) {
2486 state_tracker_.AddWatcher(op->start_connectivity_watch_state,
2487 std::move(op->start_connectivity_watch));
2488 }
2489 if (op->stop_connectivity_watch != nullptr) {
2490 state_tracker_.RemoveWatcher(op->stop_connectivity_watch);
2491 }
2492 // Ping.
2493 if (op->send_ping.on_initiate != nullptr || op->send_ping.on_ack != nullptr) {
2494 grpc_error* error = DoPingLocked(op);
2495 if (error != GRPC_ERROR_NONE) {
2496 ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_initiate,
2497 GRPC_ERROR_REF(error));
2498 ExecCtx::Run(DEBUG_LOCATION, op->send_ping.on_ack, error);
2499 }
2500 op->bind_pollset = nullptr;
2501 op->send_ping.on_initiate = nullptr;
2502 op->send_ping.on_ack = nullptr;
2503 }
2504 // Reset backoff.
2505 if (op->reset_connect_backoff) {
2506 if (lb_policy_ != nullptr) {
2507 lb_policy_->ResetBackoffLocked();
2508 }
2509 }
2510 // Disconnect or enter IDLE.
2511 if (op->disconnect_with_error != GRPC_ERROR_NONE) {
2512 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2513 gpr_log(GPR_INFO, "chand=%p: disconnect_with_error: %s", this,
2514 grpc_error_string(op->disconnect_with_error));
2515 }
2516 DestroyResolverAndLbPolicyLocked();
2517 intptr_t value;
2518 if (grpc_error_get_int(op->disconnect_with_error,
2519 GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, &value) &&
2520 static_cast<grpc_connectivity_state>(value) == GRPC_CHANNEL_IDLE) {
2521 if (disconnect_error() == GRPC_ERROR_NONE) {
2522 // Enter IDLE state.
2523 UpdateStateAndPickerLocked(GRPC_CHANNEL_IDLE, absl::Status(),
2524 "channel entering IDLE", nullptr);
2525 }
2526 GRPC_ERROR_UNREF(op->disconnect_with_error);
2527 } else {
2528 // Disconnect.
2529 GPR_ASSERT(disconnect_error_.Load(MemoryOrder::RELAXED) ==
2530 GRPC_ERROR_NONE);
2531 disconnect_error_.Store(op->disconnect_with_error, MemoryOrder::RELEASE);
2532 UpdateStateAndPickerLocked(
2533 GRPC_CHANNEL_SHUTDOWN, absl::Status(), "shutdown from API",
2534 absl::make_unique<LoadBalancingPolicy::TransientFailurePicker>(
2535 GRPC_ERROR_REF(op->disconnect_with_error)));
2536 }
2537 }
2538 GRPC_CHANNEL_STACK_UNREF(owning_stack_, "start_transport_op");
2539 ExecCtx::Run(DEBUG_LOCATION, op->on_consumed, GRPC_ERROR_NONE);
2540 }
2541
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)2542 void ChannelData::StartTransportOp(grpc_channel_element* elem,
2543 grpc_transport_op* op) {
2544 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2545 GPR_ASSERT(op->set_accept_stream == false);
2546 // Handle bind_pollset.
2547 if (op->bind_pollset != nullptr) {
2548 grpc_pollset_set_add_pollset(chand->interested_parties_, op->bind_pollset);
2549 }
2550 // Pop into control plane work_serializer for remaining ops.
2551 GRPC_CHANNEL_STACK_REF(chand->owning_stack_, "start_transport_op");
2552 chand->work_serializer_->Run(
2553 [chand, op]() { chand->StartTransportOpLocked(op); }, DEBUG_LOCATION);
2554 }
2555
GetChannelInfo(grpc_channel_element * elem,const grpc_channel_info * info)2556 void ChannelData::GetChannelInfo(grpc_channel_element* elem,
2557 const grpc_channel_info* info) {
2558 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2559 MutexLock lock(&chand->info_mu_);
2560 if (info->lb_policy_name != nullptr) {
2561 *info->lb_policy_name = gpr_strdup(chand->info_lb_policy_name_.get());
2562 }
2563 if (info->service_config_json != nullptr) {
2564 *info->service_config_json =
2565 gpr_strdup(chand->info_service_config_json_.get());
2566 }
2567 }
2568
AddLbQueuedCall(LbQueuedCall * call,grpc_polling_entity * pollent)2569 void ChannelData::AddLbQueuedCall(LbQueuedCall* call,
2570 grpc_polling_entity* pollent) {
2571 // Add call to queued picks list.
2572 call->next = lb_queued_calls_;
2573 lb_queued_calls_ = call;
2574 // Add call's pollent to channel's interested_parties, so that I/O
2575 // can be done under the call's CQ.
2576 grpc_polling_entity_add_to_pollset_set(pollent, interested_parties_);
2577 }
2578
RemoveLbQueuedCall(LbQueuedCall * to_remove,grpc_polling_entity * pollent)2579 void ChannelData::RemoveLbQueuedCall(LbQueuedCall* to_remove,
2580 grpc_polling_entity* pollent) {
2581 // Remove call's pollent from channel's interested_parties.
2582 grpc_polling_entity_del_from_pollset_set(pollent, interested_parties_);
2583 // Remove from queued picks list.
2584 for (LbQueuedCall** call = &lb_queued_calls_; *call != nullptr;
2585 call = &(*call)->next) {
2586 if (*call == to_remove) {
2587 *call = to_remove->next;
2588 return;
2589 }
2590 }
2591 }
2592
2593 RefCountedPtr<ConnectedSubchannel>
GetConnectedSubchannelInDataPlane(SubchannelInterface * subchannel) const2594 ChannelData::GetConnectedSubchannelInDataPlane(
2595 SubchannelInterface* subchannel) const {
2596 SubchannelWrapper* subchannel_wrapper =
2597 static_cast<SubchannelWrapper*>(subchannel);
2598 ConnectedSubchannel* connected_subchannel =
2599 subchannel_wrapper->connected_subchannel_in_data_plane();
2600 if (connected_subchannel == nullptr) return nullptr;
2601 return connected_subchannel->Ref();
2602 }
2603
TryToConnectLocked()2604 void ChannelData::TryToConnectLocked() {
2605 if (lb_policy_ != nullptr) {
2606 lb_policy_->ExitIdleLocked();
2607 } else if (resolver_ == nullptr) {
2608 CreateResolverLocked();
2609 }
2610 GRPC_CHANNEL_STACK_UNREF(owning_stack_, "TryToConnect");
2611 }
2612
CheckConnectivityState(bool try_to_connect)2613 grpc_connectivity_state ChannelData::CheckConnectivityState(
2614 bool try_to_connect) {
2615 grpc_connectivity_state out = state_tracker_.state();
2616 if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
2617 GRPC_CHANNEL_STACK_REF(owning_stack_, "TryToConnect");
2618 work_serializer_->Run([this]() { TryToConnectLocked(); }, DEBUG_LOCATION);
2619 }
2620 return out;
2621 }
2622
// Registers `watcher` for connectivity state changes, starting from
// `initial_state`. The work is delegated to a heap-allocated
// ConnectivityWatcherAdder. Its pointer is deliberately not retained here;
// presumably the adder deletes itself when done (confirm in its definition).
void ChannelData::AddConnectivityWatcher(
    grpc_connectivity_state initial_state,
    OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
  new ConnectivityWatcherAdder(this, initial_state, std::move(watcher));
}
2628
// Unregisters `watcher`. The work is delegated to a heap-allocated
// ConnectivityWatcherRemover. Its pointer is deliberately not retained here;
// presumably the remover deletes itself when done (confirm in its
// definition).
void ChannelData::RemoveConnectivityWatcher(
    AsyncConnectivityStateWatcherInterface* watcher) {
  new ConnectivityWatcherRemover(this, watcher);
}
2633
2634 //
2635 // CallData implementation
2636 //
2637
// Initializes per-call state from the call element args. deadline_state_ is
// given args.deadline only when the channel has deadline checking enabled,
// and GRPC_MILLIS_INF_FUTURE otherwise. A ref is taken on args.path, which
// ~CallData() releases.
CallData::CallData(grpc_call_element* elem, const ChannelData& chand,
                   const grpc_call_element_args& args)
    : deadline_state_(elem, args,
                      GPR_LIKELY(chand.deadline_checking_enabled())
                          ? args.deadline
                          : GRPC_MILLIS_INF_FUTURE),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: created call", &chand, this);
  }
}
2655
~CallData()2656 CallData::~CallData() {
2657 grpc_slice_unref_internal(path_);
2658 GRPC_ERROR_UNREF(cancel_error_);
2659 // Make sure there are no remaining pending batches.
2660 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2661 GPR_ASSERT(pending_batches_[i] == nullptr);
2662 }
2663 }
2664
Init(grpc_call_element * elem,const grpc_call_element_args * args)2665 grpc_error* CallData::Init(grpc_call_element* elem,
2666 const grpc_call_element_args* args) {
2667 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2668 new (elem->call_data) CallData(elem, *chand, *args);
2669 return GRPC_ERROR_NONE;
2670 }
2671
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure * then_schedule_closure)2672 void CallData::Destroy(grpc_call_element* elem,
2673 const grpc_call_final_info* /*final_info*/,
2674 grpc_closure* then_schedule_closure) {
2675 CallData* calld = static_cast<CallData*>(elem->call_data);
2676 RefCountedPtr<DynamicFilters::Call> dynamic_call =
2677 std::move(calld->dynamic_call_);
2678 calld->~CallData();
2679 if (GPR_LIKELY(dynamic_call != nullptr)) {
2680 dynamic_call->SetAfterCallStackDestroy(then_schedule_closure);
2681 } else {
2682 // TODO(yashkt) : This can potentially be a Closure::Run
2683 ExecCtx::Run(DEBUG_LOCATION, then_schedule_closure, GRPC_ERROR_NONE);
2684 }
2685 }
2686
// Channel-filter entry point for a call's stream op batches; invoked under
// the call combiner.
//
// Handling order:
//  1. Deadline filtering (when enabled), and interception of
//     recv_initial_metadata for the ConfigSelector commit callback.
//  2. If the call was already cancelled, the batch fails immediately.
//  3. A cancel_stream batch records cancel_error_. If no dynamic call exists
//     yet, all pending batches fail and so does this one. Otherwise the
//     batch is sent down to the dynamic call.
//  4. Any other batch is added to pending_batches_. If a dynamic call exists,
//     the pending batches are resumed on it. Otherwise a batch carrying
//     send_initial_metadata triggers CheckResolution(), and any other batch
//     just yields the call combiner.
void CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  GPR_TIMER_SCOPE("cc_start_transport_stream_op_batch", 0);
  CallData* calld = static_cast<CallData*>(elem->call_data);
  ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
  if (GPR_LIKELY(chand->deadline_checking_enabled())) {
    grpc_deadline_state_client_start_transport_stream_op_batch(elem, batch);
  }
  // Intercept recv_initial_metadata for config selector on-committed callback.
  if (batch->recv_initial_metadata) {
    calld->InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(calld->cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: failing batch with error: %s",
              chand, calld, grpc_error_string(calld->cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations.  This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(calld->cancel_error_);
    calld->cancel_error_ =
        GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: recording cancel_error=%s", chand,
              calld, grpc_error_string(calld->cancel_error_));
    }
    // If we do not have a dynamic call (i.e., name resolution has not
    // yet completed), fail all pending batches.  Otherwise, send the
    // cancellation down to the dynamic call.
    if (calld->dynamic_call_ == nullptr) {
      calld->PendingBatchesFail(elem, GRPC_ERROR_REF(calld->cancel_error_),
                                NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(calld->cancel_error_), calld->call_combiner_);
    } else {
      // Note: This will release the call combiner.
      calld->dynamic_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  calld->PendingBatchesAdd(elem, batch);
  // Check if we've already created a dynamic call.
  // Note that once we have done so, we do not need to acquire the channel's
  // resolution mutex, which is more efficient (especially for streaming calls).
  if (calld->dynamic_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on dynamic_call=%p",
              chand, calld, calld->dynamic_call_.get());
    }
    calld->PendingBatchesResume(elem);
    return;
  }
  // We do not yet have a dynamic call.
  // For batches containing a send_initial_metadata op, acquire the
  // channel's resolution mutex to apply the service config to the call,
  // after which we will create a dynamic call.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: grabbing resolution mutex to apply service "
              "config",
              chand, calld);
    }
    CheckResolution(elem, GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    // The batch stays in pending_batches_ until a dynamic call exists or
    // the call is failed.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: saved batch, yielding call combiner", chand,
              calld);
    }
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}
2775
SetPollent(grpc_call_element * elem,grpc_polling_entity * pollent)2776 void CallData::SetPollent(grpc_call_element* elem,
2777 grpc_polling_entity* pollent) {
2778 CallData* calld = static_cast<CallData*>(elem->call_data);
2779 calld->pollent_ = pollent;
2780 }
2781
2782 //
2783 // pending_batches management
2784 //
2785
GetBatchIndex(grpc_transport_stream_op_batch * batch)2786 size_t CallData::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
2787 // Note: It is important the send_initial_metadata be the first entry
2788 // here, since the code in pick_subchannel_locked() assumes it will be.
2789 if (batch->send_initial_metadata) return 0;
2790 if (batch->send_message) return 1;
2791 if (batch->send_trailing_metadata) return 2;
2792 if (batch->recv_initial_metadata) return 3;
2793 if (batch->recv_message) return 4;
2794 if (batch->recv_trailing_metadata) return 5;
2795 GPR_UNREACHABLE_CODE(return (size_t)-1);
2796 }
2797
2798 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_call_element * elem,grpc_transport_stream_op_batch * batch)2799 void CallData::PendingBatchesAdd(grpc_call_element* elem,
2800 grpc_transport_stream_op_batch* batch) {
2801 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2802 const size_t idx = GetBatchIndex(batch);
2803 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2804 gpr_log(GPR_INFO,
2805 "chand=%p calld=%p: adding pending batch at index %" PRIuPTR, chand,
2806 this, idx);
2807 }
2808 grpc_transport_stream_op_batch*& pending = pending_batches_[idx];
2809 GPR_ASSERT(pending == nullptr);
2810 pending = batch;
2811 }
2812
2813 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error * error)2814 void CallData::FailPendingBatchInCallCombiner(void* arg, grpc_error* error) {
2815 grpc_transport_stream_op_batch* batch =
2816 static_cast<grpc_transport_stream_op_batch*>(arg);
2817 CallData* calld = static_cast<CallData*>(batch->handler_private.extra_arg);
2818 // Note: This will release the call combiner.
2819 grpc_transport_stream_op_batch_finish_with_failure(
2820 batch, GRPC_ERROR_REF(error), calld->call_combiner_);
2821 }
2822
2823 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_call_element * elem,grpc_error * error,YieldCallCombinerPredicate yield_call_combiner_predicate)2824 void CallData::PendingBatchesFail(
2825 grpc_call_element* elem, grpc_error* error,
2826 YieldCallCombinerPredicate yield_call_combiner_predicate) {
2827 GPR_ASSERT(error != GRPC_ERROR_NONE);
2828 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2829 size_t num_batches = 0;
2830 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2831 if (pending_batches_[i] != nullptr) ++num_batches;
2832 }
2833 gpr_log(GPR_INFO,
2834 "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
2835 elem->channel_data, this, num_batches, grpc_error_string(error));
2836 }
2837 CallCombinerClosureList closures;
2838 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2839 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2840 if (batch != nullptr) {
2841 batch->handler_private.extra_arg = this;
2842 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2843 FailPendingBatchInCallCombiner, batch,
2844 grpc_schedule_on_exec_ctx);
2845 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
2846 "PendingBatchesFail");
2847 batch = nullptr;
2848 }
2849 }
2850 if (yield_call_combiner_predicate(closures)) {
2851 closures.RunClosures(call_combiner_);
2852 } else {
2853 closures.RunClosuresWithoutYielding(call_combiner_);
2854 }
2855 GRPC_ERROR_UNREF(error);
2856 }
2857
2858 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error *)2859 void CallData::ResumePendingBatchInCallCombiner(void* arg,
2860 grpc_error* /*ignored*/) {
2861 grpc_transport_stream_op_batch* batch =
2862 static_cast<grpc_transport_stream_op_batch*>(arg);
2863 auto* elem =
2864 static_cast<grpc_call_element*>(batch->handler_private.extra_arg);
2865 auto* calld = static_cast<CallData*>(elem->call_data);
2866 // Note: This will release the call combiner.
2867 calld->dynamic_call_->StartTransportStreamOpBatch(batch);
2868 }
2869
2870 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume(grpc_call_element * elem)2871 void CallData::PendingBatchesResume(grpc_call_element* elem) {
2872 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2873 // Retries not enabled; send down batches as-is.
2874 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
2875 size_t num_batches = 0;
2876 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2877 if (pending_batches_[i] != nullptr) ++num_batches;
2878 }
2879 gpr_log(GPR_INFO,
2880 "chand=%p calld=%p: starting %" PRIuPTR
2881 " pending batches on dynamic_call=%p",
2882 chand, this, num_batches, dynamic_call_.get());
2883 }
2884 CallCombinerClosureList closures;
2885 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
2886 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
2887 if (batch != nullptr) {
2888 batch->handler_private.extra_arg = elem;
2889 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
2890 ResumePendingBatchInCallCombiner, batch, nullptr);
2891 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
2892 "PendingBatchesResume");
2893 batch = nullptr;
2894 }
2895 }
2896 // Note: This will release the call combiner.
2897 closures.RunClosures(call_combiner_);
2898 }
2899
2900 //
2901 // name resolution
2902 //
2903
2904 // A class to handle the call combiner cancellation callback for a
2905 // queued pick.
2906 class CallData::ResolverQueuedCallCanceller {
2907 public:
ResolverQueuedCallCanceller(grpc_call_element * elem)2908 explicit ResolverQueuedCallCanceller(grpc_call_element* elem) : elem_(elem) {
2909 auto* calld = static_cast<CallData*>(elem->call_data);
2910 GRPC_CALL_STACK_REF(calld->owning_call_, "ResolverQueuedCallCanceller");
2911 GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this,
2912 grpc_schedule_on_exec_ctx);
2913 calld->call_combiner_->SetNotifyOnCancel(&closure_);
2914 }
2915
2916 private:
CancelLocked(void * arg,grpc_error * error)2917 static void CancelLocked(void* arg, grpc_error* error) {
2918 auto* self = static_cast<ResolverQueuedCallCanceller*>(arg);
2919 auto* chand = static_cast<ChannelData*>(self->elem_->channel_data);
2920 auto* calld = static_cast<CallData*>(self->elem_->call_data);
2921 {
2922 MutexLock lock(chand->resolution_mu());
2923 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2924 gpr_log(GPR_INFO,
2925 "chand=%p calld=%p: cancelling resolver queued pick: "
2926 "error=%s self=%p calld->resolver_pick_canceller=%p",
2927 chand, calld, grpc_error_string(error), self,
2928 calld->resolver_call_canceller_);
2929 }
2930 if (calld->resolver_call_canceller_ == self && error != GRPC_ERROR_NONE) {
2931 // Remove pick from list of queued picks.
2932 calld->MaybeRemoveCallFromResolverQueuedCallsLocked(self->elem_);
2933 // Fail pending batches on the call.
2934 calld->PendingBatchesFail(self->elem_, GRPC_ERROR_REF(error),
2935 YieldCallCombinerIfPendingBatchesFound);
2936 }
2937 }
2938 GRPC_CALL_STACK_UNREF(calld->owning_call_, "ResolvingQueuedCallCanceller");
2939 delete self;
2940 }
2941
2942 grpc_call_element* elem_;
2943 grpc_closure closure_;
2944 };
2945
MaybeRemoveCallFromResolverQueuedCallsLocked(grpc_call_element * elem)2946 void CallData::MaybeRemoveCallFromResolverQueuedCallsLocked(
2947 grpc_call_element* elem) {
2948 if (!queued_pending_resolver_result_) return;
2949 auto* chand = static_cast<ChannelData*>(elem->channel_data);
2950 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2951 gpr_log(GPR_INFO,
2952 "chand=%p calld=%p: removing from resolver queued picks list",
2953 chand, this);
2954 }
2955 chand->RemoveResolverQueuedCall(&resolver_queued_call_, pollent_);
2956 queued_pending_resolver_result_ = false;
2957 // Lame the call combiner canceller.
2958 resolver_call_canceller_ = nullptr;
2959 }
2960
MaybeAddCallToResolverQueuedCallsLocked(grpc_call_element * elem)2961 void CallData::MaybeAddCallToResolverQueuedCallsLocked(
2962 grpc_call_element* elem) {
2963 if (queued_pending_resolver_result_) return;
2964 auto* chand = static_cast<ChannelData*>(elem->channel_data);
2965 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2966 gpr_log(GPR_INFO, "chand=%p calld=%p: adding to resolver queued picks list",
2967 chand, this);
2968 }
2969 queued_pending_resolver_result_ = true;
2970 resolver_queued_call_.elem = elem;
2971 chand->AddResolverQueuedCall(&resolver_queued_call_, pollent_);
2972 // Register call combiner cancellation callback.
2973 resolver_call_canceller_ = new ResolverQueuedCallCanceller(elem);
2974 }
2975
ApplyServiceConfigToCallLocked(grpc_call_element * elem,grpc_metadata_batch * initial_metadata)2976 grpc_error* CallData::ApplyServiceConfigToCallLocked(
2977 grpc_call_element* elem, grpc_metadata_batch* initial_metadata) {
2978 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
2979 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
2980 gpr_log(GPR_INFO, "chand=%p calld=%p: applying service config to call",
2981 chand, this);
2982 }
2983 ConfigSelector* config_selector = chand->config_selector();
2984 if (config_selector != nullptr) {
2985 // Use the ConfigSelector to determine the config for the call.
2986 ConfigSelector::CallConfig call_config =
2987 config_selector->GetCallConfig({&path_, initial_metadata, arena_});
2988 if (call_config.error != GRPC_ERROR_NONE) return call_config.error;
2989 on_call_committed_ = std::move(call_config.on_call_committed);
2990 // Create a ServiceConfigCallData for the call. This stores a ref to the
2991 // ServiceConfig and caches the right set of parsed configs to use for
2992 // the call. The MethodConfig will store itself in the call context,
2993 // so that it can be accessed by filters in the subchannel, and it
2994 // will be cleaned up when the call ends.
2995 auto* service_config_call_data = arena_->New<ServiceConfigCallData>(
2996 std::move(call_config.service_config), call_config.method_configs,
2997 std::move(call_config.call_attributes), call_context_);
2998 // Apply our own method params to the call.
2999 auto* method_params = static_cast<ClientChannelMethodParsedConfig*>(
3000 service_config_call_data->GetMethodParsedConfig(
3001 internal::ClientChannelServiceConfigParser::ParserIndex()));
3002 if (method_params != nullptr) {
3003 // If the deadline from the service config is shorter than the one
3004 // from the client API, reset the deadline timer.
3005 if (chand->deadline_checking_enabled() && method_params->timeout() != 0) {
3006 const grpc_millis per_method_deadline =
3007 grpc_cycle_counter_to_millis_round_up(call_start_time_) +
3008 method_params->timeout();
3009 if (per_method_deadline < deadline_) {
3010 deadline_ = per_method_deadline;
3011 grpc_deadline_state_reset(elem, deadline_);
3012 }
3013 }
3014 // If the service config set wait_for_ready and the application
3015 // did not explicitly set it, use the value from the service config.
3016 uint32_t* send_initial_metadata_flags =
3017 &pending_batches_[0]
3018 ->payload->send_initial_metadata.send_initial_metadata_flags;
3019 if (method_params->wait_for_ready().has_value() &&
3020 !(*send_initial_metadata_flags &
3021 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET)) {
3022 if (method_params->wait_for_ready().value()) {
3023 *send_initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3024 } else {
3025 *send_initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY;
3026 }
3027 }
3028 }
3029 // Set the dynamic filter stack.
3030 dynamic_filters_ = chand->dynamic_filters();
3031 }
3032 return GRPC_ERROR_NONE;
3033 }
3034
RecvInitialMetadataReadyForConfigSelectorCommitCallback(void * arg,grpc_error * error)3035 void CallData::RecvInitialMetadataReadyForConfigSelectorCommitCallback(
3036 void* arg, grpc_error* error) {
3037 auto* self = static_cast<CallData*>(arg);
3038 if (self->on_call_committed_ != nullptr) {
3039 self->on_call_committed_();
3040 self->on_call_committed_ = nullptr;
3041 }
3042 // Chain to original callback.
3043 Closure::Run(DEBUG_LOCATION, self->original_recv_initial_metadata_ready_,
3044 GRPC_ERROR_REF(error));
3045 }
3046
3047 // TODO(roth): Consider not intercepting this callback unless we
3048 // actually need to, if this causes a performance problem.
InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(grpc_transport_stream_op_batch * batch)3049 void CallData::InjectRecvInitialMetadataReadyForConfigSelectorCommitCallback(
3050 grpc_transport_stream_op_batch* batch) {
3051 original_recv_initial_metadata_ready_ =
3052 batch->payload->recv_initial_metadata.recv_initial_metadata_ready;
3053 GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_,
3054 RecvInitialMetadataReadyForConfigSelectorCommitCallback,
3055 this, nullptr);
3056 batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
3057 &recv_initial_metadata_ready_;
3058 }
3059
AsyncResolutionDone(grpc_call_element * elem,grpc_error * error)3060 void CallData::AsyncResolutionDone(grpc_call_element* elem, grpc_error* error) {
3061 GRPC_CLOSURE_INIT(&pick_closure_, ResolutionDone, elem, nullptr);
3062 ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
3063 }
3064
ResolutionDone(void * arg,grpc_error * error)3065 void CallData::ResolutionDone(void* arg, grpc_error* error) {
3066 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3067 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3068 CallData* calld = static_cast<CallData*>(elem->call_data);
3069 if (error != GRPC_ERROR_NONE) {
3070 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3071 gpr_log(GPR_INFO,
3072 "chand=%p calld=%p: error applying config to call: error=%s",
3073 chand, calld, grpc_error_string(error));
3074 }
3075 calld->PendingBatchesFail(elem, GRPC_ERROR_REF(error), YieldCallCombiner);
3076 return;
3077 }
3078 calld->CreateDynamicCall(elem);
3079 }
3080
CheckResolution(void * arg,grpc_error * error)3081 void CallData::CheckResolution(void* arg, grpc_error* error) {
3082 grpc_call_element* elem = static_cast<grpc_call_element*>(arg);
3083 CallData* calld = static_cast<CallData*>(elem->call_data);
3084 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3085 bool resolution_complete;
3086 {
3087 MutexLock lock(chand->resolution_mu());
3088 resolution_complete = calld->CheckResolutionLocked(elem, &error);
3089 }
3090 if (resolution_complete) {
3091 ResolutionDone(elem, error);
3092 GRPC_ERROR_UNREF(error);
3093 }
3094 }
3095
CheckResolutionLocked(grpc_call_element * elem,grpc_error ** error)3096 bool CallData::CheckResolutionLocked(grpc_call_element* elem,
3097 grpc_error** error) {
3098 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
3099 // If we're still in IDLE, we need to start resolving.
3100 if (GPR_UNLIKELY(chand->CheckConnectivityState(false) == GRPC_CHANNEL_IDLE)) {
3101 // Bounce into the control plane work serializer to start resolving,
3102 // in case we are still in IDLE state. Since we are holding on to the
3103 // resolution mutex here, we offload it on the ExecCtx so that we don't
3104 // deadlock with ourselves.
3105 GRPC_CHANNEL_STACK_REF(chand->owning_stack(), "CheckResolutionLocked");
3106 ExecCtx::Run(
3107 DEBUG_LOCATION,
3108 GRPC_CLOSURE_CREATE(
3109 [](void* arg, grpc_error* /*error*/) {
3110 auto* chand = static_cast<ChannelData*>(arg);
3111 chand->work_serializer()->Run(
3112 [chand]() {
3113 chand->CheckConnectivityState(/*try_to_connect=*/true);
3114 GRPC_CHANNEL_STACK_UNREF(chand->owning_stack(),
3115 "CheckResolutionLocked");
3116 },
3117 DEBUG_LOCATION);
3118 },
3119 chand, nullptr),
3120 GRPC_ERROR_NONE);
3121 }
3122 // Get send_initial_metadata batch and flags.
3123 auto& send_initial_metadata =
3124 pending_batches_[0]->payload->send_initial_metadata;
3125 grpc_metadata_batch* initial_metadata_batch =
3126 send_initial_metadata.send_initial_metadata;
3127 const uint32_t send_initial_metadata_flags =
3128 send_initial_metadata.send_initial_metadata_flags;
3129 // If we don't yet have a resolver result, we need to queue the call
3130 // until we get one.
3131 if (GPR_UNLIKELY(!chand->received_service_config_data())) {
3132 // If the resolver returned transient failure before returning the
3133 // first service config, fail any non-wait_for_ready calls.
3134 grpc_error* resolver_error = chand->resolver_transient_failure_error();
3135 if (resolver_error != GRPC_ERROR_NONE &&
3136 (send_initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY) ==
3137 0) {
3138 MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
3139 *error = GRPC_ERROR_REF(resolver_error);
3140 return true;
3141 }
3142 // Either the resolver has not yet returned a result, or it has
3143 // returned transient failure but the call is wait_for_ready. In
3144 // either case, queue the call.
3145 MaybeAddCallToResolverQueuedCallsLocked(elem);
3146 return false;
3147 }
3148 // Apply service config to call if not yet applied.
3149 if (GPR_LIKELY(!service_config_applied_)) {
3150 service_config_applied_ = true;
3151 *error = ApplyServiceConfigToCallLocked(elem, initial_metadata_batch);
3152 }
3153 MaybeRemoveCallFromResolverQueuedCallsLocked(elem);
3154 return true;
3155 }
3156
CreateDynamicCall(grpc_call_element * elem)3157 void CallData::CreateDynamicCall(grpc_call_element* elem) {
3158 auto* chand = static_cast<ChannelData*>(elem->channel_data);
3159 DynamicFilters::Call::Args args = {std::move(dynamic_filters_),
3160 pollent_,
3161 path_,
3162 call_start_time_,
3163 deadline_,
3164 arena_,
3165 call_context_,
3166 call_combiner_};
3167 grpc_error* error = GRPC_ERROR_NONE;
3168 DynamicFilters* channel_stack = args.channel_stack.get();
3169 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3170 gpr_log(
3171 GPR_INFO,
3172 "chand=%p calld=%p: creating dynamic call stack on channel_stack=%p",
3173 chand, this, channel_stack);
3174 }
3175 dynamic_call_ = channel_stack->CreateCall(std::move(args), &error);
3176 if (error != GRPC_ERROR_NONE) {
3177 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
3178 gpr_log(GPR_INFO,
3179 "chand=%p calld=%p: failed to create dynamic call: error=%s",
3180 chand, this, grpc_error_string(error));
3181 }
3182 PendingBatchesFail(elem, error, YieldCallCombiner);
3183 return;
3184 }
3185 PendingBatchesResume(elem);
3186 }
3187
3188 //
3189 // RetryingCall implementation
3190 //
3191
3192 // Retry support:
3193 //
3194 // In order to support retries, we act as a proxy for stream op batches.
3195 // When we get a batch from the surface, we add it to our list of pending
3196 // batches, and we then use those batches to construct separate "child"
3197 // batches to be started on the subchannel call. When the child batches
3198 // return, we then decide which pending batches have been completed and
3199 // schedule their callbacks accordingly. If a subchannel call fails and
3200 // we want to retry it, we do a new pick and start again, constructing
3201 // new "child" batches for the new subchannel call.
3202 //
3203 // Note that retries are committed when receiving data from the server
3204 // (except for Trailers-Only responses). However, there may be many
3205 // send ops started before receiving any data, so we may have already
3206 // completed some number of send ops (and returned the completions up to
3207 // the surface) by the time we realize that we need to retry. To deal
3208 // with this, we cache data for send ops, so that we can replay them on a
3209 // different subchannel call even after we have completed the original
3210 // batches.
3211 //
3212 // There are two sets of data to maintain:
3213 // - In call_data (in the parent channel), we maintain a list of pending
3214 // ops and cached data for send ops.
3215 // - In the subchannel call, we maintain state to indicate what ops have
3216 // already been sent down to that call.
3217 //
3218 // When constructing the "child" batches, we compare those two sets of
3219 // data to see which batches need to be sent to the subchannel call.
3220
3221 // TODO(roth): In subsequent PRs:
3222 // - add support for transparent retries (including initial metadata)
3223 // - figure out how to record stats in census for retries
3224 // (census filter is on top of this one)
3225 // - add census stats for retries
3226
RetryingCall(ChannelData * chand,const grpc_call_element_args & args,grpc_polling_entity * pollent,RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,const ClientChannelMethodParsedConfig::RetryPolicy * retry_policy)3227 RetryingCall::RetryingCall(
3228 ChannelData* chand, const grpc_call_element_args& args,
3229 grpc_polling_entity* pollent,
3230 RefCountedPtr<ServerRetryThrottleData> retry_throttle_data,
3231 const ClientChannelMethodParsedConfig::RetryPolicy* retry_policy)
3232 : chand_(chand),
3233 pollent_(pollent),
3234 retry_throttle_data_(std::move(retry_throttle_data)),
3235 retry_policy_(retry_policy),
3236 retry_backoff_(
3237 BackOff::Options()
3238 .set_initial_backoff(
3239 retry_policy_ == nullptr ? 0 : retry_policy_->initial_backoff)
3240 .set_multiplier(retry_policy_ == nullptr
3241 ? 0
3242 : retry_policy_->backoff_multiplier)
3243 .set_jitter(RETRY_BACKOFF_JITTER)
3244 .set_max_backoff(
3245 retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff)),
3246 path_(grpc_slice_ref_internal(args.path)),
3247 call_start_time_(args.start_time),
3248 deadline_(args.deadline),
3249 arena_(args.arena),
3250 owning_call_(args.call_stack),
3251 call_combiner_(args.call_combiner),
3252 call_context_(args.context),
3253 pending_send_initial_metadata_(false),
3254 pending_send_message_(false),
3255 pending_send_trailing_metadata_(false),
3256 enable_retries_(true),
3257 retry_committed_(false),
3258 last_attempt_got_server_pushback_(false) {}
3259
~RetryingCall()3260 RetryingCall::~RetryingCall() {
3261 grpc_slice_unref_internal(path_);
3262 GRPC_ERROR_UNREF(cancel_error_);
3263 // Make sure there are no remaining pending batches.
3264 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3265 GPR_ASSERT(pending_batches_[i].batch == nullptr);
3266 }
3267 }
3268
StartTransportStreamOpBatch(grpc_transport_stream_op_batch * batch)3269 void RetryingCall::StartTransportStreamOpBatch(
3270 grpc_transport_stream_op_batch* batch) {
3271 // If we've previously been cancelled, immediately fail any new batches.
3272 if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
3273 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3274 gpr_log(GPR_INFO,
3275 "chand=%p retrying_call=%p: failing batch with error: %s", chand_,
3276 this, grpc_error_string(cancel_error_));
3277 }
3278 // Note: This will release the call combiner.
3279 grpc_transport_stream_op_batch_finish_with_failure(
3280 batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
3281 return;
3282 }
3283 // Handle cancellation.
3284 if (GPR_UNLIKELY(batch->cancel_stream)) {
3285 // Stash a copy of cancel_error in our call data, so that we can use
3286 // it for subsequent operations. This ensures that if the call is
3287 // cancelled before any batches are passed down (e.g., if the deadline
3288 // is in the past when the call starts), we can return the right
3289 // error to the caller when the first batch does get passed down.
3290 GRPC_ERROR_UNREF(cancel_error_);
3291 cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
3292 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3293 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: recording cancel_error=%s",
3294 chand_, this, grpc_error_string(cancel_error_));
3295 }
3296 // If we do not have an LB call (i.e., a pick has not yet been started),
3297 // fail all pending batches. Otherwise, send the cancellation down to the
3298 // LB call.
3299 if (lb_call_ == nullptr) {
3300 // TODO(roth): If there is a pending retry callback, do we need to
3301 // cancel it here?
3302 PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
3303 // Note: This will release the call combiner.
3304 grpc_transport_stream_op_batch_finish_with_failure(
3305 batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
3306 } else {
3307 // Note: This will release the call combiner.
3308 lb_call_->StartTransportStreamOpBatch(batch);
3309 }
3310 return;
3311 }
3312 // Add the batch to the pending list.
3313 PendingBatchesAdd(batch);
3314 // Create LB call if needed.
3315 // TODO(roth): If we get a new batch from the surface after the
3316 // initial retry attempt has failed, while the retry timer is pending,
3317 // we should queue the batch and not try to send it immediately.
3318 if (lb_call_ == nullptr) {
3319 // We do not yet have an LB call, so create one.
3320 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3321 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: creating LB call", chand_,
3322 this);
3323 }
3324 CreateLbCall(this, GRPC_ERROR_NONE);
3325 return;
3326 }
3327 // Send batches to LB call.
3328 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3329 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: starting batch on lb_call=%p",
3330 chand_, this, lb_call_.get());
3331 }
3332 PendingBatchesResume();
3333 }
3334
subchannel_call() const3335 RefCountedPtr<SubchannelCall> RetryingCall::subchannel_call() const {
3336 if (lb_call_ == nullptr) return nullptr;
3337 return lb_call_->subchannel_call();
3338 }
3339
3340 //
3341 // send op data caching
3342 //
3343
MaybeCacheSendOpsForBatch(PendingBatch * pending)3344 void RetryingCall::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
3345 if (pending->send_ops_cached) return;
3346 pending->send_ops_cached = true;
3347 grpc_transport_stream_op_batch* batch = pending->batch;
3348 // Save a copy of metadata for send_initial_metadata ops.
3349 if (batch->send_initial_metadata) {
3350 seen_send_initial_metadata_ = true;
3351 GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
3352 grpc_metadata_batch* send_initial_metadata =
3353 batch->payload->send_initial_metadata.send_initial_metadata;
3354 send_initial_metadata_storage_ =
3355 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
3356 sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
3357 grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
3358 send_initial_metadata_storage_);
3359 send_initial_metadata_flags_ =
3360 batch->payload->send_initial_metadata.send_initial_metadata_flags;
3361 peer_string_ = batch->payload->send_initial_metadata.peer_string;
3362 }
3363 // Set up cache for send_message ops.
3364 if (batch->send_message) {
3365 ByteStreamCache* cache = arena_->New<ByteStreamCache>(
3366 std::move(batch->payload->send_message.send_message));
3367 send_messages_.push_back(cache);
3368 }
3369 // Save metadata batch for send_trailing_metadata ops.
3370 if (batch->send_trailing_metadata) {
3371 seen_send_trailing_metadata_ = true;
3372 GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
3373 grpc_metadata_batch* send_trailing_metadata =
3374 batch->payload->send_trailing_metadata.send_trailing_metadata;
3375 send_trailing_metadata_storage_ =
3376 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
3377 sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
3378 grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
3379 send_trailing_metadata_storage_);
3380 }
3381 }
3382
FreeCachedSendInitialMetadata()3383 void RetryingCall::FreeCachedSendInitialMetadata() {
3384 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3385 gpr_log(GPR_INFO,
3386 "chand=%p retrying_call=%p: destroying send_initial_metadata",
3387 chand_, this);
3388 }
3389 grpc_metadata_batch_destroy(&send_initial_metadata_);
3390 }
3391
FreeCachedSendMessage(size_t idx)3392 void RetryingCall::FreeCachedSendMessage(size_t idx) {
3393 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3394 gpr_log(GPR_INFO,
3395 "chand=%p retrying_call=%p: destroying send_messages[%" PRIuPTR "]",
3396 chand_, this, idx);
3397 }
3398 send_messages_[idx]->Destroy();
3399 }
3400
FreeCachedSendTrailingMetadata()3401 void RetryingCall::FreeCachedSendTrailingMetadata() {
3402 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3403 gpr_log(GPR_INFO,
3404 "chand_=%p retrying_call=%p: destroying send_trailing_metadata",
3405 chand_, this);
3406 }
3407 grpc_metadata_batch_destroy(&send_trailing_metadata_);
3408 }
3409
FreeCachedSendOpDataAfterCommit(SubchannelCallRetryState * retry_state)3410 void RetryingCall::FreeCachedSendOpDataAfterCommit(
3411 SubchannelCallRetryState* retry_state) {
3412 if (retry_state->completed_send_initial_metadata) {
3413 FreeCachedSendInitialMetadata();
3414 }
3415 for (size_t i = 0; i < retry_state->completed_send_message_count; ++i) {
3416 FreeCachedSendMessage(i);
3417 }
3418 if (retry_state->completed_send_trailing_metadata) {
3419 FreeCachedSendTrailingMetadata();
3420 }
3421 }
3422
FreeCachedSendOpDataForCompletedBatch(SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state)3423 void RetryingCall::FreeCachedSendOpDataForCompletedBatch(
3424 SubchannelCallBatchData* batch_data,
3425 SubchannelCallRetryState* retry_state) {
3426 if (batch_data->batch.send_initial_metadata) {
3427 FreeCachedSendInitialMetadata();
3428 }
3429 if (batch_data->batch.send_message) {
3430 FreeCachedSendMessage(retry_state->completed_send_message_count - 1);
3431 }
3432 if (batch_data->batch.send_trailing_metadata) {
3433 FreeCachedSendTrailingMetadata();
3434 }
3435 }
3436
3437 //
3438 // pending_batches management
3439 //
3440
GetBatchIndex(grpc_transport_stream_op_batch * batch)3441 size_t RetryingCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
3442 // Note: It is important the send_initial_metadata be the first entry
3443 // here, since the code in pick_subchannel_locked() assumes it will be.
3444 if (batch->send_initial_metadata) return 0;
3445 if (batch->send_message) return 1;
3446 if (batch->send_trailing_metadata) return 2;
3447 if (batch->recv_initial_metadata) return 3;
3448 if (batch->recv_message) return 4;
3449 if (batch->recv_trailing_metadata) return 5;
3450 GPR_UNREACHABLE_CODE(return (size_t)-1);
3451 }
3452
3453 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)3454 void RetryingCall::PendingBatchesAdd(grpc_transport_stream_op_batch* batch) {
3455 const size_t idx = GetBatchIndex(batch);
3456 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3457 gpr_log(
3458 GPR_INFO,
3459 "chand_=%p retrying_call=%p: adding pending batch at index %" PRIuPTR,
3460 chand_, this, idx);
3461 }
3462 PendingBatch* pending = &pending_batches_[idx];
3463 GPR_ASSERT(pending->batch == nullptr);
3464 pending->batch = batch;
3465 pending->send_ops_cached = false;
3466 if (enable_retries_) {
3467 // Update state in calld about pending batches.
3468 // Also check if the batch takes us over the retry buffer limit.
3469 // Note: We don't check the size of trailing metadata here, because
3470 // gRPC clients do not send trailing metadata.
3471 if (batch->send_initial_metadata) {
3472 pending_send_initial_metadata_ = true;
3473 bytes_buffered_for_retry_ += grpc_metadata_batch_size(
3474 batch->payload->send_initial_metadata.send_initial_metadata);
3475 }
3476 if (batch->send_message) {
3477 pending_send_message_ = true;
3478 bytes_buffered_for_retry_ +=
3479 batch->payload->send_message.send_message->length();
3480 }
3481 if (batch->send_trailing_metadata) {
3482 pending_send_trailing_metadata_ = true;
3483 }
3484 if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
3485 chand_->per_rpc_retry_buffer_size())) {
3486 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3487 gpr_log(GPR_INFO,
3488 "chand=%p retrying_call=%p: exceeded retry buffer size, "
3489 "committing",
3490 chand_, this);
3491 }
3492 SubchannelCallRetryState* retry_state =
3493 lb_call_ == nullptr ? nullptr
3494 : static_cast<SubchannelCallRetryState*>(
3495 lb_call_->GetParentData());
3496 RetryCommit(retry_state);
3497 // If we are not going to retry and have not yet started, pretend
3498 // retries are disabled so that we don't bother with retry overhead.
3499 if (num_attempts_completed_ == 0) {
3500 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3501 gpr_log(GPR_INFO,
3502 "chand=%p retrying_call=%p: disabling retries before first "
3503 "attempt",
3504 chand_, this);
3505 }
3506 // TODO(roth): Treat this as a commit?
3507 enable_retries_ = false;
3508 }
3509 }
3510 }
3511 }
3512
PendingBatchClear(PendingBatch * pending)3513 void RetryingCall::PendingBatchClear(PendingBatch* pending) {
3514 if (enable_retries_) {
3515 if (pending->batch->send_initial_metadata) {
3516 pending_send_initial_metadata_ = false;
3517 }
3518 if (pending->batch->send_message) {
3519 pending_send_message_ = false;
3520 }
3521 if (pending->batch->send_trailing_metadata) {
3522 pending_send_trailing_metadata_ = false;
3523 }
3524 }
3525 pending->batch = nullptr;
3526 }
3527
MaybeClearPendingBatch(PendingBatch * pending)3528 void RetryingCall::MaybeClearPendingBatch(PendingBatch* pending) {
3529 grpc_transport_stream_op_batch* batch = pending->batch;
3530 // We clear the pending batch if all of its callbacks have been
3531 // scheduled and reset to nullptr.
3532 if (batch->on_complete == nullptr &&
3533 (!batch->recv_initial_metadata ||
3534 batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
3535 nullptr) &&
3536 (!batch->recv_message ||
3537 batch->payload->recv_message.recv_message_ready == nullptr) &&
3538 (!batch->recv_trailing_metadata ||
3539 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
3540 nullptr)) {
3541 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3542 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: clearing pending batch",
3543 chand_, this);
3544 }
3545 PendingBatchClear(pending);
3546 }
3547 }
3548
3549 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error * error)3550 void RetryingCall::FailPendingBatchInCallCombiner(void* arg,
3551 grpc_error* error) {
3552 grpc_transport_stream_op_batch* batch =
3553 static_cast<grpc_transport_stream_op_batch*>(arg);
3554 RetryingCall* call =
3555 static_cast<RetryingCall*>(batch->handler_private.extra_arg);
3556 // Note: This will release the call combiner.
3557 grpc_transport_stream_op_batch_finish_with_failure(
3558 batch, GRPC_ERROR_REF(error), call->call_combiner_);
3559 }
3560
3561 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesFail(grpc_error * error,YieldCallCombinerPredicate yield_call_combiner_predicate)3562 void RetryingCall::PendingBatchesFail(
3563 grpc_error* error,
3564 YieldCallCombinerPredicate yield_call_combiner_predicate) {
3565 GPR_ASSERT(error != GRPC_ERROR_NONE);
3566 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3567 size_t num_batches = 0;
3568 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3569 if (pending_batches_[i].batch != nullptr) ++num_batches;
3570 }
3571 gpr_log(GPR_INFO,
3572 "chand=%p retrying_call=%p: failing %" PRIuPTR
3573 " pending batches: %s",
3574 chand_, this, num_batches, grpc_error_string(error));
3575 }
3576 CallCombinerClosureList closures;
3577 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3578 PendingBatch* pending = &pending_batches_[i];
3579 grpc_transport_stream_op_batch* batch = pending->batch;
3580 if (batch != nullptr) {
3581 batch->handler_private.extra_arg = this;
3582 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
3583 FailPendingBatchInCallCombiner, batch,
3584 grpc_schedule_on_exec_ctx);
3585 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
3586 "PendingBatchesFail");
3587 PendingBatchClear(pending);
3588 }
3589 }
3590 if (yield_call_combiner_predicate(closures)) {
3591 closures.RunClosures(call_combiner_);
3592 } else {
3593 closures.RunClosuresWithoutYielding(call_combiner_);
3594 }
3595 GRPC_ERROR_UNREF(error);
3596 }
3597
3598 // This is called via the call combiner, so access to calld is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error *)3599 void RetryingCall::ResumePendingBatchInCallCombiner(void* arg,
3600 grpc_error* /*ignored*/) {
3601 grpc_transport_stream_op_batch* batch =
3602 static_cast<grpc_transport_stream_op_batch*>(arg);
3603 auto* lb_call =
3604 static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
3605 // Note: This will release the call combiner.
3606 lb_call->StartTransportStreamOpBatch(batch);
3607 }
3608
3609 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesResume()3610 void RetryingCall::PendingBatchesResume() {
3611 if (enable_retries_) {
3612 StartRetriableSubchannelBatches(this, GRPC_ERROR_NONE);
3613 return;
3614 }
3615 // Retries not enabled; send down batches as-is.
3616 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3617 size_t num_batches = 0;
3618 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3619 if (pending_batches_[i].batch != nullptr) ++num_batches;
3620 }
3621 gpr_log(GPR_INFO,
3622 "chand=%p retrying_call=%p: starting %" PRIuPTR
3623 " pending batches on lb_call=%p",
3624 chand_, this, num_batches, lb_call_.get());
3625 }
3626 CallCombinerClosureList closures;
3627 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3628 PendingBatch* pending = &pending_batches_[i];
3629 grpc_transport_stream_op_batch* batch = pending->batch;
3630 if (batch != nullptr) {
3631 batch->handler_private.extra_arg = lb_call_.get();
3632 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
3633 ResumePendingBatchInCallCombiner, batch, nullptr);
3634 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
3635 "PendingBatchesResume");
3636 PendingBatchClear(pending);
3637 }
3638 }
3639 // Note: This will release the call combiner.
3640 closures.RunClosures(call_combiner_);
3641 }
3642
3643 template <typename Predicate>
PendingBatchFind(const char * log_message,Predicate predicate)3644 RetryingCall::PendingBatch* RetryingCall::PendingBatchFind(
3645 const char* log_message, Predicate predicate) {
3646 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
3647 PendingBatch* pending = &pending_batches_[i];
3648 grpc_transport_stream_op_batch* batch = pending->batch;
3649 if (batch != nullptr && predicate(batch)) {
3650 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3651 gpr_log(
3652 GPR_INFO,
3653 "chand=%p retrying_call=%p: %s pending batch at index %" PRIuPTR,
3654 chand_, this, log_message, i);
3655 }
3656 return pending;
3657 }
3658 }
3659 return nullptr;
3660 }
3661
//
// retry code
//

RetryCommit(SubchannelCallRetryState * retry_state)3666 void RetryingCall::RetryCommit(SubchannelCallRetryState* retry_state) {
3667 if (retry_committed_) return;
3668 retry_committed_ = true;
3669 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3670 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: committing retries", chand_,
3671 this);
3672 }
3673 if (retry_state != nullptr) {
3674 FreeCachedSendOpDataAfterCommit(retry_state);
3675 }
3676 }
3677
DoRetry(SubchannelCallRetryState * retry_state,grpc_millis server_pushback_ms)3678 void RetryingCall::DoRetry(SubchannelCallRetryState* retry_state,
3679 grpc_millis server_pushback_ms) {
3680 GPR_ASSERT(retry_policy_ != nullptr);
3681 // Reset LB call.
3682 lb_call_.reset();
3683 // Compute backoff delay.
3684 grpc_millis next_attempt_time;
3685 if (server_pushback_ms >= 0) {
3686 next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
3687 last_attempt_got_server_pushback_ = true;
3688 } else {
3689 if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
3690 last_attempt_got_server_pushback_ = false;
3691 }
3692 next_attempt_time = retry_backoff_.NextAttemptTime();
3693 }
3694 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3695 gpr_log(GPR_INFO,
3696 "chand=%p retrying_call=%p: retrying failed call in %" PRId64 " ms",
3697 chand_, this, next_attempt_time - ExecCtx::Get()->Now());
3698 }
3699 // Schedule retry after computed delay.
3700 GRPC_CLOSURE_INIT(&retry_closure_, CreateLbCall, this, nullptr);
3701 grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
3702 // Update bookkeeping.
3703 if (retry_state != nullptr) retry_state->retry_dispatched = true;
3704 }
3705
MaybeRetry(SubchannelCallBatchData * batch_data,grpc_status_code status,grpc_mdelem * server_pushback_md)3706 bool RetryingCall::MaybeRetry(SubchannelCallBatchData* batch_data,
3707 grpc_status_code status,
3708 grpc_mdelem* server_pushback_md) {
3709 // Get retry policy.
3710 if (retry_policy_ == nullptr) return false;
3711 // If we've already dispatched a retry from this call, return true.
3712 // This catches the case where the batch has multiple callbacks
3713 // (i.e., it includes either recv_message or recv_initial_metadata).
3714 SubchannelCallRetryState* retry_state = nullptr;
3715 if (batch_data != nullptr) {
3716 retry_state = static_cast<SubchannelCallRetryState*>(
3717 batch_data->lb_call->GetParentData());
3718 if (retry_state->retry_dispatched) {
3719 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3720 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retry already dispatched",
3721 chand_, this);
3722 }
3723 return true;
3724 }
3725 }
3726 // Check status.
3727 if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
3728 if (retry_throttle_data_ != nullptr) {
3729 retry_throttle_data_->RecordSuccess();
3730 }
3731 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3732 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: call succeeded", chand_,
3733 this);
3734 }
3735 return false;
3736 }
3737 // Status is not OK. Check whether the status is retryable.
3738 if (!retry_policy_->retryable_status_codes.Contains(status)) {
3739 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3740 gpr_log(
3741 GPR_INFO,
3742 "chand=%p retrying_call=%p: status %s not configured as retryable",
3743 chand_, this, grpc_status_code_to_string(status));
3744 }
3745 return false;
3746 }
3747 // Record the failure and check whether retries are throttled.
3748 // Note that it's important for this check to come after the status
3749 // code check above, since we should only record failures whose statuses
3750 // match the configured retryable status codes, so that we don't count
3751 // things like failures due to malformed requests (INVALID_ARGUMENT).
3752 // Conversely, it's important for this to come before the remaining
3753 // checks, so that we don't fail to record failures due to other factors.
3754 if (retry_throttle_data_ != nullptr &&
3755 !retry_throttle_data_->RecordFailure()) {
3756 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3757 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retries throttled", chand_,
3758 this);
3759 }
3760 return false;
3761 }
3762 // Check whether the call is committed.
3763 if (retry_committed_) {
3764 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3765 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: retries already committed",
3766 chand_, this);
3767 }
3768 return false;
3769 }
3770 // Check whether we have retries remaining.
3771 ++num_attempts_completed_;
3772 if (num_attempts_completed_ >= retry_policy_->max_attempts) {
3773 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3774 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: exceeded %d retry attempts",
3775 chand_, this, retry_policy_->max_attempts);
3776 }
3777 return false;
3778 }
3779 // If the call was cancelled from the surface, don't retry.
3780 if (cancel_error_ != GRPC_ERROR_NONE) {
3781 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3782 gpr_log(GPR_INFO,
3783 "chand=%p retrying_call=%p: call cancelled from surface, not "
3784 "retrying",
3785 chand_, this);
3786 }
3787 return false;
3788 }
3789 // Check server push-back.
3790 grpc_millis server_pushback_ms = -1;
3791 if (server_pushback_md != nullptr) {
3792 // If the value is "-1" or any other unparseable string, we do not retry.
3793 uint32_t ms;
3794 if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
3795 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3796 gpr_log(
3797 GPR_INFO,
3798 "chand=%p retrying_call=%p: not retrying due to server push-back",
3799 chand_, this);
3800 }
3801 return false;
3802 } else {
3803 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3804 gpr_log(GPR_INFO,
3805 "chand=%p retrying_call=%p: server push-back: retry in %u ms",
3806 chand_, this, ms);
3807 }
3808 server_pushback_ms = static_cast<grpc_millis>(ms);
3809 }
3810 }
3811 DoRetry(retry_state, server_pushback_ms);
3812 return true;
3813 }
3814
//
// RetryingCall::SubchannelCallBatchData
//

3819 RetryingCall::SubchannelCallBatchData*
Create(RetryingCall * call,int refcount,bool set_on_complete)3820 RetryingCall::SubchannelCallBatchData::Create(RetryingCall* call, int refcount,
3821 bool set_on_complete) {
3822 return call->arena_->New<SubchannelCallBatchData>(call, refcount,
3823 set_on_complete);
3824 }
3825
SubchannelCallBatchData(RetryingCall * call,int refcount,bool set_on_complete)3826 RetryingCall::SubchannelCallBatchData::SubchannelCallBatchData(
3827 RetryingCall* call, int refcount, bool set_on_complete)
3828 : call(call), lb_call(call->lb_call_) {
3829 SubchannelCallRetryState* retry_state =
3830 static_cast<SubchannelCallRetryState*>(lb_call->GetParentData());
3831 batch.payload = &retry_state->batch_payload;
3832 gpr_ref_init(&refs, refcount);
3833 if (set_on_complete) {
3834 GRPC_CLOSURE_INIT(&on_complete, RetryingCall::OnComplete, this,
3835 grpc_schedule_on_exec_ctx);
3836 batch.on_complete = &on_complete;
3837 }
3838 GRPC_CALL_STACK_REF(call->owning_call_, "batch_data");
3839 }
3840
Destroy()3841 void RetryingCall::SubchannelCallBatchData::Destroy() {
3842 SubchannelCallRetryState* retry_state =
3843 static_cast<SubchannelCallRetryState*>(lb_call->GetParentData());
3844 if (batch.send_initial_metadata) {
3845 grpc_metadata_batch_destroy(&retry_state->send_initial_metadata);
3846 }
3847 if (batch.send_trailing_metadata) {
3848 grpc_metadata_batch_destroy(&retry_state->send_trailing_metadata);
3849 }
3850 if (batch.recv_initial_metadata) {
3851 grpc_metadata_batch_destroy(&retry_state->recv_initial_metadata);
3852 }
3853 if (batch.recv_trailing_metadata) {
3854 grpc_metadata_batch_destroy(&retry_state->recv_trailing_metadata);
3855 }
3856 lb_call.reset();
3857 GRPC_CALL_STACK_UNREF(call->owning_call_, "batch_data");
3858 }
3859
//
// recv_initial_metadata callback handling
//

InvokeRecvInitialMetadataCallback(void * arg,grpc_error * error)3864 void RetryingCall::InvokeRecvInitialMetadataCallback(void* arg,
3865 grpc_error* error) {
3866 SubchannelCallBatchData* batch_data =
3867 static_cast<SubchannelCallBatchData*>(arg);
3868 // Find pending batch.
3869 PendingBatch* pending = batch_data->call->PendingBatchFind(
3870 "invoking recv_initial_metadata_ready for",
3871 [](grpc_transport_stream_op_batch* batch) {
3872 return batch->recv_initial_metadata &&
3873 batch->payload->recv_initial_metadata
3874 .recv_initial_metadata_ready != nullptr;
3875 });
3876 GPR_ASSERT(pending != nullptr);
3877 // Return metadata.
3878 SubchannelCallRetryState* retry_state =
3879 static_cast<SubchannelCallRetryState*>(
3880 batch_data->lb_call->GetParentData());
3881 grpc_metadata_batch_move(
3882 &retry_state->recv_initial_metadata,
3883 pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
3884 // Update bookkeeping.
3885 // Note: Need to do this before invoking the callback, since invoking
3886 // the callback will result in yielding the call combiner.
3887 grpc_closure* recv_initial_metadata_ready =
3888 pending->batch->payload->recv_initial_metadata
3889 .recv_initial_metadata_ready;
3890 pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
3891 nullptr;
3892 batch_data->call->MaybeClearPendingBatch(pending);
3893 batch_data->Unref();
3894 // Invoke callback.
3895 Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready,
3896 GRPC_ERROR_REF(error));
3897 }
3898
RecvInitialMetadataReady(void * arg,grpc_error * error)3899 void RetryingCall::RecvInitialMetadataReady(void* arg, grpc_error* error) {
3900 SubchannelCallBatchData* batch_data =
3901 static_cast<SubchannelCallBatchData*>(arg);
3902 RetryingCall* call = batch_data->call;
3903 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3904 gpr_log(
3905 GPR_INFO,
3906 "chand=%p retrying_call=%p: got recv_initial_metadata_ready, error=%s",
3907 call->chand_, call, grpc_error_string(error));
3908 }
3909 SubchannelCallRetryState* retry_state =
3910 static_cast<SubchannelCallRetryState*>(
3911 batch_data->lb_call->GetParentData());
3912 retry_state->completed_recv_initial_metadata = true;
3913 // If a retry was already dispatched, then we're not going to use the
3914 // result of this recv_initial_metadata op, so do nothing.
3915 if (retry_state->retry_dispatched) {
3916 GRPC_CALL_COMBINER_STOP(
3917 call->call_combiner_,
3918 "recv_initial_metadata_ready after retry dispatched");
3919 return;
3920 }
3921 // If we got an error or a Trailers-Only response and have not yet gotten
3922 // the recv_trailing_metadata_ready callback, then defer propagating this
3923 // callback back to the surface. We can evaluate whether to retry when
3924 // recv_trailing_metadata comes back.
3925 if (GPR_UNLIKELY((retry_state->trailing_metadata_available ||
3926 error != GRPC_ERROR_NONE) &&
3927 !retry_state->completed_recv_trailing_metadata)) {
3928 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3929 gpr_log(
3930 GPR_INFO,
3931 "chand=%p retrying_call=%p: deferring recv_initial_metadata_ready "
3932 "(Trailers-Only)",
3933 call->chand_, call);
3934 }
3935 retry_state->recv_initial_metadata_ready_deferred_batch = batch_data;
3936 retry_state->recv_initial_metadata_error = GRPC_ERROR_REF(error);
3937 if (!retry_state->started_recv_trailing_metadata) {
3938 // recv_trailing_metadata not yet started by application; start it
3939 // ourselves to get status.
3940 call->StartInternalRecvTrailingMetadata();
3941 } else {
3942 GRPC_CALL_COMBINER_STOP(
3943 call->call_combiner_,
3944 "recv_initial_metadata_ready trailers-only or error");
3945 }
3946 return;
3947 }
3948 // Received valid initial metadata, so commit the call.
3949 call->RetryCommit(retry_state);
3950 // Invoke the callback to return the result to the surface.
3951 // Manually invoking a callback function; it does not take ownership of error.
3952 call->InvokeRecvInitialMetadataCallback(batch_data, error);
3953 }
3954
//
// recv_message callback handling
//

InvokeRecvMessageCallback(void * arg,grpc_error * error)3959 void RetryingCall::InvokeRecvMessageCallback(void* arg, grpc_error* error) {
3960 SubchannelCallBatchData* batch_data =
3961 static_cast<SubchannelCallBatchData*>(arg);
3962 RetryingCall* call = batch_data->call;
3963 // Find pending op.
3964 PendingBatch* pending = call->PendingBatchFind(
3965 "invoking recv_message_ready for",
3966 [](grpc_transport_stream_op_batch* batch) {
3967 return batch->recv_message &&
3968 batch->payload->recv_message.recv_message_ready != nullptr;
3969 });
3970 GPR_ASSERT(pending != nullptr);
3971 // Return payload.
3972 SubchannelCallRetryState* retry_state =
3973 static_cast<SubchannelCallRetryState*>(
3974 batch_data->lb_call->GetParentData());
3975 *pending->batch->payload->recv_message.recv_message =
3976 std::move(retry_state->recv_message);
3977 // Update bookkeeping.
3978 // Note: Need to do this before invoking the callback, since invoking
3979 // the callback will result in yielding the call combiner.
3980 grpc_closure* recv_message_ready =
3981 pending->batch->payload->recv_message.recv_message_ready;
3982 pending->batch->payload->recv_message.recv_message_ready = nullptr;
3983 call->MaybeClearPendingBatch(pending);
3984 batch_data->Unref();
3985 // Invoke callback.
3986 Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error));
3987 }
3988
RecvMessageReady(void * arg,grpc_error * error)3989 void RetryingCall::RecvMessageReady(void* arg, grpc_error* error) {
3990 SubchannelCallBatchData* batch_data =
3991 static_cast<SubchannelCallBatchData*>(arg);
3992 RetryingCall* call = batch_data->call;
3993 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
3994 gpr_log(GPR_INFO,
3995 "chand=%p retrying_call=%p: got recv_message_ready, error=%s",
3996 call->chand_, call, grpc_error_string(error));
3997 }
3998 SubchannelCallRetryState* retry_state =
3999 static_cast<SubchannelCallRetryState*>(
4000 batch_data->lb_call->GetParentData());
4001 ++retry_state->completed_recv_message_count;
4002 // If a retry was already dispatched, then we're not going to use the
4003 // result of this recv_message op, so do nothing.
4004 if (retry_state->retry_dispatched) {
4005 GRPC_CALL_COMBINER_STOP(call->call_combiner_,
4006 "recv_message_ready after retry dispatched");
4007 return;
4008 }
4009 // If we got an error or the payload was nullptr and we have not yet gotten
4010 // the recv_trailing_metadata_ready callback, then defer propagating this
4011 // callback back to the surface. We can evaluate whether to retry when
4012 // recv_trailing_metadata comes back.
4013 if (GPR_UNLIKELY(
4014 (retry_state->recv_message == nullptr || error != GRPC_ERROR_NONE) &&
4015 !retry_state->completed_recv_trailing_metadata)) {
4016 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4017 gpr_log(
4018 GPR_INFO,
4019 "chand=%p retrying_call=%p: deferring recv_message_ready (nullptr "
4020 "message and recv_trailing_metadata pending)",
4021 call->chand_, call);
4022 }
4023 retry_state->recv_message_ready_deferred_batch = batch_data;
4024 retry_state->recv_message_error = GRPC_ERROR_REF(error);
4025 if (!retry_state->started_recv_trailing_metadata) {
4026 // recv_trailing_metadata not yet started by application; start it
4027 // ourselves to get status.
4028 call->StartInternalRecvTrailingMetadata();
4029 } else {
4030 GRPC_CALL_COMBINER_STOP(call->call_combiner_, "recv_message_ready null");
4031 }
4032 return;
4033 }
4034 // Received a valid message, so commit the call.
4035 call->RetryCommit(retry_state);
4036 // Invoke the callback to return the result to the surface.
4037 // Manually invoking a callback function; it does not take ownership of error.
4038 call->InvokeRecvMessageCallback(batch_data, error);
4039 }
4040
//
// recv_trailing_metadata handling
//

GetCallStatus(grpc_metadata_batch * md_batch,grpc_error * error,grpc_status_code * status,grpc_mdelem ** server_pushback_md)4045 void RetryingCall::GetCallStatus(grpc_metadata_batch* md_batch,
4046 grpc_error* error, grpc_status_code* status,
4047 grpc_mdelem** server_pushback_md) {
4048 if (error != GRPC_ERROR_NONE) {
4049 grpc_error_get_status(error, deadline_, status, nullptr, nullptr, nullptr);
4050 } else {
4051 GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
4052 *status =
4053 grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
4054 if (server_pushback_md != nullptr &&
4055 md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
4056 *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
4057 }
4058 }
4059 GRPC_ERROR_UNREF(error);
4060 }
4061
AddClosureForRecvTrailingMetadataReady(SubchannelCallBatchData * batch_data,grpc_error * error,CallCombinerClosureList * closures)4062 void RetryingCall::AddClosureForRecvTrailingMetadataReady(
4063 SubchannelCallBatchData* batch_data, grpc_error* error,
4064 CallCombinerClosureList* closures) {
4065 // Find pending batch.
4066 PendingBatch* pending = PendingBatchFind(
4067 "invoking recv_trailing_metadata for",
4068 [](grpc_transport_stream_op_batch* batch) {
4069 return batch->recv_trailing_metadata &&
4070 batch->payload->recv_trailing_metadata
4071 .recv_trailing_metadata_ready != nullptr;
4072 });
4073 // If we generated the recv_trailing_metadata op internally via
4074 // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
4075 if (pending == nullptr) {
4076 GRPC_ERROR_UNREF(error);
4077 return;
4078 }
4079 // Return metadata.
4080 SubchannelCallRetryState* retry_state =
4081 static_cast<SubchannelCallRetryState*>(
4082 batch_data->lb_call->GetParentData());
4083 grpc_metadata_batch_move(
4084 &retry_state->recv_trailing_metadata,
4085 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
4086 // Add closure.
4087 closures->Add(pending->batch->payload->recv_trailing_metadata
4088 .recv_trailing_metadata_ready,
4089 error, "recv_trailing_metadata_ready for pending batch");
4090 // Update bookkeeping.
4091 pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
4092 nullptr;
4093 MaybeClearPendingBatch(pending);
4094 }
4095
AddClosuresForDeferredRecvCallbacks(SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)4096 void RetryingCall::AddClosuresForDeferredRecvCallbacks(
4097 SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
4098 CallCombinerClosureList* closures) {
4099 if (batch_data->batch.recv_trailing_metadata) {
4100 // Add closure for deferred recv_initial_metadata_ready.
4101 if (GPR_UNLIKELY(retry_state->recv_initial_metadata_ready_deferred_batch !=
4102 nullptr)) {
4103 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
4104 InvokeRecvInitialMetadataCallback,
4105 retry_state->recv_initial_metadata_ready_deferred_batch,
4106 grpc_schedule_on_exec_ctx);
4107 closures->Add(&retry_state->recv_initial_metadata_ready,
4108 retry_state->recv_initial_metadata_error,
4109 "resuming recv_initial_metadata_ready");
4110 retry_state->recv_initial_metadata_ready_deferred_batch = nullptr;
4111 }
4112 // Add closure for deferred recv_message_ready.
4113 if (GPR_UNLIKELY(retry_state->recv_message_ready_deferred_batch !=
4114 nullptr)) {
4115 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready,
4116 InvokeRecvMessageCallback,
4117 retry_state->recv_message_ready_deferred_batch,
4118 grpc_schedule_on_exec_ctx);
4119 closures->Add(&retry_state->recv_message_ready,
4120 retry_state->recv_message_error,
4121 "resuming recv_message_ready");
4122 retry_state->recv_message_ready_deferred_batch = nullptr;
4123 }
4124 }
4125 }
4126
PendingBatchIsUnstarted(PendingBatch * pending,SubchannelCallRetryState * retry_state)4127 bool RetryingCall::PendingBatchIsUnstarted(
4128 PendingBatch* pending, SubchannelCallRetryState* retry_state) {
4129 if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
4130 return false;
4131 }
4132 if (pending->batch->send_initial_metadata &&
4133 !retry_state->started_send_initial_metadata) {
4134 return true;
4135 }
4136 if (pending->batch->send_message &&
4137 retry_state->started_send_message_count < send_messages_.size()) {
4138 return true;
4139 }
4140 if (pending->batch->send_trailing_metadata &&
4141 !retry_state->started_send_trailing_metadata) {
4142 return true;
4143 }
4144 return false;
4145 }
4146
AddClosuresToFailUnstartedPendingBatches(SubchannelCallRetryState * retry_state,grpc_error * error,CallCombinerClosureList * closures)4147 void RetryingCall::AddClosuresToFailUnstartedPendingBatches(
4148 SubchannelCallRetryState* retry_state, grpc_error* error,
4149 CallCombinerClosureList* closures) {
4150 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4151 PendingBatch* pending = &pending_batches_[i];
4152 if (PendingBatchIsUnstarted(pending, retry_state)) {
4153 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4154 gpr_log(GPR_INFO,
4155 "chand=%p retrying_call=%p: failing unstarted pending batch at "
4156 "index "
4157 "%" PRIuPTR,
4158 chand_, this, i);
4159 }
4160 closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
4161 "failing on_complete for pending batch");
4162 pending->batch->on_complete = nullptr;
4163 MaybeClearPendingBatch(pending);
4164 }
4165 }
4166 GRPC_ERROR_UNREF(error);
4167 }
4168
RunClosuresForCompletedCall(SubchannelCallBatchData * batch_data,grpc_error * error)4169 void RetryingCall::RunClosuresForCompletedCall(
4170 SubchannelCallBatchData* batch_data, grpc_error* error) {
4171 SubchannelCallRetryState* retry_state =
4172 static_cast<SubchannelCallRetryState*>(
4173 batch_data->lb_call->GetParentData());
4174 // Construct list of closures to execute.
4175 CallCombinerClosureList closures;
4176 // First, add closure for recv_trailing_metadata_ready.
4177 AddClosureForRecvTrailingMetadataReady(batch_data, GRPC_ERROR_REF(error),
4178 &closures);
4179 // If there are deferred recv_initial_metadata_ready or recv_message_ready
4180 // callbacks, add them to closures.
4181 AddClosuresForDeferredRecvCallbacks(batch_data, retry_state, &closures);
4182 // Add closures to fail any pending batches that have not yet been started.
4183 AddClosuresToFailUnstartedPendingBatches(retry_state, GRPC_ERROR_REF(error),
4184 &closures);
4185 // Don't need batch_data anymore.
4186 batch_data->Unref();
4187 // Schedule all of the closures identified above.
4188 // Note: This will release the call combiner.
4189 closures.RunClosures(call_combiner_);
4190 GRPC_ERROR_UNREF(error);
4191 }
4192
RecvTrailingMetadataReady(void * arg,grpc_error * error)4193 void RetryingCall::RecvTrailingMetadataReady(void* arg, grpc_error* error) {
4194 SubchannelCallBatchData* batch_data =
4195 static_cast<SubchannelCallBatchData*>(arg);
4196 RetryingCall* call = batch_data->call;
4197 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4198 gpr_log(
4199 GPR_INFO,
4200 "chand=%p retrying_call=%p: got recv_trailing_metadata_ready, error=%s",
4201 call->chand_, call, grpc_error_string(error));
4202 }
4203 SubchannelCallRetryState* retry_state =
4204 static_cast<SubchannelCallRetryState*>(
4205 batch_data->lb_call->GetParentData());
4206 retry_state->completed_recv_trailing_metadata = true;
4207 // Get the call's status and check for server pushback metadata.
4208 grpc_status_code status = GRPC_STATUS_OK;
4209 grpc_mdelem* server_pushback_md = nullptr;
4210 grpc_metadata_batch* md_batch =
4211 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata;
4212 call->GetCallStatus(md_batch, GRPC_ERROR_REF(error), &status,
4213 &server_pushback_md);
4214 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4215 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: call finished, status=%s",
4216 call->chand_, call, grpc_status_code_to_string(status));
4217 }
4218 // Check if we should retry.
4219 if (call->MaybeRetry(batch_data, status, server_pushback_md)) {
4220 // Unref batch_data for deferred recv_initial_metadata_ready or
4221 // recv_message_ready callbacks, if any.
4222 if (retry_state->recv_initial_metadata_ready_deferred_batch != nullptr) {
4223 batch_data->Unref();
4224 GRPC_ERROR_UNREF(retry_state->recv_initial_metadata_error);
4225 }
4226 if (retry_state->recv_message_ready_deferred_batch != nullptr) {
4227 batch_data->Unref();
4228 GRPC_ERROR_UNREF(retry_state->recv_message_error);
4229 }
4230 batch_data->Unref();
4231 return;
4232 }
4233 // Not retrying, so commit the call.
4234 call->RetryCommit(retry_state);
4235 // Run any necessary closures.
4236 call->RunClosuresForCompletedCall(batch_data, GRPC_ERROR_REF(error));
4237 }
4238
//
// on_complete callback handling
//

AddClosuresForCompletedPendingBatch(SubchannelCallBatchData * batch_data,grpc_error * error,CallCombinerClosureList * closures)4243 void RetryingCall::AddClosuresForCompletedPendingBatch(
4244 SubchannelCallBatchData* batch_data, grpc_error* error,
4245 CallCombinerClosureList* closures) {
4246 PendingBatch* pending = PendingBatchFind(
4247 "completed", [batch_data](grpc_transport_stream_op_batch* batch) {
4248 // Match the pending batch with the same set of send ops as the
4249 // subchannel batch we've just completed.
4250 return batch->on_complete != nullptr &&
4251 batch_data->batch.send_initial_metadata ==
4252 batch->send_initial_metadata &&
4253 batch_data->batch.send_message == batch->send_message &&
4254 batch_data->batch.send_trailing_metadata ==
4255 batch->send_trailing_metadata;
4256 });
4257 // If batch_data is a replay batch, then there will be no pending
4258 // batch to complete.
4259 if (pending == nullptr) {
4260 GRPC_ERROR_UNREF(error);
4261 return;
4262 }
4263 // Add closure.
4264 closures->Add(pending->batch->on_complete, error,
4265 "on_complete for pending batch");
4266 pending->batch->on_complete = nullptr;
4267 MaybeClearPendingBatch(pending);
4268 }
4269
AddClosuresForReplayOrPendingSendOps(SubchannelCallBatchData * batch_data,SubchannelCallRetryState * retry_state,CallCombinerClosureList * closures)4270 void RetryingCall::AddClosuresForReplayOrPendingSendOps(
4271 SubchannelCallBatchData* batch_data, SubchannelCallRetryState* retry_state,
4272 CallCombinerClosureList* closures) {
4273 bool have_pending_send_message_ops =
4274 retry_state->started_send_message_count < send_messages_.size();
4275 bool have_pending_send_trailing_metadata_op =
4276 seen_send_trailing_metadata_ &&
4277 !retry_state->started_send_trailing_metadata;
4278 if (!have_pending_send_message_ops &&
4279 !have_pending_send_trailing_metadata_op) {
4280 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4281 PendingBatch* pending = &pending_batches_[i];
4282 grpc_transport_stream_op_batch* batch = pending->batch;
4283 if (batch == nullptr || pending->send_ops_cached) continue;
4284 if (batch->send_message) have_pending_send_message_ops = true;
4285 if (batch->send_trailing_metadata) {
4286 have_pending_send_trailing_metadata_op = true;
4287 }
4288 }
4289 }
4290 if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
4291 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4292 gpr_log(GPR_INFO,
4293 "chand=%p retrying_call=%p: starting next batch for pending send "
4294 "op(s)",
4295 chand_, this);
4296 }
4297 GRPC_CLOSURE_INIT(&batch_data->batch.handler_private.closure,
4298 StartRetriableSubchannelBatches, this,
4299 grpc_schedule_on_exec_ctx);
4300 closures->Add(&batch_data->batch.handler_private.closure, GRPC_ERROR_NONE,
4301 "starting next batch for send_* op(s)");
4302 }
4303 }
4304
// on_complete callback for a retriable subchannel batch; `arg` is the
// SubchannelCallBatchData for that batch.
//
// Steps:
// - Records in the attempt's SubchannelCallRetryState which send ops have
//   completed.
// - If the call is committed, frees cached data for those send ops.
// - Unless a retry was already dispatched, queues the completion for the
//   matching pending surface batch. If recv_trailing_metadata has not yet
//   completed, it also queues a closure that starts any further
//   replay/pending send ops.
// - Drops this batch's ref and runs the queued closures, which yields the
//   call combiner.
// - If this was the last outstanding send batch, releases the
//   "subchannel_send_batches" call-stack ref.
void RetryingCall::OnComplete(void* arg, grpc_error* error) {
  SubchannelCallBatchData* batch_data =
      static_cast<SubchannelCallBatchData*>(arg);
  RetryingCall* call = batch_data->call;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p retrying_call=%p: got on_complete, error=%s, batch=%s",
            call->chand_, call, grpc_error_string(error),
            grpc_transport_stream_op_batch_string(&batch_data->batch).c_str());
  }
  SubchannelCallRetryState* retry_state =
      static_cast<SubchannelCallRetryState*>(
          batch_data->lb_call->GetParentData());
  // Update bookkeeping in retry_state.
  if (batch_data->batch.send_initial_metadata) {
    retry_state->completed_send_initial_metadata = true;
  }
  if (batch_data->batch.send_message) {
    ++retry_state->completed_send_message_count;
  }
  if (batch_data->batch.send_trailing_metadata) {
    retry_state->completed_send_trailing_metadata = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed.
  if (call->retry_committed_) {
    call->FreeCachedSendOpDataForCompletedBatch(batch_data, retry_state);
  }
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // If a retry was already dispatched, that means we saw
  // recv_trailing_metadata before this, so we do nothing here.
  // Otherwise, invoke the callback to return the result to the surface.
  if (!retry_state->retry_dispatched) {
    // Add closure for the completed pending batch, if any.
    call->AddClosuresForCompletedPendingBatch(batch_data, GRPC_ERROR_REF(error),
                                              &closures);
    // If needed, add a callback to start any replay or pending send ops on
    // the subchannel call.
    if (!retry_state->completed_recv_trailing_metadata) {
      call->AddClosuresForReplayOrPendingSendOps(batch_data, retry_state,
                                                 &closures);
    }
  }
  // Track number of pending subchannel send batches and determine if this
  // was the last one.
  --call->num_pending_retriable_subchannel_send_batches_;
  const bool last_send_batch_complete =
      call->num_pending_retriable_subchannel_send_batches_ == 0;
  // Don't need batch_data anymore.
  // NOTE(review): the closure that AddClosuresForReplayOrPendingSendOps() may
  // have queued lives inside batch_data->batch.handler_private, and it runs
  // after this Unref(). This presumably relies on batch_data's storage
  // (arena memory) outliving the Unref() -- confirm.
  batch_data->Unref();
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(call->call_combiner_);
  // If this was the last subchannel send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(call->owning_call_, "subchannel_send_batches");
  }
}
4364
4365 //
4366 // subchannel batch construction
4367 //
4368
StartBatchInCallCombiner(void * arg,grpc_error *)4369 void RetryingCall::StartBatchInCallCombiner(void* arg,
4370 grpc_error* /*ignored*/) {
4371 grpc_transport_stream_op_batch* batch =
4372 static_cast<grpc_transport_stream_op_batch*>(arg);
4373 auto* lb_call =
4374 static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
4375 // Note: This will release the call combiner.
4376 lb_call->StartTransportStreamOpBatch(batch);
4377 }
4378
AddClosureForSubchannelBatch(grpc_transport_stream_op_batch * batch,CallCombinerClosureList * closures)4379 void RetryingCall::AddClosureForSubchannelBatch(
4380 grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) {
4381 batch->handler_private.extra_arg = lb_call_.get();
4382 GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
4383 batch, grpc_schedule_on_exec_ctx);
4384 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4385 gpr_log(GPR_INFO,
4386 "chand=%p retrying_call=%p: starting subchannel batch: %s", chand_,
4387 this, grpc_transport_stream_op_batch_string(batch).c_str());
4388 }
4389 closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
4390 "start_subchannel_batch");
4391 }
4392
AddRetriableSendInitialMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4393 void RetryingCall::AddRetriableSendInitialMetadataOp(
4394 SubchannelCallRetryState* retry_state,
4395 SubchannelCallBatchData* batch_data) {
4396 // Maps the number of retries to the corresponding metadata value slice.
4397 const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
4398 &GRPC_MDSTR_3, &GRPC_MDSTR_4};
4399 // We need to make a copy of the metadata batch for each attempt, since
4400 // the filters in the subchannel stack may modify this batch, and we don't
4401 // want those modifications to be passed forward to subsequent attempts.
4402 //
4403 // If we've already completed one or more attempts, add the
4404 // grpc-retry-attempts header.
4405 retry_state->send_initial_metadata_storage =
4406 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
4407 sizeof(grpc_linked_mdelem) *
4408 (send_initial_metadata_.list.count + (num_attempts_completed_ > 0))));
4409 grpc_metadata_batch_copy(&send_initial_metadata_,
4410 &retry_state->send_initial_metadata,
4411 retry_state->send_initial_metadata_storage);
4412 if (GPR_UNLIKELY(retry_state->send_initial_metadata.idx.named
4413 .grpc_previous_rpc_attempts != nullptr)) {
4414 grpc_metadata_batch_remove(&retry_state->send_initial_metadata,
4415 GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
4416 }
4417 if (GPR_UNLIKELY(num_attempts_completed_ > 0)) {
4418 grpc_mdelem retry_md = grpc_mdelem_create(
4419 GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
4420 *retry_count_strings[num_attempts_completed_ - 1], nullptr);
4421 grpc_error* error = grpc_metadata_batch_add_tail(
4422 &retry_state->send_initial_metadata,
4423 &retry_state
4424 ->send_initial_metadata_storage[send_initial_metadata_.list.count],
4425 retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
4426 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
4427 gpr_log(GPR_ERROR, "error adding retry metadata: %s",
4428 grpc_error_string(error));
4429 GPR_ASSERT(false);
4430 }
4431 }
4432 retry_state->started_send_initial_metadata = true;
4433 batch_data->batch.send_initial_metadata = true;
4434 batch_data->batch.payload->send_initial_metadata.send_initial_metadata =
4435 &retry_state->send_initial_metadata;
4436 batch_data->batch.payload->send_initial_metadata.send_initial_metadata_flags =
4437 send_initial_metadata_flags_;
4438 batch_data->batch.payload->send_initial_metadata.peer_string = peer_string_;
4439 }
4440
AddRetriableSendMessageOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4441 void RetryingCall::AddRetriableSendMessageOp(
4442 SubchannelCallRetryState* retry_state,
4443 SubchannelCallBatchData* batch_data) {
4444 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4445 gpr_log(GPR_INFO,
4446 "chand=%p retrying_call=%p: starting calld->send_messages[%" PRIuPTR
4447 "]",
4448 chand_, this, retry_state->started_send_message_count);
4449 }
4450 ByteStreamCache* cache =
4451 send_messages_[retry_state->started_send_message_count];
4452 ++retry_state->started_send_message_count;
4453 retry_state->send_message.Init(cache);
4454 batch_data->batch.send_message = true;
4455 batch_data->batch.payload->send_message.send_message.reset(
4456 retry_state->send_message.get());
4457 }
4458
AddRetriableSendTrailingMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4459 void RetryingCall::AddRetriableSendTrailingMetadataOp(
4460 SubchannelCallRetryState* retry_state,
4461 SubchannelCallBatchData* batch_data) {
4462 // We need to make a copy of the metadata batch for each attempt, since
4463 // the filters in the subchannel stack may modify this batch, and we don't
4464 // want those modifications to be passed forward to subsequent attempts.
4465 retry_state->send_trailing_metadata_storage =
4466 static_cast<grpc_linked_mdelem*>(arena_->Alloc(
4467 sizeof(grpc_linked_mdelem) * send_trailing_metadata_.list.count));
4468 grpc_metadata_batch_copy(&send_trailing_metadata_,
4469 &retry_state->send_trailing_metadata,
4470 retry_state->send_trailing_metadata_storage);
4471 retry_state->started_send_trailing_metadata = true;
4472 batch_data->batch.send_trailing_metadata = true;
4473 batch_data->batch.payload->send_trailing_metadata.send_trailing_metadata =
4474 &retry_state->send_trailing_metadata;
4475 }
4476
AddRetriableRecvInitialMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4477 void RetryingCall::AddRetriableRecvInitialMetadataOp(
4478 SubchannelCallRetryState* retry_state,
4479 SubchannelCallBatchData* batch_data) {
4480 retry_state->started_recv_initial_metadata = true;
4481 batch_data->batch.recv_initial_metadata = true;
4482 grpc_metadata_batch_init(&retry_state->recv_initial_metadata);
4483 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata =
4484 &retry_state->recv_initial_metadata;
4485 batch_data->batch.payload->recv_initial_metadata.trailing_metadata_available =
4486 &retry_state->trailing_metadata_available;
4487 GRPC_CLOSURE_INIT(&retry_state->recv_initial_metadata_ready,
4488 RecvInitialMetadataReady, batch_data,
4489 grpc_schedule_on_exec_ctx);
4490 batch_data->batch.payload->recv_initial_metadata.recv_initial_metadata_ready =
4491 &retry_state->recv_initial_metadata_ready;
4492 }
4493
AddRetriableRecvMessageOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4494 void RetryingCall::AddRetriableRecvMessageOp(
4495 SubchannelCallRetryState* retry_state,
4496 SubchannelCallBatchData* batch_data) {
4497 ++retry_state->started_recv_message_count;
4498 batch_data->batch.recv_message = true;
4499 batch_data->batch.payload->recv_message.recv_message =
4500 &retry_state->recv_message;
4501 GRPC_CLOSURE_INIT(&retry_state->recv_message_ready, RecvMessageReady,
4502 batch_data, grpc_schedule_on_exec_ctx);
4503 batch_data->batch.payload->recv_message.recv_message_ready =
4504 &retry_state->recv_message_ready;
4505 }
4506
AddRetriableRecvTrailingMetadataOp(SubchannelCallRetryState * retry_state,SubchannelCallBatchData * batch_data)4507 void RetryingCall::AddRetriableRecvTrailingMetadataOp(
4508 SubchannelCallRetryState* retry_state,
4509 SubchannelCallBatchData* batch_data) {
4510 retry_state->started_recv_trailing_metadata = true;
4511 batch_data->batch.recv_trailing_metadata = true;
4512 grpc_metadata_batch_init(&retry_state->recv_trailing_metadata);
4513 batch_data->batch.payload->recv_trailing_metadata.recv_trailing_metadata =
4514 &retry_state->recv_trailing_metadata;
4515 batch_data->batch.payload->recv_trailing_metadata.collect_stats =
4516 &retry_state->collect_stats;
4517 GRPC_CLOSURE_INIT(&retry_state->recv_trailing_metadata_ready,
4518 RecvTrailingMetadataReady, batch_data,
4519 grpc_schedule_on_exec_ctx);
4520 batch_data->batch.payload->recv_trailing_metadata
4521 .recv_trailing_metadata_ready =
4522 &retry_state->recv_trailing_metadata_ready;
4523 }
4524
StartInternalRecvTrailingMetadata()4525 void RetryingCall::StartInternalRecvTrailingMetadata() {
4526 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4527 gpr_log(
4528 GPR_INFO,
4529 "chand=%p retrying_call=%p: call failed but recv_trailing_metadata not "
4530 "started; starting it internally",
4531 chand_, this);
4532 }
4533 SubchannelCallRetryState* retry_state =
4534 static_cast<SubchannelCallRetryState*>(lb_call_->GetParentData());
4535 // Create batch_data with 2 refs, since this batch will be unreffed twice:
4536 // once for the recv_trailing_metadata_ready callback when the subchannel
4537 // batch returns, and again when we actually get a recv_trailing_metadata
4538 // op from the surface.
4539 SubchannelCallBatchData* batch_data =
4540 SubchannelCallBatchData::Create(this, 2, false /* set_on_complete */);
4541 AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
4542 retry_state->recv_trailing_metadata_internal_batch = batch_data;
4543 // Note: This will release the call combiner.
4544 lb_call_->StartTransportStreamOpBatch(&batch_data->batch);
4545 }
4546
4547 // If there are any cached send ops that need to be replayed on the
4548 // current subchannel call, creates and returns a new subchannel batch
4549 // to replay those ops. Otherwise, returns nullptr.
4550 RetryingCall::SubchannelCallBatchData*
MaybeCreateSubchannelBatchForReplay(SubchannelCallRetryState * retry_state)4551 RetryingCall::MaybeCreateSubchannelBatchForReplay(
4552 SubchannelCallRetryState* retry_state) {
4553 SubchannelCallBatchData* replay_batch_data = nullptr;
4554 // send_initial_metadata.
4555 if (seen_send_initial_metadata_ &&
4556 !retry_state->started_send_initial_metadata &&
4557 !pending_send_initial_metadata_) {
4558 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4559 gpr_log(GPR_INFO,
4560 "chand=%p retrying_call=%p: replaying previously completed "
4561 "send_initial_metadata op",
4562 chand_, this);
4563 }
4564 replay_batch_data =
4565 SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4566 AddRetriableSendInitialMetadataOp(retry_state, replay_batch_data);
4567 }
4568 // send_message.
4569 // Note that we can only have one send_message op in flight at a time.
4570 if (retry_state->started_send_message_count < send_messages_.size() &&
4571 retry_state->started_send_message_count ==
4572 retry_state->completed_send_message_count &&
4573 !pending_send_message_) {
4574 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4575 gpr_log(GPR_INFO,
4576 "chand=%p retrying_call=%p: replaying previously completed "
4577 "send_message op",
4578 chand_, this);
4579 }
4580 if (replay_batch_data == nullptr) {
4581 replay_batch_data =
4582 SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4583 }
4584 AddRetriableSendMessageOp(retry_state, replay_batch_data);
4585 }
4586 // send_trailing_metadata.
4587 // Note that we only add this op if we have no more send_message ops
4588 // to start, since we can't send down any more send_message ops after
4589 // send_trailing_metadata.
4590 if (seen_send_trailing_metadata_ &&
4591 retry_state->started_send_message_count == send_messages_.size() &&
4592 !retry_state->started_send_trailing_metadata &&
4593 !pending_send_trailing_metadata_) {
4594 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4595 gpr_log(GPR_INFO,
4596 "chand=%p retrying_call=%p: replaying previously completed "
4597 "send_trailing_metadata op",
4598 chand_, this);
4599 }
4600 if (replay_batch_data == nullptr) {
4601 replay_batch_data =
4602 SubchannelCallBatchData::Create(this, 1, true /* set_on_complete */);
4603 }
4604 AddRetriableSendTrailingMetadataOp(retry_state, replay_batch_data);
4605 }
4606 return replay_batch_data;
4607 }
4608
// Walks the pending surface batches and, for each one that can be started on
// the current LB call attempt, adds a closure to `closures` that starts it.
//
// A batch is skipped when any of its ops has already been started on this
// attempt, or must wait for earlier (replayed) ops of the same kind to
// complete.
//
// A skipped recv_trailing_metadata op that was started internally by
// StartInternalRecvTrailingMetadata() is resolved here instead:
// - if that internal batch has already completed, its ready closure is
//   re-run;
// - otherwise the internal batch's extra ref is dropped.
//
// If there is no retry policy, or the call is committed, the surface batch is
// forwarded unchanged and its pending slot cleared. Otherwise a new
// SubchannelCallBatchData carrying retriable copies of the ops is built. Each
// such batch with send ops is counted in
// num_pending_retriable_subchannel_send_batches_; the first one takes a
// "subchannel_send_batches" call-stack ref.
void RetryingCall::AddSubchannelBatchesForPendingBatches(
    SubchannelCallRetryState* retry_state, CallCombinerClosureList* closures) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    // Skip any batch that either (a) has already been started on this
    // subchannel call or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch.  This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op.  If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata &&
        retry_state->started_send_initial_metadata) {
      continue;
    }
    // Only one send_message may be in flight at a time.
    if (batch->send_message && retry_state->completed_send_message_count <
                                   retry_state->started_send_message_count) {
      continue;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    // (batch->send_message accounts for a message in this same batch.)
    if (batch->send_trailing_metadata &&
        (retry_state->started_send_message_count + batch->send_message <
             send_messages_.size() ||
         retry_state->started_send_trailing_metadata)) {
      continue;
    }
    if (batch->recv_initial_metadata &&
        retry_state->started_recv_initial_metadata) {
      continue;
    }
    // Only one recv_message may be in flight at a time.
    if (batch->recv_message && retry_state->completed_recv_message_count <
                                   retry_state->started_recv_message_count) {
      continue;
    }
    if (batch->recv_trailing_metadata &&
        retry_state->started_recv_trailing_metadata) {
      // If we previously completed a recv_trailing_metadata op
      // initiated by StartInternalRecvTrailingMetadata(), use the
      // result of that instead of trying to re-start this op.
      if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
                        nullptr))) {
        // If the batch completed, then trigger the completion callback
        // directly, so that we return the previously returned results to
        // the application.  Otherwise, just unref the internally
        // started subchannel batch, since we'll propagate the
        // completion when it completes.
        if (retry_state->completed_recv_trailing_metadata) {
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        } else {
          retry_state->recv_trailing_metadata_internal_batch->Unref();
        }
        retry_state->recv_trailing_metadata_internal_batch = nullptr;
      }
      continue;
    }
    // If we're not retrying, just send the batch as-is.
    // TODO(roth): This condition doesn't seem exactly right -- maybe need a
    // notion of "draining" once we've committed and are done replaying?
    if (retry_policy_ == nullptr || retry_committed_) {
      AddClosureForSubchannelBatch(batch, closures);
      PendingBatchClear(pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    // All send ops share a single on_complete callback; each recv op has its
    // own ready callback.
    const bool has_send_ops = batch->send_initial_metadata ||
                              batch->send_message ||
                              batch->send_trailing_metadata;
    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
                              batch->recv_message +
                              batch->recv_trailing_metadata;
    SubchannelCallBatchData* batch_data = SubchannelCallBatchData::Create(
        this, num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    MaybeCacheSendOpsForBatch(pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      AddRetriableSendInitialMetadataOp(retry_state, batch_data);
    }
    // send_message.
    if (batch->send_message) {
      AddRetriableSendMessageOp(retry_state, batch_data);
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      AddRetriableSendTrailingMetadataOp(retry_state, batch_data);
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      AddRetriableRecvInitialMetadataOp(retry_state, batch_data);
    }
    // recv_message.
    if (batch->recv_message) {
      AddRetriableRecvMessageOp(retry_state, batch_data);
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      AddRetriableRecvTrailingMetadataOp(retry_state, batch_data);
    }
    AddClosureForSubchannelBatch(&batch_data->batch, closures);
    // Track number of pending subchannel send batches.
    // If this is the first one, take a ref to the call stack.
    if (batch->send_initial_metadata || batch->send_message ||
        batch->send_trailing_metadata) {
      if (num_pending_retriable_subchannel_send_batches_ == 0) {
        GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
      }
      ++num_pending_retriable_subchannel_send_batches_;
    }
  }
}
4732
StartRetriableSubchannelBatches(void * arg,grpc_error *)4733 void RetryingCall::StartRetriableSubchannelBatches(void* arg,
4734 grpc_error* /*ignored*/) {
4735 RetryingCall* call = static_cast<RetryingCall*>(arg);
4736 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4737 gpr_log(GPR_INFO,
4738 "chand=%p retrying_call=%p: constructing retriable batches",
4739 call->chand_, call);
4740 }
4741 SubchannelCallRetryState* retry_state =
4742 static_cast<SubchannelCallRetryState*>(call->lb_call_->GetParentData());
4743 // Construct list of closures to execute, one for each pending batch.
4744 CallCombinerClosureList closures;
4745 // Replay previously-returned send_* ops if needed.
4746 SubchannelCallBatchData* replay_batch_data =
4747 call->MaybeCreateSubchannelBatchForReplay(retry_state);
4748 if (replay_batch_data != nullptr) {
4749 call->AddClosureForSubchannelBatch(&replay_batch_data->batch, &closures);
4750 // Track number of pending subchannel send batches.
4751 // If this is the first one, take a ref to the call stack.
4752 if (call->num_pending_retriable_subchannel_send_batches_ == 0) {
4753 GRPC_CALL_STACK_REF(call->owning_call_, "subchannel_send_batches");
4754 }
4755 ++call->num_pending_retriable_subchannel_send_batches_;
4756 }
4757 // Now add pending batches.
4758 call->AddSubchannelBatchesForPendingBatches(retry_state, &closures);
4759 // Start batches on subchannel call.
4760 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4761 gpr_log(GPR_INFO,
4762 "chand=%p retrying_call=%p: starting %" PRIuPTR
4763 " retriable batches on lb_call=%p",
4764 call->chand_, call, closures.size(), call->lb_call_.get());
4765 }
4766 // Note: This will yield the call combiner.
4767 closures.RunClosures(call->call_combiner_);
4768 }
4769
CreateLbCall(void * arg,grpc_error *)4770 void RetryingCall::CreateLbCall(void* arg, grpc_error* /*error*/) {
4771 auto* call = static_cast<RetryingCall*>(arg);
4772 const size_t parent_data_size =
4773 call->enable_retries_ ? sizeof(SubchannelCallRetryState) : 0;
4774 grpc_call_element_args args = {call->owning_call_, nullptr,
4775 call->call_context_, call->path_,
4776 call->call_start_time_, call->deadline_,
4777 call->arena_, call->call_combiner_};
4778 call->lb_call_ = LoadBalancedCall::Create(call->chand_, args, call->pollent_,
4779 parent_data_size);
4780 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
4781 gpr_log(GPR_INFO, "chand=%p retrying_call=%p: create lb_call=%p",
4782 call->chand_, call, call->lb_call_.get());
4783 }
4784 if (parent_data_size > 0) {
4785 new (call->lb_call_->GetParentData())
4786 SubchannelCallRetryState(call->call_context_);
4787 }
4788 call->PendingBatchesResume();
4789 }
4790
4791 //
4792 // LoadBalancedCall::Metadata
4793 //
4794
4795 class LoadBalancedCall::Metadata
4796 : public LoadBalancingPolicy::MetadataInterface {
4797 public:
Metadata(LoadBalancedCall * lb_call,grpc_metadata_batch * batch)4798 Metadata(LoadBalancedCall* lb_call, grpc_metadata_batch* batch)
4799 : lb_call_(lb_call), batch_(batch) {}
4800
Add(absl::string_view key,absl::string_view value)4801 void Add(absl::string_view key, absl::string_view value) override {
4802 grpc_linked_mdelem* linked_mdelem = static_cast<grpc_linked_mdelem*>(
4803 lb_call_->arena_->Alloc(sizeof(grpc_linked_mdelem)));
4804 linked_mdelem->md = grpc_mdelem_from_slices(
4805 ExternallyManagedSlice(key.data(), key.size()),
4806 ExternallyManagedSlice(value.data(), value.size()));
4807 GPR_ASSERT(grpc_metadata_batch_link_tail(batch_, linked_mdelem) ==
4808 GRPC_ERROR_NONE);
4809 }
4810
begin() const4811 iterator begin() const override {
4812 static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
4813 "iterator size too large");
4814 return iterator(
4815 this, reinterpret_cast<intptr_t>(MaybeSkipEntry(batch_->list.head)));
4816 }
end() const4817 iterator end() const override {
4818 static_assert(sizeof(grpc_linked_mdelem*) <= sizeof(intptr_t),
4819 "iterator size too large");
4820 return iterator(this, 0);
4821 }
4822
erase(iterator it)4823 iterator erase(iterator it) override {
4824 grpc_linked_mdelem* linked_mdelem =
4825 reinterpret_cast<grpc_linked_mdelem*>(GetIteratorHandle(it));
4826 intptr_t handle = reinterpret_cast<intptr_t>(linked_mdelem->next);
4827 grpc_metadata_batch_remove(batch_, linked_mdelem);
4828 return iterator(this, handle);
4829 }
4830
4831 private:
MaybeSkipEntry(grpc_linked_mdelem * entry) const4832 grpc_linked_mdelem* MaybeSkipEntry(grpc_linked_mdelem* entry) const {
4833 if (entry != nullptr && batch_->idx.named.path == entry) {
4834 return entry->next;
4835 }
4836 return entry;
4837 }
4838
IteratorHandleNext(intptr_t handle) const4839 intptr_t IteratorHandleNext(intptr_t handle) const override {
4840 grpc_linked_mdelem* linked_mdelem =
4841 reinterpret_cast<grpc_linked_mdelem*>(handle);
4842 return reinterpret_cast<intptr_t>(MaybeSkipEntry(linked_mdelem->next));
4843 }
4844
IteratorHandleGet(intptr_t handle) const4845 std::pair<absl::string_view, absl::string_view> IteratorHandleGet(
4846 intptr_t handle) const override {
4847 grpc_linked_mdelem* linked_mdelem =
4848 reinterpret_cast<grpc_linked_mdelem*>(handle);
4849 return std::make_pair(StringViewFromSlice(GRPC_MDKEY(linked_mdelem->md)),
4850 StringViewFromSlice(GRPC_MDVALUE(linked_mdelem->md)));
4851 }
4852
4853 LoadBalancedCall* lb_call_;
4854 grpc_metadata_batch* batch_;
4855 };
4856
4857 //
4858 // LoadBalancedCall::LbCallState
4859 //
4860
4861 class LoadBalancedCall::LbCallState : public LoadBalancingPolicy::CallState {
4862 public:
LbCallState(LoadBalancedCall * lb_call)4863 explicit LbCallState(LoadBalancedCall* lb_call) : lb_call_(lb_call) {}
4864
Alloc(size_t size)4865 void* Alloc(size_t size) override { return lb_call_->arena_->Alloc(size); }
4866
GetBackendMetricData()4867 const LoadBalancingPolicy::BackendMetricData* GetBackendMetricData()
4868 override {
4869 if (lb_call_->backend_metric_data_ == nullptr) {
4870 grpc_linked_mdelem* md = lb_call_->recv_trailing_metadata_->idx.named
4871 .x_endpoint_load_metrics_bin;
4872 if (md != nullptr) {
4873 lb_call_->backend_metric_data_ =
4874 ParseBackendMetricData(GRPC_MDVALUE(md->md), lb_call_->arena_);
4875 }
4876 }
4877 return lb_call_->backend_metric_data_;
4878 }
4879
ExperimentalGetCallAttribute(const char * key)4880 absl::string_view ExperimentalGetCallAttribute(const char* key) override {
4881 auto* service_config_call_data = static_cast<ServiceConfigCallData*>(
4882 lb_call_->call_context_[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
4883 auto& call_attributes = service_config_call_data->call_attributes();
4884 auto it = call_attributes.find(key);
4885 if (it == call_attributes.end()) return absl::string_view();
4886 return it->second;
4887 }
4888
4889 private:
4890 LoadBalancedCall* lb_call_;
4891 };
4892
4893 //
4894 // LoadBalancedCall
4895 //
4896
Create(ChannelData * chand,const grpc_call_element_args & args,grpc_polling_entity * pollent,size_t parent_data_size)4897 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Create(
4898 ChannelData* chand, const grpc_call_element_args& args,
4899 grpc_polling_entity* pollent, size_t parent_data_size) {
4900 const size_t alloc_size =
4901 parent_data_size > 0
4902 ? (GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall)) +
4903 parent_data_size)
4904 : sizeof(LoadBalancedCall);
4905 auto* lb_call = static_cast<LoadBalancedCall*>(args.arena->Alloc(alloc_size));
4906 new (lb_call) LoadBalancedCall(chand, args, pollent);
4907 return lb_call;
4908 }
4909
// Constructs a LoadBalancedCall from the call element args. Create() invokes
// this via placement new on arena memory. The object starts with a single
// ref, which Create() hands to its caller. A trace name is passed to the
// ref count only when grpc_client_channel_routing_trace is enabled. A ref on
// args.path is taken here and released in the destructor.
LoadBalancedCall::LoadBalancedCall(ChannelData* chand,
                                   const grpc_call_element_args& args,
                                   grpc_polling_entity* pollent)
    : refs_(1, GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)
                   ? "LoadBalancedCall"
                   : nullptr),
      chand_(chand),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      pollent_(pollent) {}
4925
~LoadBalancedCall()4926 LoadBalancedCall::~LoadBalancedCall() {
4927 grpc_slice_unref_internal(path_);
4928 GRPC_ERROR_UNREF(cancel_error_);
4929 if (backend_metric_data_ != nullptr) {
4930 backend_metric_data_
4931 ->LoadBalancingPolicy::BackendMetricData::~BackendMetricData();
4932 }
4933 // Make sure there are no remaining pending batches.
4934 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
4935 GPR_ASSERT(pending_batches_[i] == nullptr);
4936 }
4937 }
4938
Ref()4939 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref() {
4940 IncrementRefCount();
4941 return RefCountedPtr<LoadBalancedCall>(this);
4942 }
4943
Ref(const DebugLocation & location,const char * reason)4944 RefCountedPtr<LoadBalancedCall> LoadBalancedCall::Ref(
4945 const DebugLocation& location, const char* reason) {
4946 IncrementRefCount(location, reason);
4947 return RefCountedPtr<LoadBalancedCall>(this);
4948 }
4949
Unref()4950 void LoadBalancedCall::Unref() {
4951 if (GPR_UNLIKELY(refs_.Unref())) {
4952 this->~LoadBalancedCall();
4953 }
4954 }
4955
Unref(const DebugLocation & location,const char * reason)4956 void LoadBalancedCall::Unref(const DebugLocation& location,
4957 const char* reason) {
4958 if (GPR_UNLIKELY(refs_.Unref(location, reason))) {
4959 this->~LoadBalancedCall();
4960 }
4961 }
4962
IncrementRefCount()4963 void LoadBalancedCall::IncrementRefCount() { refs_.Ref(); }
4964
IncrementRefCount(const DebugLocation & location,const char * reason)4965 void LoadBalancedCall::IncrementRefCount(const DebugLocation& location,
4966 const char* reason) {
4967 refs_.Ref(location, reason);
4968 }
4969
GetParentData()4970 void* LoadBalancedCall::GetParentData() {
4971 return reinterpret_cast<char*>(this) +
4972 GPR_ROUND_UP_TO_ALIGNMENT_SIZE(sizeof(LoadBalancedCall));
4973 }
4974
GetBatchIndex(grpc_transport_stream_op_batch * batch)4975 size_t LoadBalancedCall::GetBatchIndex(grpc_transport_stream_op_batch* batch) {
4976 // Note: It is important the send_initial_metadata be the first entry
4977 // here, since the code in pick_subchannel_locked() assumes it will be.
4978 if (batch->send_initial_metadata) return 0;
4979 if (batch->send_message) return 1;
4980 if (batch->send_trailing_metadata) return 2;
4981 if (batch->recv_initial_metadata) return 3;
4982 if (batch->recv_message) return 4;
4983 if (batch->recv_trailing_metadata) return 5;
4984 GPR_UNREACHABLE_CODE(return (size_t)-1);
4985 }
4986
4987 // This is called via the call combiner, so access to calld is synchronized.
PendingBatchesAdd(grpc_transport_stream_op_batch * batch)4988 void LoadBalancedCall::PendingBatchesAdd(
4989 grpc_transport_stream_op_batch* batch) {
4990 const size_t idx = GetBatchIndex(batch);
4991 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
4992 gpr_log(GPR_INFO,
4993 "chand=%p lb_call=%p: adding pending batch at index %" PRIuPTR,
4994 chand_, this, idx);
4995 }
4996 GPR_ASSERT(pending_batches_[idx] == nullptr);
4997 pending_batches_[idx] = batch;
4998 }
4999
5000 // This is called via the call combiner, so access to calld is synchronized.
FailPendingBatchInCallCombiner(void * arg,grpc_error * error)5001 void LoadBalancedCall::FailPendingBatchInCallCombiner(void* arg,
5002 grpc_error* error) {
5003 grpc_transport_stream_op_batch* batch =
5004 static_cast<grpc_transport_stream_op_batch*>(arg);
5005 auto* self = static_cast<LoadBalancedCall*>(batch->handler_private.extra_arg);
5006 // Note: This will release the call combiner.
5007 grpc_transport_stream_op_batch_finish_with_failure(
5008 batch, GRPC_ERROR_REF(error), self->call_combiner_);
5009 }
5010
// This is called via the call combiner, so access to this LoadBalancedCall
// is synchronized.
PendingBatchesFail(grpc_error * error,YieldCallCombinerPredicate yield_call_combiner_predicate)5012 void LoadBalancedCall::PendingBatchesFail(
5013 grpc_error* error,
5014 YieldCallCombinerPredicate yield_call_combiner_predicate) {
5015 GPR_ASSERT(error != GRPC_ERROR_NONE);
5016 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5017 size_t num_batches = 0;
5018 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5019 if (pending_batches_[i] != nullptr) ++num_batches;
5020 }
5021 gpr_log(GPR_INFO,
5022 "chand=%p lb_call=%p: failing %" PRIuPTR " pending batches: %s",
5023 chand_, this, num_batches, grpc_error_string(error));
5024 }
5025 CallCombinerClosureList closures;
5026 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5027 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
5028 if (batch != nullptr) {
5029 batch->handler_private.extra_arg = this;
5030 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
5031 FailPendingBatchInCallCombiner, batch,
5032 grpc_schedule_on_exec_ctx);
5033 closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
5034 "PendingBatchesFail");
5035 batch = nullptr;
5036 }
5037 }
5038 if (yield_call_combiner_predicate(closures)) {
5039 closures.RunClosures(call_combiner_);
5040 } else {
5041 closures.RunClosuresWithoutYielding(call_combiner_);
5042 }
5043 GRPC_ERROR_UNREF(error);
5044 }
5045
// This is called via the call combiner, so access to this LoadBalancedCall
// is synchronized.
ResumePendingBatchInCallCombiner(void * arg,grpc_error *)5047 void LoadBalancedCall::ResumePendingBatchInCallCombiner(
5048 void* arg, grpc_error* /*ignored*/) {
5049 grpc_transport_stream_op_batch* batch =
5050 static_cast<grpc_transport_stream_op_batch*>(arg);
5051 SubchannelCall* subchannel_call =
5052 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
5053 // Note: This will release the call combiner.
5054 subchannel_call->StartTransportStreamOpBatch(batch);
5055 }
5056
// This is called via the call combiner, so access to this LoadBalancedCall
// is synchronized.
PendingBatchesResume()5058 void LoadBalancedCall::PendingBatchesResume() {
5059 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
5060 size_t num_batches = 0;
5061 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5062 if (pending_batches_[i] != nullptr) ++num_batches;
5063 }
5064 gpr_log(GPR_INFO,
5065 "chand=%p lb_call=%p: starting %" PRIuPTR
5066 " pending batches on subchannel_call=%p",
5067 chand_, this, num_batches, subchannel_call_.get());
5068 }
5069 CallCombinerClosureList closures;
5070 for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
5071 grpc_transport_stream_op_batch*& batch = pending_batches_[i];
5072 if (batch != nullptr) {
5073 batch->handler_private.extra_arg = subchannel_call_.get();
5074 GRPC_CLOSURE_INIT(&batch->handler_private.closure,
5075 ResumePendingBatchInCallCombiner, batch,
5076 grpc_schedule_on_exec_ctx);
5077 closures.Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
5078 "PendingBatchesResume");
5079 batch = nullptr;
5080 }
5081 }
5082 // Note: This will release the call combiner.
5083 closures.RunClosures(call_combiner_);
5084 }
5085
// Handles a batch passed down to this LB call. Each batch takes exactly one
// of these paths:
// - Failed immediately, if the call was already cancelled.
// - Forwarded as a cancellation.
// - Added to pending_batches_, then either resumed on an existing subchannel
//   call, used to start a pick (send_initial_metadata), or parked until
//   send_initial_metadata arrives.
void LoadBalancedCall::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  // Intercept recv_trailing_metadata_ready for LB callback.
  // This is done before any of the early returns below.
  if (batch->recv_trailing_metadata) {
    InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(batch);
  }
  // If we've previously been cancelled, immediately fail any new batches.
  if (GPR_UNLIKELY(cancel_error_ != GRPC_ERROR_NONE)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: failing batch with error: %s",
              chand_, this, grpc_error_string(cancel_error_));
    }
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    // Stash a copy of cancel_error in our call data, so that we can use
    // it for subsequent operations. This ensures that if the call is
    // cancelled before any batches are passed down (e.g., if the deadline
    // is in the past when the call starts), we can return the right
    // error to the caller when the first batch does get passed down.
    GRPC_ERROR_UNREF(cancel_error_);
    cancel_error_ = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO, "chand=%p lb_call=%p: recording cancel_error=%s",
              chand_, this, grpc_error_string(cancel_error_));
    }
    // If we do not have a subchannel call (i.e., a pick has not yet
    // been started), fail all pending batches. Otherwise, send the
    // cancellation down to the subchannel call.
    if (subchannel_call_ == nullptr) {
      // NoYieldCallCombiner presumably keeps the call combiner held here so
      // that it is released below by finishing this cancel batch.
      PendingBatchesFail(GRPC_ERROR_REF(cancel_error_), NoYieldCallCombiner);
      // Note: This will release the call combiner.
      grpc_transport_stream_op_batch_finish_with_failure(
          batch, GRPC_ERROR_REF(cancel_error_), call_combiner_);
    } else {
      // Note: This will release the call combiner.
      subchannel_call_->StartTransportStreamOpBatch(batch);
    }
    return;
  }
  // Add the batch to the pending list.
  PendingBatchesAdd(batch);
  // Check if we've already gotten a subchannel call.
  // Note that once we have picked a subchannel, we do not need to acquire
  // the channel's data plane mutex, which is more efficient (especially for
  // streaming calls).
  if (subchannel_call_ != nullptr) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: starting batch on subchannel_call=%p",
              chand_, this, subchannel_call_.get());
    }
    // Note: This releases the call combiner (via RunClosures()).
    PendingBatchesResume();
    return;
  }
  // We do not yet have a subchannel call.
  // For batches containing a send_initial_metadata op, acquire the
  // channel's data plane mutex to pick a subchannel.
  if (GPR_LIKELY(batch->send_initial_metadata)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: grabbing data plane mutex to perform pick",
              chand_, this);
    }
    // PickSubchannel() takes the data plane mutex itself. If the pick gets
    // queued, it returns without yielding the call combiner.
    PickSubchannel(this, GRPC_ERROR_NONE);
  } else {
    // For all other batches, release the call combiner.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_call_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p lb_call=%p: saved batch, yielding call combiner",
              chand_, this);
    }
    GRPC_CALL_COMBINER_STOP(call_combiner_,
                            "batch does not include send_initial_metadata");
  }
}
5166
RecvTrailingMetadataReadyForLoadBalancingPolicy(void * arg,grpc_error * error)5167 void LoadBalancedCall::RecvTrailingMetadataReadyForLoadBalancingPolicy(
5168 void* arg, grpc_error* error) {
5169 auto* self = static_cast<LoadBalancedCall*>(arg);
5170 if (self->lb_recv_trailing_metadata_ready_ != nullptr) {
5171 // Set error if call did not succeed.
5172 grpc_error* error_for_lb = GRPC_ERROR_NONE;
5173 if (error != GRPC_ERROR_NONE) {
5174 error_for_lb = error;
5175 } else {
5176 const auto& fields = self->recv_trailing_metadata_->idx.named;
5177 GPR_ASSERT(fields.grpc_status != nullptr);
5178 grpc_status_code status =
5179 grpc_get_status_code_from_metadata(fields.grpc_status->md);
5180 std::string msg;
5181 if (status != GRPC_STATUS_OK) {
5182 error_for_lb = grpc_error_set_int(
5183 GRPC_ERROR_CREATE_FROM_STATIC_STRING("call failed"),
5184 GRPC_ERROR_INT_GRPC_STATUS, status);
5185 if (fields.grpc_message != nullptr) {
5186 error_for_lb = grpc_error_set_str(
5187 error_for_lb, GRPC_ERROR_STR_GRPC_MESSAGE,
5188 grpc_slice_ref_internal(GRPC_MDVALUE(fields.grpc_message->md)));
5189 }
5190 }
5191 }
5192 // Invoke callback to LB policy.
5193 Metadata trailing_metadata(self, self->recv_trailing_metadata_);
5194 LbCallState lb_call_state(self);
5195 self->lb_recv_trailing_metadata_ready_(error_for_lb, &trailing_metadata,
5196 &lb_call_state);
5197 if (error == GRPC_ERROR_NONE) GRPC_ERROR_UNREF(error_for_lb);
5198 }
5199 // Chain to original callback.
5200 Closure::Run(DEBUG_LOCATION, self->original_recv_trailing_metadata_ready_,
5201 GRPC_ERROR_REF(error));
5202 }
5203
5204 // TODO(roth): Consider not intercepting this callback unless we
5205 // actually need to, if this causes a performance problem.
InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(grpc_transport_stream_op_batch * batch)5206 void LoadBalancedCall::InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
5207 grpc_transport_stream_op_batch* batch) {
5208 recv_trailing_metadata_ =
5209 batch->payload->recv_trailing_metadata.recv_trailing_metadata;
5210 original_recv_trailing_metadata_ready_ =
5211 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready;
5212 GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
5213 RecvTrailingMetadataReadyForLoadBalancingPolicy, this,
5214 grpc_schedule_on_exec_ctx);
5215 batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
5216 &recv_trailing_metadata_ready_;
5217 }
5218
CreateSubchannelCall()5219 void LoadBalancedCall::CreateSubchannelCall() {
5220 SubchannelCall::Args call_args = {
5221 std::move(connected_subchannel_), pollent_, path_, call_start_time_,
5222 deadline_, arena_,
5223 // TODO(roth): When we implement hedging support, we will probably
5224 // need to use a separate call context for each subchannel call.
5225 call_context_, call_combiner_};
5226 grpc_error* error = GRPC_ERROR_NONE;
5227 subchannel_call_ = SubchannelCall::Create(std::move(call_args), &error);
5228 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5229 gpr_log(GPR_INFO,
5230 "chand=%p lb_call=%p: create subchannel_call=%p: error=%s", chand_,
5231 this, subchannel_call_.get(), grpc_error_string(error));
5232 }
5233 if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
5234 PendingBatchesFail(error, YieldCallCombiner);
5235 } else {
5236 PendingBatchesResume();
5237 }
5238 }
5239
5240 // A class to handle the call combiner cancellation callback for a
5241 // queued pick.
5242 // TODO(roth): When we implement hedging support, we won't be able to
5243 // register a call combiner cancellation closure for each LB pick,
5244 // because there may be multiple LB picks happening in parallel.
5245 // Instead, we will probably need to maintain a list in the CallData
5246 // object of pending LB picks to be cancelled when the closure runs.
class LoadBalancedCall::LbQueuedCallCanceller {
 public:
  // Takes a ref on lb_call (held in lb_call_) and on its owning call stack.
  // Registers CancelLocked() with the call combiner via SetNotifyOnCancel().
  explicit LbQueuedCallCanceller(RefCountedPtr<LoadBalancedCall> lb_call)
      : lb_call_(std::move(lb_call)) {
    GRPC_CALL_STACK_REF(lb_call_->owning_call_, "LbQueuedCallCanceller");
    GRPC_CLOSURE_INIT(&closure_, &CancelLocked, this, nullptr);
    lb_call_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  // Cancellation callback registered with the call combiner.
  // Under the channel's data plane mutex, it acts only if both hold:
  // - this canceller is still the LB call's active canceller;
  // - error is set.
  // MaybeRemoveCallFromLbQueuedCallsLocked() "lames" a canceller by nulling
  // lb_call_canceller_, which makes the first check fail.
  // When it acts, it removes the call from the queued-picks list and fails
  // its pending batches.
  // It always drops the call-stack ref and deletes itself at the end.
  static void CancelLocked(void* arg, grpc_error* error) {
    auto* self = static_cast<LbQueuedCallCanceller*>(arg);
    auto* lb_call = self->lb_call_.get();
    auto* chand = lb_call->chand_;
    {
      MutexLock lock(chand->data_plane_mu());
      // NOTE(review): the "calld->pick_canceller" label in this trace message
      // refers to what is now lb_call->lb_call_canceller_.
      if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p lb_call=%p: cancelling queued pick: "
                "error=%s self=%p calld->pick_canceller=%p",
                chand, lb_call, grpc_error_string(error), self,
                lb_call->lb_call_canceller_);
      }
      if (lb_call->lb_call_canceller_ == self && error != GRPC_ERROR_NONE) {
        // Remove pick from list of queued picks.
        lb_call->MaybeRemoveCallFromLbQueuedCallsLocked();
        // Fail pending batches on the call.
        lb_call->PendingBatchesFail(GRPC_ERROR_REF(error),
                                    YieldCallCombinerIfPendingBatchesFound);
      }
    }
    // Release the call-stack ref taken in the constructor, then free this
    // object (which also drops lb_call_).
    GRPC_CALL_STACK_UNREF(lb_call->owning_call_, "LbQueuedCallCanceller");
    delete self;
  }

  // Ref on the LB call, held until this canceller deletes itself.
  RefCountedPtr<LoadBalancedCall> lb_call_;
  // Closure passed to the call combiner's SetNotifyOnCancel().
  grpc_closure closure_;
};
5285
MaybeRemoveCallFromLbQueuedCallsLocked()5286 void LoadBalancedCall::MaybeRemoveCallFromLbQueuedCallsLocked() {
5287 if (!queued_pending_lb_pick_) return;
5288 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5289 gpr_log(GPR_INFO, "chand=%p lb_call=%p: removing from queued picks list",
5290 chand_, this);
5291 }
5292 chand_->RemoveLbQueuedCall(&queued_call_, pollent_);
5293 queued_pending_lb_pick_ = false;
5294 // Lame the call combiner canceller.
5295 lb_call_canceller_ = nullptr;
5296 }
5297
MaybeAddCallToLbQueuedCallsLocked()5298 void LoadBalancedCall::MaybeAddCallToLbQueuedCallsLocked() {
5299 if (queued_pending_lb_pick_) return;
5300 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5301 gpr_log(GPR_INFO, "chand=%p lb_call=%p: adding to queued picks list",
5302 chand_, this);
5303 }
5304 queued_pending_lb_pick_ = true;
5305 queued_call_.lb_call = this;
5306 chand_->AddLbQueuedCall(&queued_call_, pollent_);
5307 // Register call combiner cancellation callback.
5308 lb_call_canceller_ = new LbQueuedCallCanceller(Ref());
5309 }
5310
AsyncPickDone(grpc_error * error)5311 void LoadBalancedCall::AsyncPickDone(grpc_error* error) {
5312 GRPC_CLOSURE_INIT(&pick_closure_, PickDone, this, grpc_schedule_on_exec_ctx);
5313 ExecCtx::Run(DEBUG_LOCATION, &pick_closure_, error);
5314 }
5315
PickDone(void * arg,grpc_error * error)5316 void LoadBalancedCall::PickDone(void* arg, grpc_error* error) {
5317 auto* self = static_cast<LoadBalancedCall*>(arg);
5318 if (error != GRPC_ERROR_NONE) {
5319 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5320 gpr_log(GPR_INFO,
5321 "chand=%p lb_call=%p: failed to pick subchannel: error=%s",
5322 self->chand_, self, grpc_error_string(error));
5323 }
5324 self->PendingBatchesFail(GRPC_ERROR_REF(error), YieldCallCombiner);
5325 return;
5326 }
5327 self->CreateSubchannelCall();
5328 }
5329
PickResultTypeName(LoadBalancingPolicy::PickResult::ResultType type)5330 const char* PickResultTypeName(
5331 LoadBalancingPolicy::PickResult::ResultType type) {
5332 switch (type) {
5333 case LoadBalancingPolicy::PickResult::PICK_COMPLETE:
5334 return "COMPLETE";
5335 case LoadBalancingPolicy::PickResult::PICK_QUEUE:
5336 return "QUEUE";
5337 case LoadBalancingPolicy::PickResult::PICK_FAILED:
5338 return "FAILED";
5339 }
5340 GPR_UNREACHABLE_CODE(return "UNKNOWN");
5341 }
5342
PickSubchannel(void * arg,grpc_error * error)5343 void LoadBalancedCall::PickSubchannel(void* arg, grpc_error* error) {
5344 auto* self = static_cast<LoadBalancedCall*>(arg);
5345 bool pick_complete;
5346 {
5347 MutexLock lock(self->chand_->data_plane_mu());
5348 pick_complete = self->PickSubchannelLocked(&error);
5349 }
5350 if (pick_complete) {
5351 PickDone(self, error);
5352 GRPC_ERROR_UNREF(error);
5353 }
5354 }
5355
PickSubchannelLocked(grpc_error ** error)5356 bool LoadBalancedCall::PickSubchannelLocked(grpc_error** error) {
5357 GPR_ASSERT(connected_subchannel_ == nullptr);
5358 GPR_ASSERT(subchannel_call_ == nullptr);
5359 // Grab initial metadata.
5360 auto& send_initial_metadata =
5361 pending_batches_[0]->payload->send_initial_metadata;
5362 grpc_metadata_batch* initial_metadata_batch =
5363 send_initial_metadata.send_initial_metadata;
5364 const uint32_t send_initial_metadata_flags =
5365 send_initial_metadata.send_initial_metadata_flags;
5366 // Perform LB pick.
5367 LoadBalancingPolicy::PickArgs pick_args;
5368 pick_args.path = StringViewFromSlice(path_);
5369 LbCallState lb_call_state(this);
5370 pick_args.call_state = &lb_call_state;
5371 Metadata initial_metadata(this, initial_metadata_batch);
5372 pick_args.initial_metadata = &initial_metadata;
5373 auto result = chand_->picker()->Pick(pick_args);
5374 if (GRPC_TRACE_FLAG_ENABLED(grpc_client_channel_routing_trace)) {
5375 gpr_log(
5376 GPR_INFO,
5377 "chand=%p lb_call=%p: LB pick returned %s (subchannel=%p, error=%s)",
5378 chand_, this, PickResultTypeName(result.type), result.subchannel.get(),
5379 grpc_error_string(result.error));
5380 }
5381 switch (result.type) {
5382 case LoadBalancingPolicy::PickResult::PICK_FAILED: {
5383 // If we're shutting down, fail all RPCs.
5384 grpc_error* disconnect_error = chand_->disconnect_error();
5385 if (disconnect_error != GRPC_ERROR_NONE) {
5386 GRPC_ERROR_UNREF(result.error);
5387 MaybeRemoveCallFromLbQueuedCallsLocked();
5388 *error = GRPC_ERROR_REF(disconnect_error);
5389 return true;
5390 }
5391 // If wait_for_ready is false, then the error indicates the RPC
5392 // attempt's final status.
5393 if ((send_initial_metadata_flags &
5394 GRPC_INITIAL_METADATA_WAIT_FOR_READY) == 0) {
5395 grpc_error* new_error =
5396 GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
5397 "Failed to pick subchannel", &result.error, 1);
5398 GRPC_ERROR_UNREF(result.error);
5399 *error = new_error;
5400 MaybeRemoveCallFromLbQueuedCallsLocked();
5401 return true;
5402 }
5403 // If wait_for_ready is true, then queue to retry when we get a new
5404 // picker.
5405 GRPC_ERROR_UNREF(result.error);
5406 }
5407 // Fallthrough
5408 case LoadBalancingPolicy::PickResult::PICK_QUEUE:
5409 MaybeAddCallToLbQueuedCallsLocked();
5410 return false;
5411 default: // PICK_COMPLETE
5412 MaybeRemoveCallFromLbQueuedCallsLocked();
5413 // Handle drops.
5414 if (GPR_UNLIKELY(result.subchannel == nullptr)) {
5415 result.error = grpc_error_set_int(
5416 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
5417 "Call dropped by load balancing policy"),
5418 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
5419 } else {
5420 // Grab a ref to the connected subchannel while we're still
5421 // holding the data plane mutex.
5422 connected_subchannel_ =
5423 chand_->GetConnectedSubchannelInDataPlane(result.subchannel.get());
5424 GPR_ASSERT(connected_subchannel_ != nullptr);
5425 }
5426 lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
5427 *error = result.error;
5428 return true;
5429 }
5430 }
5431
5432 } // namespace
5433 } // namespace grpc_core
5434
5435 /*************************************************************************
5436 * EXPORTED SYMBOLS
5437 */
5438
// CallData and ChannelData are named unqualified both in the filter
// definition below and in the exported functions that follow it.
using grpc_core::CallData;
using grpc_core::ChannelData;

// Filter vtable for the client channel. The initializer is positional, so
// the entry order must match the field order of grpc_channel_filter.
const grpc_channel_filter grpc_client_channel_filter = {
    CallData::StartTransportStreamOpBatch,
    ChannelData::StartTransportOp,
    sizeof(CallData),
    CallData::Init,
    CallData::SetPollent,
    CallData::Destroy,
    sizeof(ChannelData),
    ChannelData::Init,
    ChannelData::Destroy,
    ChannelData::GetChannelInfo,
    "client-channel",
};
5455
grpc_client_channel_check_connectivity_state(grpc_channel_element * elem,int try_to_connect)5456 grpc_connectivity_state grpc_client_channel_check_connectivity_state(
5457 grpc_channel_element* elem, int try_to_connect) {
5458 auto* chand = static_cast<ChannelData*>(elem->channel_data);
5459 return chand->CheckConnectivityState(try_to_connect);
5460 }
5461
grpc_client_channel_num_external_connectivity_watchers(grpc_channel_element * elem)5462 int grpc_client_channel_num_external_connectivity_watchers(
5463 grpc_channel_element* elem) {
5464 auto* chand = static_cast<ChannelData*>(elem->channel_data);
5465 return chand->NumExternalConnectivityWatchers();
5466 }
5467
grpc_client_channel_watch_connectivity_state(grpc_channel_element * elem,grpc_polling_entity pollent,grpc_connectivity_state * state,grpc_closure * on_complete,grpc_closure * watcher_timer_init)5468 void grpc_client_channel_watch_connectivity_state(
5469 grpc_channel_element* elem, grpc_polling_entity pollent,
5470 grpc_connectivity_state* state, grpc_closure* on_complete,
5471 grpc_closure* watcher_timer_init) {
5472 auto* chand = static_cast<ChannelData*>(elem->channel_data);
5473 if (state == nullptr) {
5474 // Handle cancellation.
5475 GPR_ASSERT(watcher_timer_init == nullptr);
5476 chand->RemoveExternalConnectivityWatcher(on_complete, /*cancel=*/true);
5477 return;
5478 }
5479 // Handle addition.
5480 return chand->AddExternalConnectivityWatcher(pollent, state, on_complete,
5481 watcher_timer_init);
5482 }
5483
grpc_client_channel_start_connectivity_watch(grpc_channel_element * elem,grpc_connectivity_state initial_state,grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface> watcher)5484 void grpc_client_channel_start_connectivity_watch(
5485 grpc_channel_element* elem, grpc_connectivity_state initial_state,
5486 grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
5487 watcher) {
5488 auto* chand = static_cast<ChannelData*>(elem->channel_data);
5489 chand->AddConnectivityWatcher(initial_state, std::move(watcher));
5490 }
5491
grpc_client_channel_stop_connectivity_watch(grpc_channel_element * elem,grpc_core::AsyncConnectivityStateWatcherInterface * watcher)5492 void grpc_client_channel_stop_connectivity_watch(
5493 grpc_channel_element* elem,
5494 grpc_core::AsyncConnectivityStateWatcherInterface* watcher) {
5495 auto* chand = static_cast<ChannelData*>(elem->channel_data);
5496 chand->RemoveConnectivityWatcher(watcher);
5497 }
5498