//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/retry_filter.h"

#include "absl/container/inlined_vector.h"
#include "absl/status/statusor.h"
#include "absl/strings/strip.h"

#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/service_config_call_data.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
#include "src/core/lib/uri/uri_parser.h"

//
// Retry filter
//

// This filter is intended to be used in the DynamicFilter stack in the
// client channel, which is situated between the name resolver and the
// LB policy. Normally, the last filter in the DynamicFilter stack is
// the DynamicTerminationFilter (see client_channel.cc), which creates a
// LoadBalancedCall and delegates to it. However, when retries are
// enabled, this filter is used instead of the DynamicTerminationFilter.
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on an LB call. When the child batches return, we
// then decide which pending batches have been completed and schedule their
// callbacks accordingly. If a call attempt fails and we want to retry it,
// we create a new LB call and start again, constructing new "child" batches
// for the new LB call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses). However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry. To deal
// with this, we cache data for send ops, so that we can replay them on a
// different LB call even after we have completed the original batches.
//
// The code is structured as follows:
//  - In CallData (in the parent channel), we maintain a list of pending
//    ops and cached data for send ops.
//  - There is a CallData::CallAttempt object for each retry attempt.
//    This object contains the LB call for that attempt and state to indicate
//    which ops from the CallData object have already been sent down to that
//    LB call.
//  - There is a CallData::CallAttempt::BatchData object for each "child"
//    batch sent on the LB call.
//
// When constructing the "child" batches, we compare the state in the
// CallAttempt object against the state in the CallData object to see
// which batches need to be sent on the LB call for a given attempt.

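// As an illustration (not taken from this file), retries are configured
// via the service config. A minimal method config enabling retries, per
// the retry design (gRFC A6), might look like:
//
//   {
//     "methodConfig": [ {
//       "name": [ { "service": "MyService" } ],
//       "retryPolicy": {
//         "maxAttempts": 3,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": [ "UNAVAILABLE" ]
//       }
//     } ]
//   }
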
// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily. It can be changed if there is
// any even moderately compelling reason to do so.
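// With jitter J, each backoff delay is perturbed by a random factor drawn
// from [1 - J, 1 + J] (see src/core/lib/backoff/backoff.h), so 0.2 spreads
// retry attempts over roughly +/-20% of the nominal backoff interval.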
#define RETRY_BACKOFF_JITTER 0.2

namespace grpc_core {

namespace {

using internal::RetryGlobalConfig;
using internal::RetryMethodConfig;
using internal::RetryServiceConfigParser;
using internal::ServerRetryThrottleData;

TraceFlag grpc_retry_trace(false, "retry");

//
// RetryFilter
//

class RetryFilter {
 public:
  class CallData;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kRetryFilterVtable);
    grpc_error_handle error = GRPC_ERROR_NONE;
    new (elem->channel_data) RetryFilter(args->channel_args, &error);
    return error;
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<RetryFilter*>(elem->channel_data);
    chand->~RetryFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  static size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
    return static_cast<size_t>(grpc_channel_args_find_integer(
        args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE,
        {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
  }
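
  // Hypothetical usage note (not part of this file): an application can
  // adjust this limit when creating a channel via the
  // GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE channel arg, e.g.:
  //
  //   grpc_arg arg = grpc_channel_arg_integer_create(
  //       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE), 64 << 10);
  //   grpc_channel_args args = {1, &arg};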

  RetryFilter(const grpc_channel_args* args, grpc_error_handle* error)
      : client_channel_(grpc_channel_args_find_pointer<ClientChannel>(
            args, GRPC_ARG_CLIENT_CHANNEL)),
        per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)) {
    // Get retry throttling parameters from service config.
    auto* service_config = grpc_channel_args_find_pointer<ServiceConfig>(
        args, GRPC_ARG_SERVICE_CONFIG_OBJ);
    if (service_config == nullptr) return;
    const auto* config = static_cast<const RetryGlobalConfig*>(
        service_config->GetGlobalParsedConfig(
            RetryServiceConfigParser::ParserIndex()));
    if (config == nullptr) return;
    // Get server name from target URI.
    const char* server_uri =
        grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI);
    if (server_uri == nullptr) {
      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "server URI channel arg missing or wrong type in client channel "
          "filter");
      return;
    }
    absl::StatusOr<URI> uri = URI::Parse(server_uri);
    if (!uri.ok() || uri->path().empty()) {
      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "could not extract server name from target URI");
      return;
    }
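    // For example, for a target of "dns:///example.com:443", the parsed
    // URI path is "/example.com:443", so server_name becomes
    // "example.com:443".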
    std::string server_name(absl::StripPrefix(uri->path(), "/"));
    // Get throttling config for server_name.
    retry_throttle_data_ = internal::ServerRetryThrottleMap::GetDataForServer(
        server_name, config->max_milli_tokens(), config->milli_token_ratio());
  }

  ClientChannel* client_channel_;
  size_t per_rpc_retry_buffer_size_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
};

//
// RetryFilter::CallData
//

class RetryFilter::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* /*final_info*/,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class Canceller;
  class CallStackDestructionBarrier;

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch. If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch = nullptr;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached = false;
  };

  // State associated with each call attempt.
  // Allocated on the arena.
  class CallAttempt
      : public RefCounted<CallAttempt, PolymorphicRefCount, kUnrefCallDtor> {
   public:
    explicit CallAttempt(CallData* calld);

    ClientChannel::LoadBalancedCall* lb_call() const { return lb_call_.get(); }

    // Constructs and starts whatever batches are needed on this call
    // attempt.
    void StartRetriableBatches();

    // Frees cached send ops that have already been completed after
    // committing the call.
    void FreeCachedSendOpDataAfterCommit();

   private:
    // State used for starting a retryable batch on the call attempt's LB call.
    // This provides its own grpc_transport_stream_op_batch and other data
    // structures needed to populate the ops in the batch.
    // We allocate one struct on the arena for each attempt at starting a
    // batch on a given LB call.
    class BatchData
        : public RefCounted<BatchData, PolymorphicRefCount, kUnrefCallDtor> {
     public:
      BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount,
                bool set_on_complete);
      ~BatchData() override;

      grpc_transport_stream_op_batch* batch() { return &batch_; }

      // Adds retriable send_initial_metadata op to batch_data.
      void AddRetriableSendInitialMetadataOp();
      // Adds retriable send_message op to batch_data.
      void AddRetriableSendMessageOp();
      // Adds retriable send_trailing_metadata op to batch_data.
      void AddRetriableSendTrailingMetadataOp();
      // Adds retriable recv_initial_metadata op to batch_data.
      void AddRetriableRecvInitialMetadataOp();
      // Adds retriable recv_message op to batch_data.
      void AddRetriableRecvMessageOp();
      // Adds retriable recv_trailing_metadata op to batch_data.
      void AddRetriableRecvTrailingMetadataOp();

     private:
      // Returns true if the call is being retried.
      bool MaybeRetry(grpc_status_code status, grpc_mdelem* server_pushback_md,
                      bool is_lb_drop);

      // Frees cached send ops that were completed by the completed batch in
      // batch_data. Used when batches are completed after the call is
      // committed.
      void FreeCachedSendOpDataForCompletedBatch();

      // Invokes recv_initial_metadata_ready for a batch.
      static void InvokeRecvInitialMetadataCallback(void* arg,
                                                    grpc_error_handle error);
      // Intercepts recv_initial_metadata_ready callback for retries.
      // Commits the call and returns the initial metadata up the stack.
      static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);

      // Invokes recv_message_ready for a batch.
      static void InvokeRecvMessageCallback(void* arg, grpc_error_handle error);
      // Intercepts recv_message_ready callback for retries.
      // Commits the call and returns the message up the stack.
      static void RecvMessageReady(void* arg, grpc_error_handle error);

      // Adds recv_trailing_metadata_ready closure to closures.
      void AddClosureForRecvTrailingMetadataReady(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Adds any necessary closures for deferred recv_initial_metadata and
      // recv_message callbacks to closures.
      void AddClosuresForDeferredRecvCallbacks(
          CallCombinerClosureList* closures);
      // For any pending batch containing an op that has not yet been started,
      // adds the pending batch's completion closures to closures.
      void AddClosuresToFailUnstartedPendingBatches(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Runs necessary closures upon completion of a call attempt.
      void RunClosuresForCompletedCall(grpc_error_handle error);
      // Intercepts recv_trailing_metadata_ready callback for retries.
      // Commits the call and returns the trailing metadata up the stack.
      static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);

      // Adds the on_complete closure for the pending batch completed in
      // batch_data to closures.
      void AddClosuresForCompletedPendingBatch(
          grpc_error_handle error, CallCombinerClosureList* closures);

      // If there are any cached ops to replay or pending ops to start on the
      // LB call, adds them to closures.
      void AddClosuresForReplayOrPendingSendOps(
          CallCombinerClosureList* closures);

      // Callback used to intercept on_complete from LB calls.
      static void OnComplete(void* arg, grpc_error_handle error);

      RefCountedPtr<CallAttempt> call_attempt_;
      // The batch to use in the LB call.
      // Its payload field points to CallAttempt::batch_payload_.
      grpc_transport_stream_op_batch batch_;
      // For intercepting on_complete.
      grpc_closure on_complete_;
    };

    // Creates a BatchData object on the call's arena with the
    // specified refcount. If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    BatchData* CreateBatch(int refcount, bool set_on_complete) {
      return calld_->arena_->New<BatchData>(Ref(), refcount, set_on_complete);
    }

    // If there are any cached send ops that need to be replayed on this
    // call attempt, creates and returns a new batch to replay those ops.
    // Otherwise, returns nullptr.
    BatchData* MaybeCreateBatchForReplay();

    // Adds batches for pending batches to closures.
    void AddBatchesForPendingBatches(CallCombinerClosureList* closures);

    // Adds whatever batches are needed on this attempt to closures.
    void AddRetriableBatches(CallCombinerClosureList* closures);

    // Returns true if any op in the batch was not yet started on this attempt.
    bool PendingBatchIsUnstarted(PendingBatch* pending);

    // Helper function used to start a recv_trailing_metadata batch. This
    // is used in the case where a recv_initial_metadata or recv_message
    // op fails in a way that tells us the call is over but the application
    // has not yet started its own recv_trailing_metadata op.
    void StartInternalRecvTrailingMetadata();

    CallData* calld_;
    RefCountedPtr<ClientChannel::LoadBalancedCall> lb_call_;

    // BatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload_;
    // For send_initial_metadata.
    // Note that we need to make a copy of the initial metadata for each
    // call attempt instead of just referring to the copy in call_data,
    // because filters in the subchannel stack may modify the metadata,
    // so we need to start in a pristine state for each attempt of the call.
    grpc_linked_mdelem* send_initial_metadata_storage_;
    grpc_metadata_batch send_initial_metadata_;
    // For send_message.
    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
    ManualConstructor<ByteStreamCache::CachingByteStream> send_message_;
    // For send_trailing_metadata.
    grpc_linked_mdelem* send_trailing_metadata_storage_;
    grpc_metadata_batch send_trailing_metadata_;
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata_;
    grpc_closure recv_initial_metadata_ready_;
    bool trailing_metadata_available_ = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready_;
    OrphanablePtr<ByteStream> recv_message_;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata_;
    grpc_transport_stream_stats collect_stats_;
    grpc_closure recv_trailing_metadata_ready_;
    // These fields indicate which ops have been started and completed on
    // this call attempt.
    size_t started_send_message_count_ = 0;
    size_t completed_send_message_count_ = 0;
    size_t started_recv_message_count_ = 0;
    size_t completed_recv_message_count_ = 0;
    bool started_send_initial_metadata_ : 1;
    bool completed_send_initial_metadata_ : 1;
    bool started_send_trailing_metadata_ : 1;
    bool completed_send_trailing_metadata_ : 1;
    bool started_recv_initial_metadata_ : 1;
    bool completed_recv_initial_metadata_ : 1;
    bool started_recv_trailing_metadata_ : 1;
    bool completed_recv_trailing_metadata_ : 1;
    // State for callback processing.
    BatchData* recv_initial_metadata_ready_deferred_batch_ = nullptr;
    grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE;
    BatchData* recv_message_ready_deferred_batch_ = nullptr;
    grpc_error_handle recv_message_error_ = GRPC_ERROR_NONE;
    BatchData* recv_trailing_metadata_internal_batch_ = nullptr;
    // NOTE: Do not move this next to the metadata bitfields above. That
    // would save space but would also result in a data race, because the
    // compiler will generate a 2-byte store that overwrites the metadata
    // bitfields when setting this field.
    bool retry_dispatched_ : 1;
  };

  CallData(RetryFilter* chand, const grpc_call_element_args& args);
  ~CallData();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  PendingBatch* PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // Fails all pending batches. Does NOT yield call combiner.
  void PendingBatchesFail(grpc_error_handle error);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata();
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(size_t idx);
  void FreeCachedSendTrailingMetadata();
  void FreeAllCachedSendOpData();

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(CallAttempt* call_attempt);

  // Starts a retry after appropriate back-off.
  void DoRetry(grpc_millis server_pushback_ms);
  static void OnRetryTimer(void* arg, grpc_error_handle error);

  RefCountedPtr<ClientChannel::LoadBalancedCall> CreateLoadBalancedCall();

  void CreateCallAttempt();

  // Adds a closure to closures that will execute batch in the call combiner.
  void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
                          CallCombinerClosureList* closures);

  RetryFilter* chand_;
  grpc_polling_entity* pollent_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const RetryMethodConfig* retry_policy_ = nullptr;
  BackOff retry_backoff_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;

  // TODO(roth): As part of implementing hedging, we will need to maintain a
  // list of all pending attempts, so that we can cancel them all if the call
  // gets cancelled.
  RefCountedPtr<CallAttempt> call_attempt_;

  // LB call used when the call is committed before any CallAttempt is
  // created.
  // TODO(roth): Change CallAttempt logic such that once we've committed
  // and all cached send ops have been replayed, we move the LB call
  // from the CallAttempt here, thus creating a fast path for the
  // remainder of the streaming call.
  RefCountedPtr<ClientChannel::LoadBalancedCall> committed_call_;

  // When we are not yet fully committed to a particular call (i.e.,
  // either we might still retry or we have committed to the call but
  // there are still some cached ops to be replayed on the call),
  // batches received from above will be added to this list, and they
  // will not be removed until we have invoked their completion callbacks.
  size_t bytes_buffered_for_retry_ = 0;
  PendingBatch pending_batches_[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool retry_committed_ : 1;
  bool last_attempt_got_server_pushback_ : 1;
  int num_attempts_completed_ = 0;
  Mutex timer_mu_;
  Canceller* canceller_ ABSL_GUARDED_BY(timer_mu_);
  grpc_timer retry_timer_ ABSL_GUARDED_BY(timer_mu_);
  grpc_closure retry_closure_;

  // The number of batches containing send ops that are currently in-flight
  // on any call attempt.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_in_flight_call_attempt_send_batches_ = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
  grpc_metadata_batch send_initial_metadata_;
  uint32_t send_initial_metadata_flags_;
  // TODO(roth): As part of implementing hedging, we'll probably need to
  // have the LB call set a value in CallAttempt and then propagate it
  // from CallAttempt to the parent call when we commit. Otherwise, we
  // may leave this with a value for a peer other than the one we
  // actually commit to.
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that. This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  absl::InlinedVector<ByteStreamCache*, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
  grpc_metadata_batch send_trailing_metadata_;
};

//
// RetryFilter::CallData::CallStackDestructionBarrier
//

// A class to track the existence of LoadBalancedCall call stacks that
// we've created. We wait until all such call stacks have been
// destroyed before we return the on_call_stack_destruction closure up
// to the surface.
//
// The parent RetryFilter::CallData object holds a ref to this object.
// When it is destroyed, it will store the on_call_stack_destruction
// closure from the surface in this object and then release its ref.
// We also take a ref to this object for each LB call we create, and
// those refs are not released until the LB call stack is destroyed.
// When this object is destroyed, it will invoke the
// on_call_stack_destruction closure from the surface.
class RetryFilter::CallData::CallStackDestructionBarrier
    : public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
                        kUnrefCallDtor> {
 public:
  CallStackDestructionBarrier() {}

  ~CallStackDestructionBarrier() override {
    // TODO(yashkt) : This can potentially be a Closure::Run
    ExecCtx::Run(DEBUG_LOCATION, on_call_stack_destruction_, GRPC_ERROR_NONE);
  }

  // Set the closure from the surface. This closure will be invoked
  // when this object is destroyed.
  void set_on_call_stack_destruction(grpc_closure* on_call_stack_destruction) {
    on_call_stack_destruction_ = on_call_stack_destruction;
  }

  // Invoked to get an on_call_stack_destruction closure for a new LB call.
  grpc_closure* MakeLbCallDestructionClosure(CallData* calld) {
    Ref().release();  // Ref held by callback.
    grpc_closure* on_lb_call_destruction_complete =
        calld->arena_->New<grpc_closure>();
    GRPC_CLOSURE_INIT(on_lb_call_destruction_complete,
                      OnLbCallDestructionComplete, this, nullptr);
    return on_lb_call_destruction_complete;
  }

 private:
  static void OnLbCallDestructionComplete(void* arg,
                                          grpc_error_handle /*error*/) {
    auto* self = static_cast<CallStackDestructionBarrier*>(arg);
    self->Unref();
  }

  grpc_closure* on_call_stack_destruction_ = nullptr;
};

//
// RetryFilter::CallData::Canceller
//

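// Watches for cancellation while a retry timer is pending. If the call is
// cancelled before the timer fires, cancels the timer, frees the cached
// send-op data, and releases the call combiner.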
class RetryFilter::CallData::Canceller {
 public:
  explicit Canceller(CallData* calld) : calld_(calld) {
    GRPC_CALL_STACK_REF(calld_->owning_call_, "RetryCanceller");
    GRPC_CLOSURE_INIT(&closure_, &Cancel, this, nullptr);
    calld_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void Cancel(void* arg, grpc_error_handle error) {
    auto* self = static_cast<Canceller*>(arg);
    auto* calld = self->calld_;
    {
      MutexLock lock(&calld->timer_mu_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "calld=%p: cancelling retry timer: error=%s self=%p "
                "calld->canceller_=%p",
                calld, grpc_error_std_string(error).c_str(), self,
                calld->canceller_);
      }
      if (calld->canceller_ == self && error != GRPC_ERROR_NONE) {
        calld->canceller_ = nullptr;  // Checked by OnRetryTimer().
        grpc_timer_cancel(&calld->retry_timer_);
        calld->FreeAllCachedSendOpData();
        GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "Canceller");
      }
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "RetryCanceller");
    delete self;
  }

  CallData* calld_;
  grpc_closure closure_;
};

//
// RetryFilter::CallData::CallAttempt
//

RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
    : calld_(calld),
      batch_payload_(calld->call_context_),
      started_send_initial_metadata_(false),
      completed_send_initial_metadata_(false),
      started_send_trailing_metadata_(false),
      completed_send_trailing_metadata_(false),
      started_recv_initial_metadata_(false),
      completed_recv_initial_metadata_(false),
      started_recv_trailing_metadata_(false),
      completed_recv_trailing_metadata_(false),
      retry_dispatched_(false) {
  lb_call_ = calld->CreateLoadBalancedCall();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: attempt=%p: create lb_call=%p",
            calld->chand_, calld, this, lb_call_.get());
  }
}

void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data. We may need to do some sort of
  // ref-counting instead.
  if (completed_send_initial_metadata_) {
    calld_->FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < completed_send_message_count_; ++i) {
    calld_->FreeCachedSendMessage(i);
  }
  if (completed_send_trailing_metadata_) {
    calld_->FreeCachedSendTrailingMetadata();
  }
}

bool RetryFilter::CallData::CallAttempt::PendingBatchIsUnstarted(
    PendingBatch* pending) {
  // Only look at batches containing send ops, since batches containing
  // only recv ops are always started immediately.
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !started_send_initial_metadata_) {
    return true;
  }
  if (pending->batch->send_message &&
      started_send_message_count_ < calld_->send_messages_.size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !started_send_trailing_metadata_) {
    return true;
  }
  return false;
}

void RetryFilter::CallData::CallAttempt::StartInternalRecvTrailingMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: call failed but recv_trailing_metadata not "
            "started; starting it internally",
            calld_->chand_, calld_);
  }
  // Create batch_data with 2 refs, since this batch will be unreffed twice:
  // once for the recv_trailing_metadata_ready callback when the batch
  // completes, and again when we actually get a recv_trailing_metadata
  // op from the surface.
  BatchData* batch_data = CreateBatch(2, false /* set_on_complete */);
  batch_data->AddRetriableRecvTrailingMetadataOp();
  recv_trailing_metadata_internal_batch_ = batch_data;
  // Note: This will release the call combiner.
  lb_call_->StartTransportStreamOpBatch(batch_data->batch());
}

// If there are any cached send ops that need to be replayed on the
// current call attempt, creates and returns a new batch to replay those ops.
// Otherwise, returns nullptr.
RetryFilter::CallData::CallAttempt::BatchData*
RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {
  BatchData* replay_batch_data = nullptr;
  // send_initial_metadata.
  if (calld_->seen_send_initial_metadata_ && !started_send_initial_metadata_ &&
      !calld_->pending_send_initial_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_initial_metadata op",
              calld_->chand_, calld_);
    }
    replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    replay_batch_data->AddRetriableSendInitialMetadataOp();
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (started_send_message_count_ < calld_->send_messages_.size() &&
      started_send_message_count_ == completed_send_message_count_ &&
      !calld_->pending_send_message_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_message op",
              calld_->chand_, calld_);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendMessageOp();
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (calld_->seen_send_trailing_metadata_ &&
      started_send_message_count_ == calld_->send_messages_.size() &&
      !started_send_trailing_metadata_ &&
      !calld_->pending_send_trailing_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_trailing_metadata op",
              calld_->chand_, calld_);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendTrailingMetadataOp();
  }
  return replay_batch_data;
}

void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
    CallCombinerClosureList* closures) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld_->pending_batches_); ++i) {
    PendingBatch* pending = &calld_->pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    // Skip any batch that either (a) has already been started on this
    // call attempt or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch. This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op. If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata && started_send_initial_metadata_) {
      continue;
    }
    if (batch->send_message &&
        completed_send_message_count_ < started_send_message_count_) {
      continue;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata &&
        (started_send_message_count_ + batch->send_message <
             calld_->send_messages_.size() ||
         started_send_trailing_metadata_)) {
      continue;
    }
    if (batch->recv_initial_metadata && started_recv_initial_metadata_) {
      continue;
    }
    if (batch->recv_message &&
        completed_recv_message_count_ < started_recv_message_count_) {
      continue;
    }
    if (batch->recv_trailing_metadata && started_recv_trailing_metadata_) {
      // If we previously completed a recv_trailing_metadata op
      // initiated by StartInternalRecvTrailingMetadata(), use the
      // result of that instead of trying to re-start this op.
      if (GPR_UNLIKELY(recv_trailing_metadata_internal_batch_ != nullptr)) {
        // If the batch completed, then trigger the completion callback
        // directly, so that we return the previously returned results to
        // the application. Otherwise, just unref the internally started
        // batch, since we'll propagate the completion when it completes.
        if (completed_recv_trailing_metadata_) {
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &recv_trailing_metadata_ready_, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        } else {
          recv_trailing_metadata_internal_batch_->Unref();
        }
        recv_trailing_metadata_internal_batch_ = nullptr;
      }
      continue;
    }
    // If we're already committed, just send the batch as-is.
    if (calld_->retry_committed_) {
      calld_->AddClosureForBatch(batch, closures);
      calld_->PendingBatchClear(pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    const bool has_send_ops = batch->send_initial_metadata ||
                              batch->send_message ||
                              batch->send_trailing_metadata;
    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
                              batch->recv_message +
                              batch->recv_trailing_metadata;
    CallAttempt::BatchData* batch_data =
        CreateBatch(num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    calld_->MaybeCacheSendOpsForBatch(pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      batch_data->AddRetriableSendInitialMetadataOp();
    }
    // send_message.
    if (batch->send_message) {
      batch_data->AddRetriableSendMessageOp();
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      batch_data->AddRetriableSendTrailingMetadataOp();
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      batch_data->AddRetriableRecvInitialMetadataOp();
    }
    // recv_message.
    if (batch->recv_message) {
      batch_data->AddRetriableRecvMessageOp();
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      batch_data->AddRetriableRecvTrailingMetadataOp();
    }
    calld_->AddClosureForBatch(batch_data->batch(), closures);
    // Track number of in-flight send batches.
    // If this is the first one, take a ref to the call stack.
    if (batch->send_initial_metadata || batch->send_message ||
        batch->send_trailing_metadata) {
      if (calld_->num_in_flight_call_attempt_send_batches_ == 0) {
        GRPC_CALL_STACK_REF(calld_->owning_call_, "retriable_send_batches");
      }
      ++calld_->num_in_flight_call_attempt_send_batches_;
    }
  }
}

void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
    CallCombinerClosureList* closures) {
  // Replay previously-returned send_* ops if needed.
  BatchData* replay_batch_data = MaybeCreateBatchForReplay();
  if (replay_batch_data != nullptr) {
    calld_->AddClosureForBatch(replay_batch_data->batch(), closures);
    // Track number of pending send batches.
    // If this is the first one, take a ref to the call stack.
    if (calld_->num_in_flight_call_attempt_send_batches_ == 0) {
      GRPC_CALL_STACK_REF(calld_->owning_call_, "retriable_send_batches");
    }
    ++calld_->num_in_flight_call_attempt_send_batches_;
  }
  // Now add pending batches.
  AddBatchesForPendingBatches(closures);
}

void RetryFilter::CallData::CallAttempt::StartRetriableBatches() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
            calld_->chand_, calld_);
  }
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  AddRetriableBatches(&closures);
  // Note: This will yield the call combiner.
  // Start batches on LB call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on lb_call=%p",
            calld_->chand_, calld_, closures.size(), lb_call());
  }
  closures.RunClosures(calld_->call_combiner_);
}

//
// RetryFilter::CallData::CallAttempt::BatchData
//

RetryFilter::CallData::CallAttempt::BatchData::BatchData(
    RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete)
    : RefCounted(nullptr, refcount), call_attempt_(std::move(attempt)) {
  // TODO(roth): Consider holding this ref on the call stack in
  // CallAttempt instead of here in BatchData. This would eliminate the
  // need for CallData::num_in_flight_call_attempt_send_batches_.
  // But it would require having a way to unref CallAttempt when it is
  // no longer needed (i.e., when the call is committed and all cached
  // send ops have been replayed and the LB call is moved into
  // CallData::committed_call_).
  GRPC_CALL_STACK_REF(call_attempt_->calld_->owning_call_, "CallAttempt");
  batch_.payload = &call_attempt_->batch_payload_;
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                      grpc_schedule_on_exec_ctx);
    batch_.on_complete = &on_complete_;
  }
}

RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
  if (batch_.send_initial_metadata) {
    grpc_metadata_batch_destroy(&call_attempt_->send_initial_metadata_);
  }
  if (batch_.send_trailing_metadata) {
    grpc_metadata_batch_destroy(&call_attempt_->send_trailing_metadata_);
  }
  if (batch_.recv_initial_metadata) {
    grpc_metadata_batch_destroy(&call_attempt_->recv_initial_metadata_);
  }
  if (batch_.recv_trailing_metadata) {
    grpc_metadata_batch_destroy(&call_attempt_->recv_trailing_metadata_);
  }
  GRPC_CALL_STACK_UNREF(call_attempt_->calld_->owning_call_, "CallAttempt");
}

void RetryFilter::CallData::CallAttempt::BatchData::
    FreeCachedSendOpDataForCompletedBatch() {
  auto* calld = call_attempt_->calld_;
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data. We may need to do some sort of
  // ref-counting instead.
  if (batch_.send_initial_metadata) {
    calld->FreeCachedSendInitialMetadata();
  }
  if (batch_.send_message) {
    calld->FreeCachedSendMessage(call_attempt_->completed_send_message_count_ -
                                 1);
  }
  if (batch_.send_trailing_metadata) {
    calld->FreeCachedSendTrailingMetadata();
  }
}

bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry(
    grpc_status_code status, grpc_mdelem* server_pushback_md, bool is_lb_drop) {
  auto* calld = call_attempt_->calld_;
  // LB drops always inhibit retries.
  if (is_lb_drop) return false;
  // Get retry policy.
  if (calld->retry_policy_ == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  if (call_attempt_->retry_dispatched_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched",
              calld->chand_, calld);
    }
    return true;
  }
  // Check status.
  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
    if (calld->retry_throttle_data_ != nullptr) {
      calld->retry_throttle_data_->RecordSuccess();
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", calld->chand_,
              calld);
    }
    return false;
  }
  // Status is not OK. Check whether the status is retryable.
  if (!calld->retry_policy_->retryable_status_codes().Contains(status)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: status %s not configured as retryable",
              calld->chand_, calld, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld->retry_throttle_data_ != nullptr &&
      !calld->retry_throttle_data_->RecordFailure()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", calld->chand_,
              calld);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld->retry_committed_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed",
              calld->chand_, calld);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld->num_attempts_completed_;
  if (calld->num_attempts_completed_ >= calld->retry_policy_->max_attempts()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts",
              calld->chand_, calld, calld->retry_policy_->max_attempts());
    }
    return false;
  }
  // Check server push-back.
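  // Per the retry design (gRFC A6), the server may send a
  // "grpc-retry-pushback-ms" entry in its trailing metadata telling the
  // client how long to wait before retrying; an unparseable or negative
  // value means "do not retry".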
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: not retrying due to server push-back",
                calld->chand_, calld);
      }
      return false;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
                calld->chand_, calld, ms);
      }
      server_pushback_ms = static_cast<grpc_millis>(ms);
    }
  }
  // Do retry.
  call_attempt_->retry_dispatched_ = true;
  calld->DoRetry(server_pushback_ms);
  return true;
}

//
// recv_initial_metadata callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    InvokeRecvInitialMetadataCallback(void* arg, grpc_error_handle error) {
  auto* batch_data = static_cast<CallAttempt::BatchData*>(arg);
  auto* call_attempt = batch_data->call_attempt_.get();
  // Find pending batch.
  PendingBatch* pending = call_attempt->calld_->PendingBatchFind(
      "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  grpc_metadata_batch_move(
      &call_attempt->recv_initial_metadata_,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  call_attempt->calld_->MaybeClearPendingBatch(pending);
  batch_data->Unref();
  // Invoke callback.
  Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready,
               GRPC_ERROR_REF(error));
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            calld->chand_, calld, grpc_error_std_string(error).c_str());
  }
  call_attempt->completed_recv_initial_metadata_ = true;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (call_attempt->retry_dispatched_) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_initial_metadata_ready after retry dispatched");
    return;
  }
  if (!calld->retry_committed_) {
    // If we got an error or a Trailers-Only response and have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface. We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY((call_attempt->trailing_metadata_available_ ||
                      error != GRPC_ERROR_NONE) &&
                     !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: deferring recv_initial_metadata_ready "
                "(Trailers-Only)",
                calld->chand_, calld);
      }
      call_attempt->recv_initial_metadata_ready_deferred_batch_ = batch_data;
      call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_REF(error);
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->StartInternalRecvTrailingMetadata();
      } else {
        GRPC_CALL_COMBINER_STOP(
            calld->call_combiner_,
            "recv_initial_metadata_ready trailers-only or error");
      }
      return;
    }
    // Received valid initial metadata, so commit the call.
    calld->RetryCommit(call_attempt);
  }
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of
  // error.
  InvokeRecvInitialMetadataCallback(batch_data, error);
}

//
// recv_message callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::InvokeRecvMessageCallback(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  // Find pending op.
  PendingBatch* pending = calld->PendingBatchFind(
      "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return payload.
  *pending->batch->payload->recv_message.recv_message =
      std::move(call_attempt->recv_message_);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  calld->MaybeClearPendingBatch(pending);
  batch_data->Unref();
  // Invoke callback.
  Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error));
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
            calld->chand_, calld, grpc_error_std_string(error).c_str());
  }
  ++call_attempt->completed_recv_message_count_;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_message op, so do nothing.
  if (call_attempt->retry_dispatched_) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "recv_message_ready after retry dispatched");
    return;
  }
  if (!calld->retry_committed_) {
    // If we got an error or the payload was nullptr and we have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface. We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY((call_attempt->recv_message_ == nullptr ||
                      error != GRPC_ERROR_NONE) &&
                     !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: deferring recv_message_ready (nullptr "
                "message and recv_trailing_metadata pending)",
                calld->chand_, calld);
      }
      call_attempt->recv_message_ready_deferred_batch_ = batch_data;
      call_attempt->recv_message_error_ = GRPC_ERROR_REF(error);
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->StartInternalRecvTrailingMetadata();
      } else {
        GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                                "recv_message_ready null");
      }
      return;
    }
    // Received a valid message, so commit the call.
    calld->RetryCommit(call_attempt);
  }
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of
  // error.
  InvokeRecvMessageCallback(batch_data, error);
}

//
// recv_trailing_metadata handling
//

namespace {

// Sets *status, *server_pushback_md, and *is_lb_drop based on md_batch
// and error.
void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch,
                   grpc_error_handle error, grpc_status_code* status,
                   grpc_mdelem** server_pushback_md, bool* is_lb_drop) {
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
    intptr_t value = 0;
    if (grpc_error_get_int(error, GRPC_ERROR_INT_LB_POLICY_DROP, &value) &&
        value != 0) {
      *is_lb_drop = true;
    }
  } else {
    GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
    *status =
        grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
    if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
      *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
    }
  }
  GRPC_ERROR_UNREF(error);
}

}  // namespace

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosureForRecvTrailingMetadataReady(grpc_error_handle error,
                                           CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  // Find pending batch.
  PendingBatch* pending = calld->PendingBatchFind(
      "invoking recv_trailing_metadata for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  // If we generated the recv_trailing_metadata op internally via
  // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Return metadata.
  grpc_metadata_batch_move(
      &call_attempt_->recv_trailing_metadata_,
      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
  // Add closure.
  closures->Add(pending->batch->payload->recv_trailing_metadata
                    .recv_trailing_metadata_ready,
                error, "recv_trailing_metadata_ready for pending batch");
  // Update bookkeeping.
  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      nullptr;
  calld->MaybeClearPendingBatch(pending);
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForDeferredRecvCallbacks(CallCombinerClosureList* closures) {
  if (batch_.recv_trailing_metadata) {
    // Add closure for deferred recv_initial_metadata_ready.
    if (GPR_UNLIKELY(
            call_attempt_->recv_initial_metadata_ready_deferred_batch_ !=
            nullptr)) {
      GRPC_CLOSURE_INIT(
          &call_attempt_->recv_initial_metadata_ready_,
          InvokeRecvInitialMetadataCallback,
          call_attempt_->recv_initial_metadata_ready_deferred_batch_,
          grpc_schedule_on_exec_ctx);
      closures->Add(&call_attempt_->recv_initial_metadata_ready_,
                    call_attempt_->recv_initial_metadata_error_,
                    "resuming recv_initial_metadata_ready");
      call_attempt_->recv_initial_metadata_ready_deferred_batch_ = nullptr;
    }
    // Add closure for deferred recv_message_ready.
    if (GPR_UNLIKELY(call_attempt_->recv_message_ready_deferred_batch_ !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_,
                        InvokeRecvMessageCallback,
                        call_attempt_->recv_message_ready_deferred_batch_,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&call_attempt_->recv_message_ready_,
                    call_attempt_->recv_message_error_,
                    "resuming recv_message_ready");
      call_attempt_->recv_message_ready_deferred_batch_ = nullptr;
    }
  }
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresToFailUnstartedPendingBatches(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
    PendingBatch* pending = &calld->pending_batches_[i];
    if (call_attempt_->PendingBatchIsUnstarted(pending)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: failing unstarted pending batch at "
                "index %" PRIuPTR,
                calld->chand_, calld, i);
      }
      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
                    "failing on_complete for pending batch");
      pending->batch->on_complete = nullptr;
      calld->MaybeClearPendingBatch(pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
    grpc_error_handle error) {
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // First, add closure for recv_trailing_metadata_ready.
  AddClosureForRecvTrailingMetadataReady(GRPC_ERROR_REF(error), &closures);
  // If there are deferred recv_initial_metadata_ready or recv_message_ready
  // callbacks, add them to closures.
  AddClosuresForDeferredRecvCallbacks(&closures);
  // Add closures to fail any pending batches that have not yet been started.
  AddClosuresToFailUnstartedPendingBatches(GRPC_ERROR_REF(error), &closures);
  // Schedule all of the closures identified above.
  // Note: This will release the call combiner.
  closures.RunClosures(call_attempt_->calld_->call_combiner_);
  // Don't need batch_data anymore.
  Unref();
  GRPC_ERROR_UNREF(error);
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
1384 if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1385 gpr_log(GPR_INFO,
1386 "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
1387 calld->chand_, calld, grpc_error_std_string(error).c_str());
1388 }
1389 call_attempt->completed_recv_trailing_metadata_ = true;
1390 // Get the call's status and check for server pushback metadata.
1391 grpc_status_code status = GRPC_STATUS_OK;
1392 grpc_mdelem* server_pushback_md = nullptr;
1393 grpc_metadata_batch* md_batch =
1394 batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata;
1395 bool is_lb_drop = false;
1396 GetCallStatus(calld->deadline_, md_batch, GRPC_ERROR_REF(error), &status,
1397 &server_pushback_md, &is_lb_drop);
1398 if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
1399 gpr_log(
1400 GPR_INFO, "chand=%p calld=%p: call finished, status=%s is_lb_drop=%d",
1401 calld->chand_, calld, grpc_status_code_to_string(status), is_lb_drop);
1402 }
1403 // Check if we should retry.
1404 if (batch_data->MaybeRetry(status, server_pushback_md, is_lb_drop)) {
1405 // Unref batch_data for deferred recv_initial_metadata_ready or
1406 // recv_message_ready callbacks, if any.
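    // Those deferred callbacks will not be invoked for this attempt, so we
    // release the refs that they would otherwise have released.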
    if (call_attempt->recv_initial_metadata_ready_deferred_batch_ != nullptr) {
      GRPC_ERROR_UNREF(call_attempt->recv_initial_metadata_error_);
      batch_data->Unref();
    }
    if (call_attempt->recv_message_ready_deferred_batch_ != nullptr) {
      GRPC_ERROR_UNREF(call_attempt->recv_message_error_);
      batch_data->Unref();
    }
    batch_data->Unref();
    return;
  }
  // Not retrying, so commit the call.
  calld->RetryCommit(call_attempt);
  // Run any necessary closures.
  batch_data->RunClosuresForCompletedCall(GRPC_ERROR_REF(error));
}

//
// on_complete callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForCompletedPendingBatch(grpc_error_handle error,
                                        CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  PendingBatch* pending = calld->PendingBatchFind(
      "completed", [this](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_.send_initial_metadata == batch->send_initial_metadata &&
               batch_.send_message == batch->send_message &&
               batch_.send_trailing_metadata == batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Add closure.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  pending->batch->on_complete = nullptr;
  calld->MaybeClearPendingBatch(pending);
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  // We don't check send_initial_metadata here, because that op will always
  // be started as soon as it is received from the surface, so it will
  // never need to be started at this point.
  bool have_pending_send_message_ops =
      call_attempt_->started_send_message_count_ < calld->send_messages_.size();
  bool have_pending_send_trailing_metadata_op =
      calld->seen_send_trailing_metadata_ &&
      !call_attempt_->started_send_trailing_metadata_;
  if (!have_pending_send_message_ops &&
      !have_pending_send_trailing_metadata_op) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
      PendingBatch* pending = &calld->pending_batches_[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message) have_pending_send_message_ops = true;
      if (batch->send_trailing_metadata) {
        have_pending_send_trailing_metadata_op = true;
      }
    }
  }
  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting next batch for pending send op(s)",
              calld->chand_, calld);
    }
    call_attempt_->AddRetriableBatches(closures);
  }
}

void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
            calld->chand_, calld, grpc_error_std_string(error).c_str(),
            grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str());
  }
  // Update bookkeeping in call_attempt.
  if (batch_data->batch_.send_initial_metadata) {
    call_attempt->completed_send_initial_metadata_ = true;
  }
  if (batch_data->batch_.send_message) {
    ++call_attempt->completed_send_message_count_;
  }
  if (batch_data->batch_.send_trailing_metadata) {
    call_attempt->completed_send_trailing_metadata_ = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed.
  if (calld->retry_committed_) {
    batch_data->FreeCachedSendOpDataForCompletedBatch();
  }
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // If a retry was already dispatched, that means we saw
  // recv_trailing_metadata before this, so we do nothing here.
  // Otherwise, invoke the callback to return the result to the surface.
  if (!call_attempt->retry_dispatched_) {
    // Add closure for the completed pending batch, if any.
    batch_data->AddClosuresForCompletedPendingBatch(GRPC_ERROR_REF(error),
                                                    &closures);
    // If needed, add a callback to start any replay or pending send ops on
    // the LB call.
    if (!call_attempt->completed_recv_trailing_metadata_) {
      batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
    }
  }
  // Track number of in-flight send batches and determine if this was the
  // last one.
  --calld->num_in_flight_call_attempt_send_batches_;
  const bool last_send_batch_complete =
      calld->num_in_flight_call_attempt_send_batches_ == 0;
  // Don't need batch_data anymore.
  batch_data->Unref();
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner_);
  // If this was the last in-flight send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "retriable_send_batches");
  }
}

//
// retriable batch construction
//

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendInitialMetadataOp() {
  auto* calld = call_attempt_->calld_;
  // Maps the number of retries to the corresponding metadata value slice.
  const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
                                             &GRPC_MDSTR_3, &GRPC_MDSTR_4};
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-previous-rpc-attempts header.
  call_attempt_->send_initial_metadata_storage_ =
      static_cast<grpc_linked_mdelem*>(
          calld->arena_->Alloc(sizeof(grpc_linked_mdelem) *
                               (calld->send_initial_metadata_.list.count +
                                (calld->num_attempts_completed_ > 0))));
  grpc_metadata_batch_copy(&calld->send_initial_metadata_,
                           &call_attempt_->send_initial_metadata_,
                           call_attempt_->send_initial_metadata_storage_);
  if (GPR_UNLIKELY(call_attempt_->send_initial_metadata_.idx.named
                       .grpc_previous_rpc_attempts != nullptr)) {
    grpc_metadata_batch_remove(&call_attempt_->send_initial_metadata_,
                               GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
  }
  if (GPR_UNLIKELY(calld->num_attempts_completed_ > 0)) {
    grpc_mdelem retry_md = grpc_mdelem_create(
        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
        *retry_count_strings[calld->num_attempts_completed_ - 1], nullptr);
    grpc_error_handle error = grpc_metadata_batch_add_tail(
        &call_attempt_->send_initial_metadata_,
        &call_attempt_->send_initial_metadata_storage_
            [calld->send_initial_metadata_.list.count],
        retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
              grpc_error_std_string(error).c_str());
      GPR_ASSERT(false);
    }
  }
  call_attempt_->started_send_initial_metadata_ = true;
  batch_.send_initial_metadata = true;
  batch_.payload->send_initial_metadata.send_initial_metadata =
      &call_attempt_->send_initial_metadata_;
  batch_.payload->send_initial_metadata.send_initial_metadata_flags =
      calld->send_initial_metadata_flags_;
  batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendMessageOp() {
  auto* calld = call_attempt_->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
            calld->chand_, calld, call_attempt_->started_send_message_count_);
  }
  ByteStreamCache* cache =
      calld->send_messages_[call_attempt_->started_send_message_count_];
  ++call_attempt_->started_send_message_count_;
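  // Create a new stream over the cached message data so that this attempt
  // replays the message from the beginning.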
  call_attempt_->send_message_.Init(cache);
  batch_.send_message = true;
  batch_.payload->send_message.send_message.reset(
      call_attempt_->send_message_.get());
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendTrailingMetadataOp() {
  auto* calld = call_attempt_->calld_;
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  call_attempt_->send_trailing_metadata_storage_ =
      static_cast<grpc_linked_mdelem*>(
          calld->arena_->Alloc(sizeof(grpc_linked_mdelem) *
                               calld->send_trailing_metadata_.list.count));
  grpc_metadata_batch_copy(&calld->send_trailing_metadata_,
                           &call_attempt_->send_trailing_metadata_,
                           call_attempt_->send_trailing_metadata_storage_);
  call_attempt_->started_send_trailing_metadata_ = true;
  batch_.send_trailing_metadata = true;
  batch_.payload->send_trailing_metadata.send_trailing_metadata =
      &call_attempt_->send_trailing_metadata_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvInitialMetadataOp() {
  call_attempt_->started_recv_initial_metadata_ = true;
  batch_.recv_initial_metadata = true;
  grpc_metadata_batch_init(&call_attempt_->recv_initial_metadata_);
  batch_.payload->recv_initial_metadata.recv_initial_metadata =
      &call_attempt_->recv_initial_metadata_;
  batch_.payload->recv_initial_metadata.trailing_metadata_available =
      &call_attempt_->trailing_metadata_available_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_initial_metadata_ready_,
                    RecvInitialMetadataReady, this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &call_attempt_->recv_initial_metadata_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvMessageOp() {
  ++call_attempt_->started_recv_message_count_;
  batch_.recv_message = true;
  batch_.payload->recv_message.recv_message = &call_attempt_->recv_message_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, RecvMessageReady, this,
                    grpc_schedule_on_exec_ctx);
  batch_.payload->recv_message.recv_message_ready =
      &call_attempt_->recv_message_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvTrailingMetadataOp() {
  call_attempt_->started_recv_trailing_metadata_ = true;
  batch_.recv_trailing_metadata = true;
  grpc_metadata_batch_init(&call_attempt_->recv_trailing_metadata_);
  batch_.payload->recv_trailing_metadata.recv_trailing_metadata =
      &call_attempt_->recv_trailing_metadata_;
  batch_.payload->recv_trailing_metadata.collect_stats =
      &call_attempt_->collect_stats_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_trailing_metadata_ready_,
                    RecvTrailingMetadataReady, this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &call_attempt_->recv_trailing_metadata_ready_;
}

//
// CallData vtable functions
//

grpc_error_handle RetryFilter::CallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  auto* chand = static_cast<RetryFilter*>(elem->channel_data);
  new (elem->call_data) CallData(chand, *args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created call=%p", chand, elem->call_data);
  }
  return GRPC_ERROR_NONE;
}

void RetryFilter::CallData::Destroy(grpc_call_element* elem,
                                    const grpc_call_final_info* /*final_info*/,
                                    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  // Save our ref to the CallStackDestructionBarrier until after our
  // dtor is invoked.
  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier =
      std::move(calld->call_stack_destruction_barrier_);
  calld->~CallData();
  // Now set the callback in the CallStackDestructionBarrier object,
  // right before we release our ref to it (implicitly upon returning).
  // The callback will be invoked when the CallStackDestructionBarrier
  // is destroyed.
  call_stack_destruction_barrier->set_on_call_stack_destruction(
      then_schedule_closure);
}

void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->StartTransportStreamOpBatch(batch);
}

void RetryFilter::CallData::SetPollent(grpc_call_element* elem,
                                       grpc_polling_entity* pollent) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->pollent_ = pollent;
}

//
// CallData implementation
//

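// Returns the retry policy from the service config's method config, or
// nullptr if no retry policy is configured for this method.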
const RetryMethodConfig* GetRetryPolicy(
    const grpc_call_context_element* context) {
  if (context == nullptr) return nullptr;
  auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
      context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
  if (svc_cfg_call_data == nullptr) return nullptr;
  return static_cast<const RetryMethodConfig*>(
      svc_cfg_call_data->GetMethodParsedConfig(
          RetryServiceConfigParser::ParserIndex()));
}

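// Note: If the method has no retry policy, the zeroed backoff options
// below are placeholders; no retries are ever attempted in that case, so
// the backoff state goes unused.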
RetryFilter::CallData::CallData(RetryFilter* chand,
                                const grpc_call_element_args& args)
    : chand_(chand),
      retry_throttle_data_(chand->retry_throttle_data_),
      retry_policy_(GetRetryPolicy(args.context)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(retry_policy_ == nullptr
                                       ? 0
                                       : retry_policy_->initial_backoff())
              .set_multiplier(retry_policy_ == nullptr
                                  ? 0
                                  : retry_policy_->backoff_multiplier())
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(
                  retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff())),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      call_stack_destruction_barrier_(
          arena_->New<CallStackDestructionBarrier>()),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      retry_committed_(false),
      last_attempt_got_server_pushback_(false) {}

RetryFilter::CallData::~CallData() {
  grpc_slice_unref_internal(path_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i].batch == nullptr);
  }
}

void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  // If we have an LB call, delegate to the LB call.
  if (committed_call_ != nullptr) {
    // Note: This will release the call combiner.
    committed_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    grpc_error_handle cancel_error = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: cancelled from surface: %s", chand_,
              this, grpc_error_std_string(cancel_error).c_str());
    }
    // If we have a current call attempt, commit the call, then send
    // the cancellation down to that attempt. When the call fails, it
    // will not be retried, because we have committed it here.
    if (call_attempt_ != nullptr) {
      RetryCommit(call_attempt_.get());
      // Note: This will release the call combiner.
      call_attempt_->lb_call()->StartTransportStreamOpBatch(batch);
      return;
    }
    // Fail pending batches.
    PendingBatchesFail(GRPC_ERROR_REF(cancel_error));
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(cancel_error), call_combiner_);
    return;
  }
  // Add the batch to the pending list.
  PendingBatch* pending = PendingBatchesAdd(batch);
  if (call_attempt_ == nullptr) {
    // If this is the first batch and retries are already committed
    // (e.g., if this batch put the call above the buffer size limit), then
    // immediately create an LB call and delegate the batch to it. This
    // avoids the overhead of unnecessarily allocating a CallAttempt
    // object or caching any of the send op data.
    if (num_attempts_completed_ == 0 && retry_committed_) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: retry committed before first attempt; "
                "creating LB call",
                chand_, this);
      }
      PendingBatchClear(pending);
      committed_call_ = CreateLoadBalancedCall();
      committed_call_->StartTransportStreamOpBatch(batch);
      return;
    }
    // We do not yet have a call attempt, so create one.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
              this);
    }
    CreateCallAttempt();
    return;
  }
  // Send batches to call attempt.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting batch on attempt=%p lb_call=%p",
            chand_, this, call_attempt_.get(), call_attempt_->lb_call());
  }
  call_attempt_->StartRetriableBatches();
}

RefCountedPtr<ClientChannel::LoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall() {
  grpc_call_element_args args = {owning_call_, nullptr, call_context_,
                                 path_,        call_start_time_, deadline_,
                                 arena_,       call_combiner_};
  return chand_->client_channel_->CreateLoadBalancedCall(
      args, pollent_,
      // This callback holds a ref to the CallStackDestructionBarrier
      // object until the LB call is destroyed.
      call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this));
}

void RetryFilter::CallData::CreateCallAttempt() {
  call_attempt_.reset(arena_->New<CallAttempt>(this));
  call_attempt_->StartRetriableBatches();
  // TODO(roth): When implementing hedging, change this to start a timer
  // for the next hedging attempt.
}

namespace {

void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>(
      batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  lb_call->StartTransportStreamOpBatch(batch);
}

}  // namespace

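// Adds a closure to closures that will start the given batch on the
// current call attempt's LB call.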
void RetryFilter::CallData::AddClosureForBatch(
    grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) {
  batch->handler_private.extra_arg = call_attempt_->lb_call();
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on LB call: %s",
            chand_, this, grpc_transport_stream_op_batch_string(batch).c_str());
  }
  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                "start_batch_on_lb_call");
}

//
// send op data caching
//

void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    send_initial_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
                             send_initial_metadata_storage_);
    send_initial_metadata_flags_ =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    peer_string_ = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
        std::move(batch->payload->send_message.send_message));
    send_messages_.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
                             send_trailing_metadata_storage_);
  }
}

void RetryFilter::CallData::FreeCachedSendInitialMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_initial_metadata",
            chand_, this);
  }
  grpc_metadata_batch_destroy(&send_initial_metadata_);
}

void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying send_messages[%" PRIuPTR "]", chand_,
            this, idx);
  }
  send_messages_[idx]->Destroy();
}

void RetryFilter::CallData::FreeCachedSendTrailingMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_trailing_metadata",
            chand_, this);
  }
  grpc_metadata_batch_destroy(&send_trailing_metadata_);
}

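// Frees all send-op data that was cached for retry replay.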
void RetryFilter::CallData::FreeAllCachedSendOpData() {
  if (seen_send_initial_metadata_) {
    FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < send_messages_.size(); ++i) {
    FreeCachedSendMessage(i);
  }
  if (seen_send_trailing_metadata_) {
    FreeCachedSendTrailingMetadata();
  }
}

//
// pending_batches management
//

size_t RetryFilter::CallData::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {
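  // Each op kind maps to a fixed slot in pending_batches_; the surface sends
  // at most one batch per op kind at a time (PendingBatchesAdd() asserts
  // that the slot is empty).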
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

// This is called via the call combiner, so access to calld is synchronized.
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
            chand_, this, idx);
  }
  PendingBatch* pending = &pending_batches_[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  // Update state in calld about pending batches.
  // Also check if the batch takes us over the retry buffer limit.
  // Note: We don't check the size of trailing metadata here, because
  // gRPC clients do not send trailing metadata.
  if (batch->send_initial_metadata) {
    pending_send_initial_metadata_ = true;
    bytes_buffered_for_retry_ += grpc_metadata_batch_size(
        batch->payload->send_initial_metadata.send_initial_metadata);
  }
  if (batch->send_message) {
    pending_send_message_ = true;
    bytes_buffered_for_retry_ +=
        batch->payload->send_message.send_message->length();
  }
  if (batch->send_trailing_metadata) {
    pending_send_trailing_metadata_ = true;
  }
  if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                   chand_->per_rpc_retry_buffer_size_)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: exceeded retry buffer size, committing",
              chand_, this);
    }
    RetryCommit(call_attempt_.get());
  }
  return pending;
}

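// Clears a pending batch slot and resets the corresponding pending_send_*
// flags.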
void RetryFilter::CallData::PendingBatchClear(PendingBatch* pending) {
  if (pending->batch->send_initial_metadata) {
    pending_send_initial_metadata_ = false;
  }
  if (pending->batch->send_message) {
    pending_send_message_ = false;
  }
  if (pending->batch->send_trailing_metadata) {
    pending_send_trailing_metadata_ = false;
  }
  pending->batch = nullptr;
}

void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch* pending) {
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand_,
              this);
    }
    PendingBatchClear(pending);
  }
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryFilter::CallData::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  CallData* call = static_cast<CallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), call->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryFilter::CallData::PendingBatchesFail(grpc_error_handle error) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            chand_, this, num_batches, grpc_error_std_string(error).c_str());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      PendingBatchClear(pending);
    }
  }
  closures.RunClosuresWithoutYielding(call_combiner_);
  GRPC_ERROR_UNREF(error);
}

template <typename Predicate>
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind(
    const char* log_message, Predicate predicate) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR,
                chand_, this, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

//
// retry code
//

void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {
  if (retry_committed_) return;
  retry_committed_ = true;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand_, this);
  }
  if (call_attempt != nullptr) {
    call_attempt->FreeCachedSendOpDataAfterCommit();
  }
}

void RetryFilter::CallData::DoRetry(grpc_millis server_pushback_ms) {
  // Reset call attempt.
  call_attempt_.reset();
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
    last_attempt_got_server_pushback_ = true;
  } else {
    if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
      last_attempt_got_server_pushback_ = false;
    }
    next_attempt_time = retry_backoff_.NextAttemptTime();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_,
            this, next_attempt_time - ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimer, this, nullptr);
  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
  MutexLock lock(&timer_mu_);
  canceller_ = new Canceller(this);
  grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
}

void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) {
  auto* calld = static_cast<CallData*>(arg);
  if (error == GRPC_ERROR_NONE) {
    bool start_attempt = false;
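    // A non-null canceller_ means the timer was not cancelled, so it is
    // safe to start the next call attempt.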
    {
      MutexLock lock(&calld->timer_mu_);
      if (calld->canceller_ != nullptr) {
        calld->canceller_ = nullptr;
        start_attempt = true;
      }
    }
    if (start_attempt) calld->CreateCallAttempt();
  }
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}

}  // namespace

const grpc_channel_filter kRetryFilterVtable = {
    RetryFilter::CallData::StartTransportStreamOpBatch,
    RetryFilter::StartTransportOp,
    sizeof(RetryFilter::CallData),
    RetryFilter::CallData::Init,
    RetryFilter::CallData::SetPollent,
    RetryFilter::CallData::Destroy,
    sizeof(RetryFilter),
    RetryFilter::Init,
    RetryFilter::Destroy,
    RetryFilter::GetChannelInfo,
    "retry_filter",
};

}  // namespace grpc_core