//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/retry_filter.h"

#include "absl/container/inlined_vector.h"
#include "absl/status/statusor.h"
#include "absl/strings/strip.h"

#include <grpc/support/log.h>

#include "src/core/ext/filters/client_channel/client_channel.h"
#include "src/core/ext/filters/client_channel/retry_service_config.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/service_config_call_data.h"
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/gprpp/manual_constructor.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/metadata.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/static_metadata.h"
#include "src/core/lib/transport/status_metadata.h"
#include "src/core/lib/uri/uri_parser.h"

//
// Retry filter
//

// This filter is intended to be used in the DynamicFilter stack in the
// client channel, which is situated between the name resolver and the
// LB policy.  Normally, the last filter in the DynamicFilter stack is
// the DynamicTerminationFilter (see client_channel.cc), which creates a
// LoadBalancedCall and delegates to it.  However, when retries are
// enabled, this filter is used instead of the DynamicTerminationFilter.
//
// In order to support retries, we act as a proxy for stream op batches.
// When we get a batch from the surface, we add it to our list of pending
// batches, and we then use those batches to construct separate "child"
// batches to be started on an LB call.  When the child batches return, we
// then decide which pending batches have been completed and schedule their
// callbacks accordingly.  If a call attempt fails and we want to retry it,
// we create a new LB call and start again, constructing new "child" batches
// for the new LB call.
//
// Note that retries are committed when receiving data from the server
// (except for Trailers-Only responses).  However, there may be many
// send ops started before receiving any data, so we may have already
// completed some number of send ops (and returned the completions up to
// the surface) by the time we realize that we need to retry.  To deal
// with this, we cache data for send ops, so that we can replay them on a
// different LB call even after we have completed the original batches.
//
// The code is structured as follows:
// - In CallData (in the parent channel), we maintain a list of pending
//   ops and cached data for send ops.
// - There is a CallData::CallAttempt object for each retry attempt.
//   This object contains the LB call for that attempt and state to indicate
//   which ops from the CallData object have already been sent down to that
//   LB call.
// - There is a CallData::CallAttempt::BatchData object for each "child"
//   batch sent on the LB call.
//
// When constructing the "child" batches, we compare the state in the
// CallAttempt object against the state in the CallData object to see
// which batches need to be sent on the LB call for a given attempt.
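//
// As a rough sketch, the ownership structure described above is:
//
//   CallData (one per RPC, in the parent channel)
//     +-- pending_batches_[]      (batches from the surface)
//     +-- cached send-op data     (replayed on each new attempt)
//     +-- CallAttempt (one per attempt; owns the LB call)
//           +-- BatchData (one per "child" batch on the LB call)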

// TODO(roth): In subsequent PRs:
// - add support for transparent retries (including initial metadata)
// - figure out how to record stats in census for retries
//   (census filter is on top of this one)
// - add census stats for retries

// By default, we buffer 256 KiB per RPC for retries.
// TODO(roth): Do we have any data to suggest a better value?
#define DEFAULT_PER_RPC_RETRY_BUFFER_SIZE (256 << 10)

// This value was picked arbitrarily.  It can be changed if there is
// any even moderately compelling reason to do so.
#define RETRY_BACKOFF_JITTER 0.2
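
// For intuition, a sketch of the resulting retry delays (the policy values
// here are illustrative, not from this file): with initialBackoff=0.1s,
// backoffMultiplier=2, and maxBackoff=1s, the un-jittered delay before
// attempt n+1 is
//   delay(n) = min(0.1s * 2^(n-1), 1s)
// and the 0.2 jitter factor spreads each actual delay uniformly over
// [0.8 * delay(n), 1.2 * delay(n)].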

namespace grpc_core {

namespace {

using internal::RetryGlobalConfig;
using internal::RetryMethodConfig;
using internal::RetryServiceConfigParser;
using internal::ServerRetryThrottleData;

TraceFlag grpc_retry_trace(false, "retry");

//
// RetryFilter
//

class RetryFilter {
 public:
  class CallData;

  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args) {
    GPR_ASSERT(args->is_last);
    GPR_ASSERT(elem->filter == &kRetryFilterVtable);
    grpc_error_handle error = GRPC_ERROR_NONE;
    new (elem->channel_data) RetryFilter(args->channel_args, &error);
    return error;
  }

  static void Destroy(grpc_channel_element* elem) {
    auto* chand = static_cast<RetryFilter*>(elem->channel_data);
    chand->~RetryFilter();
  }

  // Will never be called.
  static void StartTransportOp(grpc_channel_element* /*elem*/,
                               grpc_transport_op* /*op*/) {}
  static void GetChannelInfo(grpc_channel_element* /*elem*/,
                             const grpc_channel_info* /*info*/) {}

 private:
  static size_t GetMaxPerRpcRetryBufferSize(const grpc_channel_args* args) {
    return static_cast<size_t>(grpc_channel_args_find_integer(
        args, GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE,
        {DEFAULT_PER_RPC_RETRY_BUFFER_SIZE, 0, INT_MAX}));
  }
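
  // For illustration only (not part of this filter): a client can override
  // the default buffer size via a channel arg when creating a channel.
  // A sketch using the C-core API; the target and size are hypothetical:
  //
  //   grpc_arg arg = grpc_channel_arg_integer_create(
  //       const_cast<char*>(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE), 64 << 10);
  //   grpc_channel_args channel_args = {1, &arg};
  //   grpc_channel* channel = grpc_insecure_channel_create(
  //       "dns:///example.com:443", &channel_args, nullptr);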

  RetryFilter(const grpc_channel_args* args, grpc_error_handle* error)
      : client_channel_(grpc_channel_args_find_pointer<ClientChannel>(
            args, GRPC_ARG_CLIENT_CHANNEL)),
        per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)) {
    // Get retry throttling parameters from service config.
    auto* service_config = grpc_channel_args_find_pointer<ServiceConfig>(
        args, GRPC_ARG_SERVICE_CONFIG_OBJ);
    if (service_config == nullptr) return;
    const auto* config = static_cast<const RetryGlobalConfig*>(
        service_config->GetGlobalParsedConfig(
            RetryServiceConfigParser::ParserIndex()));
    if (config == nullptr) return;
    // Get server name from target URI.
    const char* server_uri =
        grpc_channel_args_find_string(args, GRPC_ARG_SERVER_URI);
    if (server_uri == nullptr) {
      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "server URI channel arg missing or wrong type in client channel "
          "filter");
      return;
    }
    absl::StatusOr<URI> uri = URI::Parse(server_uri);
    if (!uri.ok() || uri->path().empty()) {
      *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
          "could not extract server name from target URI");
      return;
    }
    std::string server_name(absl::StripPrefix(uri->path(), "/"));
    // Get throttling config for server_name.
    retry_throttle_data_ = internal::ServerRetryThrottleMap::GetDataForServer(
        server_name, config->max_milli_tokens(), config->milli_token_ratio());
  }

  ClientChannel* client_channel_;
  size_t per_rpc_retry_buffer_size_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
};
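
// For reference, the retry policy and throttling parameters consumed above
// come from the service config.  A hedged sketch of the JSON shape, per the
// retry design (gRFC A6); the field values here are illustrative:
//
//   {
//     "methodConfig": [{
//       "name": [{"service": "my.package.MyService"}],
//       "retryPolicy": {
//         "maxAttempts": 4,
//         "initialBackoff": "0.1s",
//         "maxBackoff": "1s",
//         "backoffMultiplier": 2,
//         "retryableStatusCodes": ["UNAVAILABLE"]
//       }
//     }],
//     "retryThrottling": { "maxTokens": 10, "tokenRatio": 0.1 }
//   }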

//
// RetryFilter::CallData
//

class RetryFilter::CallData {
 public:
  static grpc_error_handle Init(grpc_call_element* elem,
                                const grpc_call_element_args* args);
  static void Destroy(grpc_call_element* elem,
                      const grpc_call_final_info* /*final_info*/,
                      grpc_closure* then_schedule_closure);
  static void StartTransportStreamOpBatch(
      grpc_call_element* elem, grpc_transport_stream_op_batch* batch);
  static void SetPollent(grpc_call_element* elem, grpc_polling_entity* pollent);

 private:
  class Canceller;
  class CallStackDestructionBarrier;

  // Pending batches stored in call data.
  struct PendingBatch {
    // The pending batch.  If nullptr, this slot is empty.
    grpc_transport_stream_op_batch* batch = nullptr;
    // Indicates whether payload for send ops has been cached in CallData.
    bool send_ops_cached = false;
  };

  // State associated with each call attempt.
  // Allocated on the arena.
  class CallAttempt
      : public RefCounted<CallAttempt, PolymorphicRefCount, kUnrefCallDtor> {
   public:
    explicit CallAttempt(CallData* calld);

    ClientChannel::LoadBalancedCall* lb_call() const { return lb_call_.get(); }

    // Constructs and starts whatever batches are needed on this call
    // attempt.
    void StartRetriableBatches();

    // Frees cached send ops that have already been completed after
    // committing the call.
    void FreeCachedSendOpDataAfterCommit();

   private:
    // State used for starting a retryable batch on the call attempt's LB call.
    // This provides its own grpc_transport_stream_op_batch and other data
    // structures needed to populate the ops in the batch.
    // We allocate one struct on the arena for each attempt at starting a
    // batch on a given LB call.
    class BatchData
        : public RefCounted<BatchData, PolymorphicRefCount, kUnrefCallDtor> {
     public:
      BatchData(RefCountedPtr<CallAttempt> call_attempt, int refcount,
                bool set_on_complete);
      ~BatchData() override;

      grpc_transport_stream_op_batch* batch() { return &batch_; }

      // Adds retriable send_initial_metadata op to batch_data.
      void AddRetriableSendInitialMetadataOp();
      // Adds retriable send_message op to batch_data.
      void AddRetriableSendMessageOp();
      // Adds retriable send_trailing_metadata op to batch_data.
      void AddRetriableSendTrailingMetadataOp();
      // Adds retriable recv_initial_metadata op to batch_data.
      void AddRetriableRecvInitialMetadataOp();
      // Adds retriable recv_message op to batch_data.
      void AddRetriableRecvMessageOp();
      // Adds retriable recv_trailing_metadata op to batch_data.
      void AddRetriableRecvTrailingMetadataOp();

     private:
      // Returns true if the call is being retried.
      bool MaybeRetry(grpc_status_code status, grpc_mdelem* server_pushback_md,
                      bool is_lb_drop);

      // Frees cached send ops that were completed by the completed batch in
      // batch_data.  Used when batches are completed after the call is
      // committed.
      void FreeCachedSendOpDataForCompletedBatch();

      // Invokes recv_initial_metadata_ready for a batch.
      static void InvokeRecvInitialMetadataCallback(void* arg,
                                                    grpc_error_handle error);
      // Intercepts recv_initial_metadata_ready callback for retries.
      // Commits the call and returns the initial metadata up the stack.
      static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);

      // Invokes recv_message_ready for a batch.
      static void InvokeRecvMessageCallback(void* arg, grpc_error_handle error);
      // Intercepts recv_message_ready callback for retries.
      // Commits the call and returns the message up the stack.
      static void RecvMessageReady(void* arg, grpc_error_handle error);

      // Adds recv_trailing_metadata_ready closure to closures.
      void AddClosureForRecvTrailingMetadataReady(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Adds any necessary closures for deferred recv_initial_metadata and
      // recv_message callbacks to closures.
      void AddClosuresForDeferredRecvCallbacks(
          CallCombinerClosureList* closures);
      // For any pending batch containing an op that has not yet been started,
      // adds the pending batch's completion closures to closures.
      void AddClosuresToFailUnstartedPendingBatches(
          grpc_error_handle error, CallCombinerClosureList* closures);
      // Runs necessary closures upon completion of a call attempt.
      void RunClosuresForCompletedCall(grpc_error_handle error);
      // Intercepts recv_trailing_metadata_ready callback for retries.
      // Commits the call and returns the trailing metadata up the stack.
      static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);

      // Adds the on_complete closure for the pending batch completed in
      // batch_data to closures.
      void AddClosuresForCompletedPendingBatch(
          grpc_error_handle error, CallCombinerClosureList* closures);

      // If there are any cached ops to replay or pending ops to start on the
      // LB call, adds them to closures.
      void AddClosuresForReplayOrPendingSendOps(
          CallCombinerClosureList* closures);

      // Callback used to intercept on_complete from LB calls.
      static void OnComplete(void* arg, grpc_error_handle error);

      RefCountedPtr<CallAttempt> call_attempt_;
      // The batch to use in the LB call.
      // Its payload field points to CallAttempt::batch_payload_.
      grpc_transport_stream_op_batch batch_;
      // For intercepting on_complete.
      grpc_closure on_complete_;
    };

    // Creates a BatchData object on the call's arena with the
    // specified refcount.  If set_on_complete is true, the batch's
    // on_complete callback will be set to point to on_complete();
    // otherwise, the batch's on_complete callback will be null.
    BatchData* CreateBatch(int refcount, bool set_on_complete) {
      return calld_->arena_->New<BatchData>(Ref(), refcount, set_on_complete);
    }

    // If there are any cached send ops that need to be replayed on this
    // call attempt, creates and returns a new batch to replay those ops.
    // Otherwise, returns nullptr.
    BatchData* MaybeCreateBatchForReplay();

    // Adds batches for pending batches to closures.
    void AddBatchesForPendingBatches(CallCombinerClosureList* closures);

    // Adds whatever batches are needed on this attempt to closures.
    void AddRetriableBatches(CallCombinerClosureList* closures);

    // Returns true if any op in the batch was not yet started on this attempt.
    bool PendingBatchIsUnstarted(PendingBatch* pending);

    // Helper function used to start a recv_trailing_metadata batch.  This
    // is used in the case where a recv_initial_metadata or recv_message
    // op fails in a way that we know the call is over but the application
    // has not yet started its own recv_trailing_metadata op.
    void StartInternalRecvTrailingMetadata();

    CallData* calld_;
    RefCountedPtr<ClientChannel::LoadBalancedCall> lb_call_;

    // BatchData.batch.payload points to this.
    grpc_transport_stream_op_batch_payload batch_payload_;
    // For send_initial_metadata.
    // Note that we need to make a copy of the initial metadata for each
    // call attempt instead of just referring to the copy in call_data,
    // because filters in the subchannel stack may modify the metadata,
    // so we need to start in a pristine state for each attempt of the call.
    grpc_linked_mdelem* send_initial_metadata_storage_;
    grpc_metadata_batch send_initial_metadata_;
    // For send_message.
    // TODO(roth): Restructure this to eliminate use of ManualConstructor.
    ManualConstructor<ByteStreamCache::CachingByteStream> send_message_;
    // For send_trailing_metadata.
    grpc_linked_mdelem* send_trailing_metadata_storage_;
    grpc_metadata_batch send_trailing_metadata_;
    // For intercepting recv_initial_metadata.
    grpc_metadata_batch recv_initial_metadata_;
    grpc_closure recv_initial_metadata_ready_;
    bool trailing_metadata_available_ = false;
    // For intercepting recv_message.
    grpc_closure recv_message_ready_;
    OrphanablePtr<ByteStream> recv_message_;
    // For intercepting recv_trailing_metadata.
    grpc_metadata_batch recv_trailing_metadata_;
    grpc_transport_stream_stats collect_stats_;
    grpc_closure recv_trailing_metadata_ready_;
    // These fields indicate which ops have been started and completed on
    // this call attempt.
    size_t started_send_message_count_ = 0;
    size_t completed_send_message_count_ = 0;
    size_t started_recv_message_count_ = 0;
    size_t completed_recv_message_count_ = 0;
    bool started_send_initial_metadata_ : 1;
    bool completed_send_initial_metadata_ : 1;
    bool started_send_trailing_metadata_ : 1;
    bool completed_send_trailing_metadata_ : 1;
    bool started_recv_initial_metadata_ : 1;
    bool completed_recv_initial_metadata_ : 1;
    bool started_recv_trailing_metadata_ : 1;
    bool completed_recv_trailing_metadata_ : 1;
    // State for callback processing.
    BatchData* recv_initial_metadata_ready_deferred_batch_ = nullptr;
    grpc_error_handle recv_initial_metadata_error_ = GRPC_ERROR_NONE;
    BatchData* recv_message_ready_deferred_batch_ = nullptr;
    grpc_error_handle recv_message_error_ = GRPC_ERROR_NONE;
    BatchData* recv_trailing_metadata_internal_batch_ = nullptr;
    // NOTE: Do not move this next to the metadata bitfields above.  That
    //       would save space, but it would also result in a data race,
    //       because the compiler would generate a 2-byte store that
    //       overwrites the metadata bitfields when setting this field.
    bool retry_dispatched_ : 1;
  };

  CallData(RetryFilter* chand, const grpc_call_element_args& args);
  ~CallData();

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  PendingBatch* PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  void PendingBatchClear(PendingBatch* pending);
  void MaybeClearPendingBatch(PendingBatch* pending);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // Fails all pending batches.  Does NOT yield call combiner.
  void PendingBatchesFail(grpc_error_handle error);
  // Returns a pointer to the first pending batch for which predicate(batch)
  // returns true, or null if not found.
  template <typename Predicate>
  PendingBatch* PendingBatchFind(const char* log_message, Predicate predicate);

  // Caches data for send ops so that it can be retried later, if not
  // already cached.
  void MaybeCacheSendOpsForBatch(PendingBatch* pending);
  void FreeCachedSendInitialMetadata();
  // Frees cached send_message at index idx.
  void FreeCachedSendMessage(size_t idx);
  void FreeCachedSendTrailingMetadata();
  void FreeAllCachedSendOpData();

  // Commits the call so that no further retry attempts will be performed.
  void RetryCommit(CallAttempt* call_attempt);

  // Starts a retry after appropriate back-off.
  void DoRetry(grpc_millis server_pushback_ms);
  static void OnRetryTimer(void* arg, grpc_error_handle error);

  RefCountedPtr<ClientChannel::LoadBalancedCall> CreateLoadBalancedCall();

  void CreateCallAttempt();

  // Adds a closure to closures that will execute batch in the call combiner.
  void AddClosureForBatch(grpc_transport_stream_op_batch* batch,
                          CallCombinerClosureList* closures);

  RetryFilter* chand_;
  grpc_polling_entity* pollent_;
  RefCountedPtr<ServerRetryThrottleData> retry_throttle_data_;
  const RetryMethodConfig* retry_policy_ = nullptr;
  BackOff retry_backoff_;

  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier_;

  // TODO(roth): As part of implementing hedging, we will need to maintain a
  // list of all pending attempts, so that we can cancel them all if the call
  // gets cancelled.
  RefCountedPtr<CallAttempt> call_attempt_;

  // LB call used when the call is committed before any CallAttempt is
  // created.
  // TODO(roth): Change CallAttempt logic such that once we've committed
  // and all cached send ops have been replayed, we move the LB call
  // from the CallAttempt here, thus creating a fast path for the
  // remainder of the streaming call.
  RefCountedPtr<ClientChannel::LoadBalancedCall> committed_call_;

  // When we are not yet fully committed to a particular call (i.e.,
  // either we might still retry or we have committed to the call but
  // there are still some cached ops to be replayed on the call),
  // batches received from above will be added to this list, and they
  // will not be removed until we have invoked their completion callbacks.
  size_t bytes_buffered_for_retry_ = 0;
  PendingBatch pending_batches_[MAX_PENDING_BATCHES];
  bool pending_send_initial_metadata_ : 1;
  bool pending_send_message_ : 1;
  bool pending_send_trailing_metadata_ : 1;

  // Retry state.
  bool retry_committed_ : 1;
  bool last_attempt_got_server_pushback_ : 1;
  int num_attempts_completed_ = 0;
  Mutex timer_mu_;
  Canceller* canceller_ ABSL_GUARDED_BY(timer_mu_);
  grpc_timer retry_timer_ ABSL_GUARDED_BY(timer_mu_);
  grpc_closure retry_closure_;

  // The number of batches containing send ops that are currently in-flight
  // on any call attempt.
  // We hold a ref to the call stack while this is non-zero, since replay
  // batches may not complete until after all callbacks have been returned
  // to the surface, and we need to make sure that the call is not destroyed
  // until all of these batches have completed.
  // Note that we actually only need to track replay batches, but it's
  // easier to track all batches with send ops.
  int num_in_flight_call_attempt_send_batches_ = 0;

  // Cached data for retrying send ops.
  // send_initial_metadata
  bool seen_send_initial_metadata_ = false;
  grpc_linked_mdelem* send_initial_metadata_storage_ = nullptr;
  grpc_metadata_batch send_initial_metadata_;
  uint32_t send_initial_metadata_flags_;
  // TODO(roth): As part of implementing hedging, we'll probably need to
  // have the LB call set a value in CallAttempt and then propagate it
  // from CallAttempt to the parent call when we commit.  Otherwise, we
  // may leave this with a value for a peer other than the one we
  // actually commit to.
  gpr_atm* peer_string_;
  // send_message
  // When we get a send_message op, we replace the original byte stream
  // with a CachingByteStream that caches the slices to a local buffer for
  // use in retries.
  // Note: We inline the cache for the first 3 send_message ops and use
  // dynamic allocation after that.  This number was essentially picked
  // at random; it could be changed in the future to tune performance.
  absl::InlinedVector<ByteStreamCache*, 3> send_messages_;
  // send_trailing_metadata
  bool seen_send_trailing_metadata_ = false;
  grpc_linked_mdelem* send_trailing_metadata_storage_ = nullptr;
  grpc_metadata_batch send_trailing_metadata_;
};

//
// RetryFilter::CallData::CallStackDestructionBarrier
//

// A class to track the existence of LoadBalancedCall call stacks that
// we've created.  We wait until all such call stacks have been
// destroyed before we return the on_call_stack_destruction closure up
// to the surface.
//
// The parent RetryFilter::CallData object holds a ref to this object.
// When it is destroyed, it will store the on_call_stack_destruction
// closure from the surface in this object and then release its ref.
// We also take a ref to this object for each LB call we create, and
// those refs are not released until the LB call stack is destroyed.
// When this object is destroyed, it will invoke the
// on_call_stack_destruction closure from the surface.
class RetryFilter::CallData::CallStackDestructionBarrier
    : public RefCounted<CallStackDestructionBarrier, PolymorphicRefCount,
                        kUnrefCallDtor> {
 public:
  CallStackDestructionBarrier() {}

  ~CallStackDestructionBarrier() override {
    // TODO(yashkt): This can potentially be a Closure::Run.
    ExecCtx::Run(DEBUG_LOCATION, on_call_stack_destruction_, GRPC_ERROR_NONE);
  }

  // Set the closure from the surface.  This closure will be invoked
  // when this object is destroyed.
  void set_on_call_stack_destruction(grpc_closure* on_call_stack_destruction) {
    on_call_stack_destruction_ = on_call_stack_destruction;
  }

  // Invoked to get an on_call_stack_destruction closure for a new LB call.
  grpc_closure* MakeLbCallDestructionClosure(CallData* calld) {
    Ref().release();  // Ref held by callback.
    grpc_closure* on_lb_call_destruction_complete =
        calld->arena_->New<grpc_closure>();
    GRPC_CLOSURE_INIT(on_lb_call_destruction_complete,
                      OnLbCallDestructionComplete, this, nullptr);
    return on_lb_call_destruction_complete;
  }

 private:
  static void OnLbCallDestructionComplete(void* arg,
                                          grpc_error_handle /*error*/) {
    auto* self = static_cast<CallStackDestructionBarrier*>(arg);
    self->Unref();
  }

  grpc_closure* on_call_stack_destruction_ = nullptr;
};
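
// For intuition, the resulting lifetime ordering is roughly:
//   1. CallData creates the barrier and holds a ref to it.
//   2. Each LB call takes a ref via MakeLbCallDestructionClosure().
//   3. When CallData is destroyed, it stores the surface's
//      on_call_stack_destruction closure here and drops its ref.
//   4. When the last LB call stack is destroyed, the last ref is
//      dropped, and the destructor runs that closure.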

//
// RetryFilter::CallData::Canceller
//

class RetryFilter::CallData::Canceller {
 public:
  explicit Canceller(CallData* calld) : calld_(calld) {
    GRPC_CALL_STACK_REF(calld_->owning_call_, "RetryCanceller");
    GRPC_CLOSURE_INIT(&closure_, &Cancel, this, nullptr);
    calld_->call_combiner_->SetNotifyOnCancel(&closure_);
  }

 private:
  static void Cancel(void* arg, grpc_error_handle error) {
    auto* self = static_cast<Canceller*>(arg);
    auto* calld = self->calld_;
    {
      MutexLock lock(&calld->timer_mu_);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "calld=%p: cancelling retry timer: error=%s self=%p "
                "calld->canceller_=%p",
                calld, grpc_error_std_string(error).c_str(), self,
                calld->canceller_);
      }
      if (calld->canceller_ == self && error != GRPC_ERROR_NONE) {
        calld->canceller_ = nullptr;  // Checked by OnRetryTimer().
        grpc_timer_cancel(&calld->retry_timer_);
        calld->FreeAllCachedSendOpData();
        GRPC_CALL_COMBINER_STOP(calld->call_combiner_, "Canceller");
      }
    }
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "RetryCanceller");
    delete self;
  }

  CallData* calld_;
  grpc_closure closure_;
};
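
// Note: A Canceller instance owns itself.  Cancel() runs both on actual
// cancellation (error != GRPC_ERROR_NONE) and when the notification is
// re-armed or dropped (error == GRPC_ERROR_NONE), and it deletes the object
// in either case.  Clearing calld->canceller_ under timer_mu_ is how this
// code and OnRetryTimer() (defined later in this file) decide which of
// them wins the race to act.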

//
// RetryFilter::CallData::CallAttempt
//

RetryFilter::CallData::CallAttempt::CallAttempt(CallData* calld)
    : calld_(calld),
      batch_payload_(calld->call_context_),
      started_send_initial_metadata_(false),
      completed_send_initial_metadata_(false),
      started_send_trailing_metadata_(false),
      completed_send_trailing_metadata_(false),
      started_recv_initial_metadata_(false),
      completed_recv_initial_metadata_(false),
      started_recv_trailing_metadata_(false),
      completed_recv_trailing_metadata_(false),
      retry_dispatched_(false) {
  lb_call_ = calld->CreateLoadBalancedCall();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: attempt=%p: create lb_call=%p",
            calld->chand_, calld, this, lb_call_.get());
  }
}

void RetryFilter::CallData::CallAttempt::FreeCachedSendOpDataAfterCommit() {
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data.  We may need to do some sort of
  // ref-counting instead.
  if (completed_send_initial_metadata_) {
    calld_->FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < completed_send_message_count_; ++i) {
    calld_->FreeCachedSendMessage(i);
  }
  if (completed_send_trailing_metadata_) {
    calld_->FreeCachedSendTrailingMetadata();
  }
}

bool RetryFilter::CallData::CallAttempt::PendingBatchIsUnstarted(
    PendingBatch* pending) {
  // Only look at batches containing send ops, since batches containing
  // only recv ops are always started immediately.
  if (pending->batch == nullptr || pending->batch->on_complete == nullptr) {
    return false;
  }
  if (pending->batch->send_initial_metadata &&
      !started_send_initial_metadata_) {
    return true;
  }
  if (pending->batch->send_message &&
      started_send_message_count_ < calld_->send_messages_.size()) {
    return true;
  }
  if (pending->batch->send_trailing_metadata &&
      !started_send_trailing_metadata_) {
    return true;
  }
  return false;
}

void RetryFilter::CallData::CallAttempt::StartInternalRecvTrailingMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: call failed but recv_trailing_metadata not "
            "started; starting it internally",
            calld_->chand_, calld_);
  }
  // Create batch_data with 2 refs, since this batch will be unreffed twice:
  // once for the recv_trailing_metadata_ready callback when the batch
  // completes, and again when we actually get a recv_trailing_metadata
  // op from the surface.
  BatchData* batch_data = CreateBatch(2, false /* set_on_complete */);
  batch_data->AddRetriableRecvTrailingMetadataOp();
  recv_trailing_metadata_internal_batch_ = batch_data;
  // Note: This will release the call combiner.
  lb_call_->StartTransportStreamOpBatch(batch_data->batch());
}

// If there are any cached send ops that need to be replayed on the
// current call attempt, creates and returns a new batch to replay those ops.
// Otherwise, returns nullptr.
RetryFilter::CallData::CallAttempt::BatchData*
RetryFilter::CallData::CallAttempt::MaybeCreateBatchForReplay() {
  BatchData* replay_batch_data = nullptr;
  // send_initial_metadata.
  if (calld_->seen_send_initial_metadata_ && !started_send_initial_metadata_ &&
      !calld_->pending_send_initial_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_initial_metadata op",
              calld_->chand_, calld_);
    }
    replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    replay_batch_data->AddRetriableSendInitialMetadataOp();
  }
  // send_message.
  // Note that we can only have one send_message op in flight at a time.
  if (started_send_message_count_ < calld_->send_messages_.size() &&
      started_send_message_count_ == completed_send_message_count_ &&
      !calld_->pending_send_message_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_message op",
              calld_->chand_, calld_);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendMessageOp();
  }
  // send_trailing_metadata.
  // Note that we only add this op if we have no more send_message ops
  // to start, since we can't send down any more send_message ops after
  // send_trailing_metadata.
  if (calld_->seen_send_trailing_metadata_ &&
      started_send_message_count_ == calld_->send_messages_.size() &&
      !started_send_trailing_metadata_ &&
      !calld_->pending_send_trailing_metadata_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: replaying previously completed "
              "send_trailing_metadata op",
              calld_->chand_, calld_);
    }
    if (replay_batch_data == nullptr) {
      replay_batch_data = CreateBatch(1, true /* set_on_complete */);
    }
    replay_batch_data->AddRetriableSendTrailingMetadataOp();
  }
  return replay_batch_data;
}

void RetryFilter::CallData::CallAttempt::AddBatchesForPendingBatches(
    CallCombinerClosureList* closures) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld_->pending_batches_); ++i) {
    PendingBatch* pending = &calld_->pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch == nullptr) continue;
    // Skip any batch that either (a) has already been started on this
    // call attempt or (b) we can't start yet because we're still
    // replaying send ops that need to be completed first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch.  This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op.  If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    if (batch->send_initial_metadata && started_send_initial_metadata_) {
      continue;
    }
    if (batch->send_message &&
        completed_send_message_count_ < started_send_message_count_) {
      continue;
    }
    // Note that we only start send_trailing_metadata if we have no more
    // send_message ops to start, since we can't send down any more
    // send_message ops after send_trailing_metadata.
    if (batch->send_trailing_metadata &&
        (started_send_message_count_ + batch->send_message <
             calld_->send_messages_.size() ||
         started_send_trailing_metadata_)) {
      continue;
    }
    if (batch->recv_initial_metadata && started_recv_initial_metadata_) {
      continue;
    }
    if (batch->recv_message &&
        completed_recv_message_count_ < started_recv_message_count_) {
      continue;
    }
    if (batch->recv_trailing_metadata && started_recv_trailing_metadata_) {
      // If we previously completed a recv_trailing_metadata op
      // initiated by StartInternalRecvTrailingMetadata(), use the
      // result of that instead of trying to re-start this op.
      if (GPR_UNLIKELY(recv_trailing_metadata_internal_batch_ != nullptr)) {
        // If the batch completed, then trigger the completion callback
        // directly, so that we return the previously returned results to
        // the application.  Otherwise, just unref the internally started
        // batch, since we'll propagate the completion when it completes.
        if (completed_recv_trailing_metadata_) {
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &recv_trailing_metadata_ready_, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        } else {
          recv_trailing_metadata_internal_batch_->Unref();
        }
        recv_trailing_metadata_internal_batch_ = nullptr;
      }
      continue;
    }
    // If we're already committed, just send the batch as-is.
    if (calld_->retry_committed_) {
      calld_->AddClosureForBatch(batch, closures);
      calld_->PendingBatchClear(pending);
      continue;
    }
    // Create batch with the right number of callbacks.
    const bool has_send_ops = batch->send_initial_metadata ||
                              batch->send_message ||
                              batch->send_trailing_metadata;
    const int num_callbacks = has_send_ops + batch->recv_initial_metadata +
                              batch->recv_message +
                              batch->recv_trailing_metadata;
    CallAttempt::BatchData* batch_data =
        CreateBatch(num_callbacks, has_send_ops /* set_on_complete */);
    // Cache send ops if needed.
    calld_->MaybeCacheSendOpsForBatch(pending);
    // send_initial_metadata.
    if (batch->send_initial_metadata) {
      batch_data->AddRetriableSendInitialMetadataOp();
    }
    // send_message.
    if (batch->send_message) {
      batch_data->AddRetriableSendMessageOp();
    }
    // send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      batch_data->AddRetriableSendTrailingMetadataOp();
    }
    // recv_initial_metadata.
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      batch_data->AddRetriableRecvInitialMetadataOp();
    }
    // recv_message.
    if (batch->recv_message) {
      batch_data->AddRetriableRecvMessageOp();
    }
    // recv_trailing_metadata.
    if (batch->recv_trailing_metadata) {
      batch_data->AddRetriableRecvTrailingMetadataOp();
    }
    calld_->AddClosureForBatch(batch_data->batch(), closures);
    // Track number of in-flight send batches.
    // If this is the first one, take a ref to the call stack.
    if (batch->send_initial_metadata || batch->send_message ||
        batch->send_trailing_metadata) {
      if (calld_->num_in_flight_call_attempt_send_batches_ == 0) {
        GRPC_CALL_STACK_REF(calld_->owning_call_, "retriable_send_batches");
      }
      ++calld_->num_in_flight_call_attempt_send_batches_;
    }
  }
}

void RetryFilter::CallData::CallAttempt::AddRetriableBatches(
    CallCombinerClosureList* closures) {
  // Replay previously-returned send_* ops if needed.
  BatchData* replay_batch_data = MaybeCreateBatchForReplay();
  if (replay_batch_data != nullptr) {
    calld_->AddClosureForBatch(replay_batch_data->batch(), closures);
    // Track number of pending send batches.
    // If this is the first one, take a ref to the call stack.
    if (calld_->num_in_flight_call_attempt_send_batches_ == 0) {
      GRPC_CALL_STACK_REF(calld_->owning_call_, "retriable_send_batches");
    }
    ++calld_->num_in_flight_call_attempt_send_batches_;
  }
  // Now add pending batches.
  AddBatchesForPendingBatches(closures);
}

void RetryFilter::CallData::CallAttempt::StartRetriableBatches() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: constructing retriable batches",
            calld_->chand_, calld_);
  }
  // Construct list of closures to execute, one for each pending batch.
  CallCombinerClosureList closures;
  AddRetriableBatches(&closures);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting %" PRIuPTR
            " retriable batches on lb_call=%p",
            calld_->chand_, calld_, closures.size(), lb_call());
  }
  // Start batches on the LB call.
  // Note: This will yield the call combiner.
  closures.RunClosures(calld_->call_combiner_);
}
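
// A note on call combiner discipline in the code above: every path that
// holds the call combiner must eventually yield it exactly once -- either
// by starting a batch on the LB call (which passes it down), by running
// closures via CallCombinerClosureList::RunClosures(), or by calling
// GRPC_CALL_COMBINER_STOP() explicitly, as the comments in this file mark.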

//
// RetryFilter::CallData::CallAttempt::BatchData
//

RetryFilter::CallData::CallAttempt::BatchData::BatchData(
    RefCountedPtr<CallAttempt> attempt, int refcount, bool set_on_complete)
    : RefCounted(nullptr, refcount), call_attempt_(std::move(attempt)) {
  // TODO(roth): Consider holding this ref on the call stack in
  // CallAttempt instead of here in BatchData.  This would eliminate the
  // need for CallData::num_in_flight_call_attempt_send_batches_.
  // But it would require having a way to unref CallAttempt when it is
  // no longer needed (i.e., when the call is committed and all cached
  // send ops have been replayed and the LB call is moved into
  // CallData::committed_call_).
  GRPC_CALL_STACK_REF(call_attempt_->calld_->owning_call_, "CallAttempt");
  batch_.payload = &call_attempt_->batch_payload_;
  if (set_on_complete) {
    GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                      grpc_schedule_on_exec_ctx);
    batch_.on_complete = &on_complete_;
  }
}

RetryFilter::CallData::CallAttempt::BatchData::~BatchData() {
  if (batch_.send_initial_metadata) {
    grpc_metadata_batch_destroy(&call_attempt_->send_initial_metadata_);
  }
  if (batch_.send_trailing_metadata) {
    grpc_metadata_batch_destroy(&call_attempt_->send_trailing_metadata_);
  }
  if (batch_.recv_initial_metadata) {
    grpc_metadata_batch_destroy(&call_attempt_->recv_initial_metadata_);
  }
  if (batch_.recv_trailing_metadata) {
    grpc_metadata_batch_destroy(&call_attempt_->recv_trailing_metadata_);
  }
  GRPC_CALL_STACK_UNREF(call_attempt_->calld_->owning_call_, "CallAttempt");
}

void RetryFilter::CallData::CallAttempt::BatchData::
    FreeCachedSendOpDataForCompletedBatch() {
  auto* calld = call_attempt_->calld_;
  // TODO(roth): When we implement hedging, this logic will need to get
  // a bit more complex, because there may be other (now abandoned) call
  // attempts still using this data.  We may need to do some sort of
  // ref-counting instead.
  if (batch_.send_initial_metadata) {
    calld->FreeCachedSendInitialMetadata();
  }
  if (batch_.send_message) {
    calld->FreeCachedSendMessage(call_attempt_->completed_send_message_count_ -
                                 1);
  }
  if (batch_.send_trailing_metadata) {
    calld->FreeCachedSendTrailingMetadata();
  }
}

bool RetryFilter::CallData::CallAttempt::BatchData::MaybeRetry(
    grpc_status_code status, grpc_mdelem* server_pushback_md, bool is_lb_drop) {
  auto* calld = call_attempt_->calld_;
  // LB drops always inhibit retries.
  if (is_lb_drop) return false;
  // Get retry policy.
  if (calld->retry_policy_ == nullptr) return false;
  // If we've already dispatched a retry from this call, return true.
  // This catches the case where the batch has multiple callbacks
  // (i.e., it includes either recv_message or recv_initial_metadata).
  if (call_attempt_->retry_dispatched_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retry already dispatched",
              calld->chand_, calld);
    }
    return true;
  }
  // Check status.
  if (GPR_LIKELY(status == GRPC_STATUS_OK)) {
    if (calld->retry_throttle_data_ != nullptr) {
      calld->retry_throttle_data_->RecordSuccess();
    }
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: call succeeded", calld->chand_,
              calld);
    }
    return false;
  }
  // Status is not OK.  Check whether the status is retryable.
  if (!calld->retry_policy_->retryable_status_codes().Contains(status)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: status %s not configured as retryable",
              calld->chand_, calld, grpc_status_code_to_string(status));
    }
    return false;
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (calld->retry_throttle_data_ != nullptr &&
      !calld->retry_throttle_data_->RecordFailure()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries throttled", calld->chand_,
              calld);
    }
    return false;
  }
  // Check whether the call is committed.
  if (calld->retry_committed_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: retries already committed",
              calld->chand_, calld);
    }
    return false;
  }
  // Check whether we have retries remaining.
  ++calld->num_attempts_completed_;
  if (calld->num_attempts_completed_ >= calld->retry_policy_->max_attempts()) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: exceeded %d retry attempts",
              calld->chand_, calld, calld->retry_policy_->max_attempts());
    }
    return false;
  }
  // Check server push-back.
  grpc_millis server_pushback_ms = -1;
  if (server_pushback_md != nullptr) {
    // If the value is "-1" or any other unparseable string, we do not retry.
    uint32_t ms;
    if (!grpc_parse_slice_to_uint32(GRPC_MDVALUE(*server_pushback_md), &ms)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: not retrying due to server push-back",
                calld->chand_, calld);
      }
      return false;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO, "chand=%p calld=%p: server push-back: retry in %u ms",
                calld->chand_, calld, ms);
      }
      server_pushback_ms = static_cast<grpc_millis>(ms);
    }
  }
  // Do retry.
  call_attempt_->retry_dispatched_ = true;
  calld->DoRetry(server_pushback_ms);
  return true;
}
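
// For reference: per the retry design (gRFC A6), the server communicates
// push-back via the "grpc-retry-pushback-ms" entry in trailing metadata.
// For example, "grpc-retry-pushback-ms: 200" asks the client to delay the
// next attempt by 200 ms, while a negative or unparseable value (e.g.
// "-1") instructs the client not to retry at all.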

//
// recv_initial_metadata callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    InvokeRecvInitialMetadataCallback(void* arg, grpc_error_handle error) {
  auto* batch_data = static_cast<CallAttempt::BatchData*>(arg);
  auto* call_attempt = batch_data->call_attempt_.get();
  // Find pending batch.
  PendingBatch* pending = call_attempt->calld_->PendingBatchFind(
      "invoking recv_initial_metadata_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_initial_metadata &&
               batch->payload->recv_initial_metadata
                       .recv_initial_metadata_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return metadata.
  grpc_metadata_batch_move(
      &call_attempt->recv_initial_metadata_,
      pending->batch->payload->recv_initial_metadata.recv_initial_metadata);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_initial_metadata_ready =
      pending->batch->payload->recv_initial_metadata
          .recv_initial_metadata_ready;
  pending->batch->payload->recv_initial_metadata.recv_initial_metadata_ready =
      nullptr;
  call_attempt->calld_->MaybeClearPendingBatch(pending);
  batch_data->Unref();
  // Invoke callback.
  Closure::Run(DEBUG_LOCATION, recv_initial_metadata_ready,
               GRPC_ERROR_REF(error));
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvInitialMetadataReady(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_initial_metadata_ready, error=%s",
            calld->chand_, calld, grpc_error_std_string(error).c_str());
  }
  call_attempt->completed_recv_initial_metadata_ = true;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_initial_metadata op, so do nothing.
  if (call_attempt->retry_dispatched_) {
    GRPC_CALL_COMBINER_STOP(
        calld->call_combiner_,
        "recv_initial_metadata_ready after retry dispatched");
    return;
  }
  if (!calld->retry_committed_) {
    // If we got an error or a Trailers-Only response and have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface.  We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY((call_attempt->trailing_metadata_available_ ||
                      error != GRPC_ERROR_NONE) &&
                     !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: deferring recv_initial_metadata_ready "
                "(Trailers-Only)",
                calld->chand_, calld);
      }
      call_attempt->recv_initial_metadata_ready_deferred_batch_ = batch_data;
      call_attempt->recv_initial_metadata_error_ = GRPC_ERROR_REF(error);
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->StartInternalRecvTrailingMetadata();
      } else {
        GRPC_CALL_COMBINER_STOP(
            calld->call_combiner_,
            "recv_initial_metadata_ready trailers-only or error");
      }
      return;
    }
    // Received valid initial metadata, so commit the call.
    calld->RetryCommit(call_attempt);
  }
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  InvokeRecvInitialMetadataCallback(batch_data, error);
}

//
// recv_message callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::InvokeRecvMessageCallback(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  // Find pending op.
  PendingBatch* pending = calld->PendingBatchFind(
      "invoking recv_message_ready for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_message &&
               batch->payload->recv_message.recv_message_ready != nullptr;
      });
  GPR_ASSERT(pending != nullptr);
  // Return payload.
  *pending->batch->payload->recv_message.recv_message =
      std::move(call_attempt->recv_message_);
  // Update bookkeeping.
  // Note: Need to do this before invoking the callback, since invoking
  // the callback will result in yielding the call combiner.
  grpc_closure* recv_message_ready =
      pending->batch->payload->recv_message.recv_message_ready;
  pending->batch->payload->recv_message.recv_message_ready = nullptr;
  calld->MaybeClearPendingBatch(pending);
  batch_data->Unref();
  // Invoke callback.
  Closure::Run(DEBUG_LOCATION, recv_message_ready, GRPC_ERROR_REF(error));
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvMessageReady(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got recv_message_ready, error=%s",
            calld->chand_, calld, grpc_error_std_string(error).c_str());
  }
  ++call_attempt->completed_recv_message_count_;
  // If a retry was already dispatched, then we're not going to use the
  // result of this recv_message op, so do nothing.
  if (call_attempt->retry_dispatched_) {
    GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                            "recv_message_ready after retry dispatched");
    return;
  }
  if (!calld->retry_committed_) {
    // If we got an error or the payload was nullptr and we have not yet gotten
    // the recv_trailing_metadata_ready callback, then defer propagating this
    // callback back to the surface.  We can evaluate whether to retry when
    // recv_trailing_metadata comes back.
    if (GPR_UNLIKELY((call_attempt->recv_message_ == nullptr ||
                      error != GRPC_ERROR_NONE) &&
                     !call_attempt->completed_recv_trailing_metadata_)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: deferring recv_message_ready (nullptr "
                "message and recv_trailing_metadata pending)",
                calld->chand_, calld);
      }
      call_attempt->recv_message_ready_deferred_batch_ = batch_data;
      call_attempt->recv_message_error_ = GRPC_ERROR_REF(error);
      if (!call_attempt->started_recv_trailing_metadata_) {
        // recv_trailing_metadata not yet started by application; start it
        // ourselves to get status.
        call_attempt->StartInternalRecvTrailingMetadata();
      } else {
        GRPC_CALL_COMBINER_STOP(calld->call_combiner_,
                                "recv_message_ready null");
      }
      return;
    }
    // Received a valid message, so commit the call.
    calld->RetryCommit(call_attempt);
  }
  // Invoke the callback to return the result to the surface.
  // Manually invoking a callback function; it does not take ownership of error.
  InvokeRecvMessageCallback(batch_data, error);
}
1241 
1242 //
1243 // recv_trailing_metadata handling
1244 //
1245 
1246 namespace {
1247 
1248 // Sets *status, *server_pushback_md, and *is_lb_drop based on md_batch
1249 // and error.
GetCallStatus(grpc_millis deadline,grpc_metadata_batch * md_batch,grpc_error_handle error,grpc_status_code * status,grpc_mdelem ** server_pushback_md,bool * is_lb_drop)1250 void GetCallStatus(grpc_millis deadline, grpc_metadata_batch* md_batch,
1251                    grpc_error_handle error, grpc_status_code* status,
1252                    grpc_mdelem** server_pushback_md, bool* is_lb_drop) {
1253   if (error != GRPC_ERROR_NONE) {
1254     grpc_error_get_status(error, deadline, status, nullptr, nullptr, nullptr);
1255     intptr_t value = 0;
1256     if (grpc_error_get_int(error, GRPC_ERROR_INT_LB_POLICY_DROP, &value) &&
1257         value != 0) {
1258       *is_lb_drop = true;
1259     }
1260   } else {
1261     GPR_ASSERT(md_batch->idx.named.grpc_status != nullptr);
1262     *status =
1263         grpc_get_status_code_from_metadata(md_batch->idx.named.grpc_status->md);
1264     if (md_batch->idx.named.grpc_retry_pushback_ms != nullptr) {
1265       *server_pushback_md = &md_batch->idx.named.grpc_retry_pushback_ms->md;
1266     }
1267   }
1268   GRPC_ERROR_UNREF(error);
1269 }
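
// Illustrative example (the wire header name comes from the gRPC retry
// spec, not from this file): if the server's trailing metadata includes
//   grpc-retry-pushback-ms: 1000
// then GetCallStatus() surfaces that mdelem via *server_pushback_md, and
// DoRetry() below will delay the next attempt by roughly 1000 ms instead
// of using exponential backoff.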

}  // namespace

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosureForRecvTrailingMetadataReady(grpc_error_handle error,
                                           CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  // Find pending batch.
  PendingBatch* pending = calld->PendingBatchFind(
      "invoking recv_trailing_metadata for",
      [](grpc_transport_stream_op_batch* batch) {
        return batch->recv_trailing_metadata &&
               batch->payload->recv_trailing_metadata
                       .recv_trailing_metadata_ready != nullptr;
      });
  // If we generated the recv_trailing_metadata op internally via
  // StartInternalRecvTrailingMetadata(), then there will be no pending batch.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Return metadata.
  grpc_metadata_batch_move(
      &call_attempt_->recv_trailing_metadata_,
      pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata);
  // Add closure.
  closures->Add(pending->batch->payload->recv_trailing_metadata
                    .recv_trailing_metadata_ready,
                error, "recv_trailing_metadata_ready for pending batch");
  // Update bookkeeping.
  pending->batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      nullptr;
  calld->MaybeClearPendingBatch(pending);
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForDeferredRecvCallbacks(CallCombinerClosureList* closures) {
  if (batch_.recv_trailing_metadata) {
    // Add closure for deferred recv_initial_metadata_ready.
    if (GPR_UNLIKELY(
            call_attempt_->recv_initial_metadata_ready_deferred_batch_ !=
            nullptr)) {
      GRPC_CLOSURE_INIT(
          &call_attempt_->recv_initial_metadata_ready_,
          InvokeRecvInitialMetadataCallback,
          call_attempt_->recv_initial_metadata_ready_deferred_batch_,
          grpc_schedule_on_exec_ctx);
      closures->Add(&call_attempt_->recv_initial_metadata_ready_,
                    call_attempt_->recv_initial_metadata_error_,
                    "resuming recv_initial_metadata_ready");
      call_attempt_->recv_initial_metadata_ready_deferred_batch_ = nullptr;
    }
    // Add closure for deferred recv_message_ready.
    if (GPR_UNLIKELY(call_attempt_->recv_message_ready_deferred_batch_ !=
                     nullptr)) {
      GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_,
                        InvokeRecvMessageCallback,
                        call_attempt_->recv_message_ready_deferred_batch_,
                        grpc_schedule_on_exec_ctx);
      closures->Add(&call_attempt_->recv_message_ready_,
                    call_attempt_->recv_message_error_,
                    "resuming recv_message_ready");
      call_attempt_->recv_message_ready_deferred_batch_ = nullptr;
    }
  }
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresToFailUnstartedPendingBatches(
        grpc_error_handle error, CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
    PendingBatch* pending = &calld->pending_batches_[i];
    if (call_attempt_->PendingBatchIsUnstarted(pending)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: failing unstarted pending batch at "
                "index %" PRIuPTR,
                calld->chand_, calld, i);
      }
      closures->Add(pending->batch->on_complete, GRPC_ERROR_REF(error),
                    "failing on_complete for pending batch");
      pending->batch->on_complete = nullptr;
      calld->MaybeClearPendingBatch(pending);
    }
  }
  GRPC_ERROR_UNREF(error);
}

void RetryFilter::CallData::CallAttempt::BatchData::RunClosuresForCompletedCall(
    grpc_error_handle error) {
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // First, add closure for recv_trailing_metadata_ready.
  AddClosureForRecvTrailingMetadataReady(GRPC_ERROR_REF(error), &closures);
  // If there are deferred recv_initial_metadata_ready or recv_message_ready
  // callbacks, add them to closures.
  AddClosuresForDeferredRecvCallbacks(&closures);
  // Add closures to fail any pending batches that have not yet been started.
  AddClosuresToFailUnstartedPendingBatches(GRPC_ERROR_REF(error), &closures);
  // Schedule all of the closures identified above.
  // Note: This will release the call combiner.
  closures.RunClosures(call_attempt_->calld_->call_combiner_);
  // Don't need batch_data anymore.
  Unref();
  GRPC_ERROR_UNREF(error);
}

void RetryFilter::CallData::CallAttempt::BatchData::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: got recv_trailing_metadata_ready, error=%s",
            calld->chand_, calld, grpc_error_std_string(error).c_str());
  }
  call_attempt->completed_recv_trailing_metadata_ = true;
  // Get the call's status and check for server pushback metadata.
  grpc_status_code status = GRPC_STATUS_OK;
  grpc_mdelem* server_pushback_md = nullptr;
  grpc_metadata_batch* md_batch =
      batch_data->batch_.payload->recv_trailing_metadata.recv_trailing_metadata;
  bool is_lb_drop = false;
  GetCallStatus(calld->deadline_, md_batch, GRPC_ERROR_REF(error), &status,
                &server_pushback_md, &is_lb_drop);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(
        GPR_INFO, "chand=%p calld=%p: call finished, status=%s is_lb_drop=%d",
        calld->chand_, calld, grpc_status_code_to_string(status), is_lb_drop);
  }
  // Check if we should retry.
  if (batch_data->MaybeRetry(status, server_pushback_md, is_lb_drop)) {
    // Unref batch_data for deferred recv_initial_metadata_ready or
    // recv_message_ready callbacks, if any.
    if (call_attempt->recv_initial_metadata_ready_deferred_batch_ != nullptr) {
      GRPC_ERROR_UNREF(call_attempt->recv_initial_metadata_error_);
      batch_data->Unref();
    }
    if (call_attempt->recv_message_ready_deferred_batch_ != nullptr) {
      GRPC_ERROR_UNREF(call_attempt->recv_message_error_);
      batch_data->Unref();
    }
    batch_data->Unref();
    return;
  }
  // Not retrying, so commit the call.
  calld->RetryCommit(call_attempt);
  // Run any necessary closures.
  batch_data->RunClosuresForCompletedCall(GRPC_ERROR_REF(error));
}

//
// on_complete callback handling
//

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForCompletedPendingBatch(grpc_error_handle error,
                                        CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  PendingBatch* pending = calld->PendingBatchFind(
      "completed", [this](grpc_transport_stream_op_batch* batch) {
        // Match the pending batch with the same set of send ops as the
        // batch we've just completed.
        return batch->on_complete != nullptr &&
               batch_.send_initial_metadata == batch->send_initial_metadata &&
               batch_.send_message == batch->send_message &&
               batch_.send_trailing_metadata == batch->send_trailing_metadata;
      });
  // If batch_data is a replay batch, then there will be no pending
  // batch to complete.
  if (pending == nullptr) {
    GRPC_ERROR_UNREF(error);
    return;
  }
  // Add closure.
  closures->Add(pending->batch->on_complete, error,
                "on_complete for pending batch");
  pending->batch->on_complete = nullptr;
  calld->MaybeClearPendingBatch(pending);
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddClosuresForReplayOrPendingSendOps(CallCombinerClosureList* closures) {
  auto* calld = call_attempt_->calld_;
  // We don't check send_initial_metadata here, because that op will always
  // be started as soon as it is received from the surface, so it will
  // never need to be started at this point.
  bool have_pending_send_message_ops =
      call_attempt_->started_send_message_count_ < calld->send_messages_.size();
  bool have_pending_send_trailing_metadata_op =
      calld->seen_send_trailing_metadata_ &&
      !call_attempt_->started_send_trailing_metadata_;
  if (!have_pending_send_message_ops &&
      !have_pending_send_trailing_metadata_op) {
    for (size_t i = 0; i < GPR_ARRAY_SIZE(calld->pending_batches_); ++i) {
      PendingBatch* pending = &calld->pending_batches_[i];
      grpc_transport_stream_op_batch* batch = pending->batch;
      if (batch == nullptr || pending->send_ops_cached) continue;
      if (batch->send_message) have_pending_send_message_ops = true;
      if (batch->send_trailing_metadata) {
        have_pending_send_trailing_metadata_op = true;
      }
    }
  }
  if (have_pending_send_message_ops || have_pending_send_trailing_metadata_op) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: starting next batch for pending send op(s)",
              calld->chand_, calld);
    }
    call_attempt_->AddRetriableBatches(closures);
  }
}
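
// Example of the replay check above (illustrative): in a client-streaming
// call where three messages have been cached but the current attempt has
// so far started only one of them, started_send_message_count_ (1) is
// less than send_messages_.size() (3), so a new batch is added here to
// replay send_messages_[1] on this attempt's LB call.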

void RetryFilter::CallData::CallAttempt::BatchData::OnComplete(
    void* arg, grpc_error_handle error) {
  CallAttempt::BatchData* batch_data =
      static_cast<CallAttempt::BatchData*>(arg);
  CallAttempt* call_attempt = batch_data->call_attempt_.get();
  CallData* calld = call_attempt->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: got on_complete, error=%s, batch=%s",
            calld->chand_, calld, grpc_error_std_string(error).c_str(),
            grpc_transport_stream_op_batch_string(&batch_data->batch_).c_str());
  }
  // Update bookkeeping in call_attempt.
  if (batch_data->batch_.send_initial_metadata) {
    call_attempt->completed_send_initial_metadata_ = true;
  }
  if (batch_data->batch_.send_message) {
    ++call_attempt->completed_send_message_count_;
  }
  if (batch_data->batch_.send_trailing_metadata) {
    call_attempt->completed_send_trailing_metadata_ = true;
  }
  // If the call is committed, free cached data for send ops that we've just
  // completed.
  if (calld->retry_committed_) {
    batch_data->FreeCachedSendOpDataForCompletedBatch();
  }
  // Construct list of closures to execute.
  CallCombinerClosureList closures;
  // If a retry was already dispatched, that means we saw
  // recv_trailing_metadata before this, so we do nothing here.
  // Otherwise, invoke the callback to return the result to the surface.
  if (!call_attempt->retry_dispatched_) {
    // Add closure for the completed pending batch, if any.
    batch_data->AddClosuresForCompletedPendingBatch(GRPC_ERROR_REF(error),
                                                    &closures);
    // If needed, add a callback to start any replay or pending send ops on
    // the LB call.
    if (!call_attempt->completed_recv_trailing_metadata_) {
      batch_data->AddClosuresForReplayOrPendingSendOps(&closures);
    }
  }
  // Track number of in-flight send batches and determine if this was the
  // last one.
  --calld->num_in_flight_call_attempt_send_batches_;
  const bool last_send_batch_complete =
      calld->num_in_flight_call_attempt_send_batches_ == 0;
  // Don't need batch_data anymore.
  batch_data->Unref();
  // Schedule all of the closures identified above.
  // Note: This yields the call combiner.
  closures.RunClosures(calld->call_combiner_);
  // If this was the last in-flight send batch, unref the call stack.
  if (last_send_batch_complete) {
    GRPC_CALL_STACK_UNREF(calld->owning_call_, "retriable_send_batches");
  }
}

//
// retriable batch construction
//

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendInitialMetadataOp() {
  auto* calld = call_attempt_->calld_;
  // Maps the number of retries to the corresponding metadata value slice.
  const grpc_slice* retry_count_strings[] = {&GRPC_MDSTR_1, &GRPC_MDSTR_2,
                                             &GRPC_MDSTR_3, &GRPC_MDSTR_4};
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  //
  // If we've already completed one or more attempts, add the
  // grpc-previous-rpc-attempts header.
  call_attempt_->send_initial_metadata_storage_ =
      static_cast<grpc_linked_mdelem*>(
          calld->arena_->Alloc(sizeof(grpc_linked_mdelem) *
                               (calld->send_initial_metadata_.list.count +
                                (calld->num_attempts_completed_ > 0))));
  grpc_metadata_batch_copy(&calld->send_initial_metadata_,
                           &call_attempt_->send_initial_metadata_,
                           call_attempt_->send_initial_metadata_storage_);
  if (GPR_UNLIKELY(call_attempt_->send_initial_metadata_.idx.named
                       .grpc_previous_rpc_attempts != nullptr)) {
    grpc_metadata_batch_remove(&call_attempt_->send_initial_metadata_,
                               GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
  }
  if (GPR_UNLIKELY(calld->num_attempts_completed_ > 0)) {
    grpc_mdelem retry_md = grpc_mdelem_create(
        GRPC_MDSTR_GRPC_PREVIOUS_RPC_ATTEMPTS,
        *retry_count_strings[calld->num_attempts_completed_ - 1], nullptr);
    grpc_error_handle error = grpc_metadata_batch_add_tail(
        &call_attempt_->send_initial_metadata_,
        &call_attempt_->send_initial_metadata_storage_
             [calld->send_initial_metadata_.list.count],
        retry_md, GRPC_BATCH_GRPC_PREVIOUS_RPC_ATTEMPTS);
    if (GPR_UNLIKELY(error != GRPC_ERROR_NONE)) {
      gpr_log(GPR_ERROR, "error adding retry metadata: %s",
              grpc_error_std_string(error).c_str());
      GPR_ASSERT(false);
    }
  }
  call_attempt_->started_send_initial_metadata_ = true;
  batch_.send_initial_metadata = true;
  batch_.payload->send_initial_metadata.send_initial_metadata =
      &call_attempt_->send_initial_metadata_;
  batch_.payload->send_initial_metadata.send_initial_metadata_flags =
      calld->send_initial_metadata_flags_;
  batch_.payload->send_initial_metadata.peer_string = calld->peer_string_;
}
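
// For example, if two attempts have already completed, the metadata copied
// for the third attempt carries
//   grpc-previous-rpc-attempts: 2
// (the four static value slices above suffice because, per the retry spec,
// maxAttempts is capped at 5, so at most 4 prior attempts are reported).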

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendMessageOp() {
  auto* calld = call_attempt_->calld_;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting calld->send_messages[%" PRIuPTR "]",
            calld->chand_, calld, call_attempt_->started_send_message_count_);
  }
  ByteStreamCache* cache =
      calld->send_messages_[call_attempt_->started_send_message_count_];
  ++call_attempt_->started_send_message_count_;
  call_attempt_->send_message_.Init(cache);
  batch_.send_message = true;
  batch_.payload->send_message.send_message.reset(
      call_attempt_->send_message_.get());
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableSendTrailingMetadataOp() {
  auto* calld = call_attempt_->calld_;
  // We need to make a copy of the metadata batch for each attempt, since
  // the filters in the subchannel stack may modify this batch, and we don't
  // want those modifications to be passed forward to subsequent attempts.
  call_attempt_->send_trailing_metadata_storage_ =
      static_cast<grpc_linked_mdelem*>(
          calld->arena_->Alloc(sizeof(grpc_linked_mdelem) *
                               calld->send_trailing_metadata_.list.count));
  grpc_metadata_batch_copy(&calld->send_trailing_metadata_,
                           &call_attempt_->send_trailing_metadata_,
                           call_attempt_->send_trailing_metadata_storage_);
  call_attempt_->started_send_trailing_metadata_ = true;
  batch_.send_trailing_metadata = true;
  batch_.payload->send_trailing_metadata.send_trailing_metadata =
      &call_attempt_->send_trailing_metadata_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvInitialMetadataOp() {
  call_attempt_->started_recv_initial_metadata_ = true;
  batch_.recv_initial_metadata = true;
  grpc_metadata_batch_init(&call_attempt_->recv_initial_metadata_);
  batch_.payload->recv_initial_metadata.recv_initial_metadata =
      &call_attempt_->recv_initial_metadata_;
  batch_.payload->recv_initial_metadata.trailing_metadata_available =
      &call_attempt_->trailing_metadata_available_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_initial_metadata_ready_,
                    RecvInitialMetadataReady, this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_initial_metadata.recv_initial_metadata_ready =
      &call_attempt_->recv_initial_metadata_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvMessageOp() {
  ++call_attempt_->started_recv_message_count_;
  batch_.recv_message = true;
  batch_.payload->recv_message.recv_message = &call_attempt_->recv_message_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_message_ready_, RecvMessageReady, this,
                    grpc_schedule_on_exec_ctx);
  batch_.payload->recv_message.recv_message_ready =
      &call_attempt_->recv_message_ready_;
}

void RetryFilter::CallData::CallAttempt::BatchData::
    AddRetriableRecvTrailingMetadataOp() {
  call_attempt_->started_recv_trailing_metadata_ = true;
  batch_.recv_trailing_metadata = true;
  grpc_metadata_batch_init(&call_attempt_->recv_trailing_metadata_);
  batch_.payload->recv_trailing_metadata.recv_trailing_metadata =
      &call_attempt_->recv_trailing_metadata_;
  batch_.payload->recv_trailing_metadata.collect_stats =
      &call_attempt_->collect_stats_;
  GRPC_CLOSURE_INIT(&call_attempt_->recv_trailing_metadata_ready_,
                    RecvTrailingMetadataReady, this, grpc_schedule_on_exec_ctx);
  batch_.payload->recv_trailing_metadata.recv_trailing_metadata_ready =
      &call_attempt_->recv_trailing_metadata_ready_;
}

//
// CallData vtable functions
//

grpc_error_handle RetryFilter::CallData::Init(
    grpc_call_element* elem, const grpc_call_element_args* args) {
  auto* chand = static_cast<RetryFilter*>(elem->channel_data);
  new (elem->call_data) CallData(chand, *args);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p: created call=%p", chand, elem->call_data);
  }
  return GRPC_ERROR_NONE;
}

void RetryFilter::CallData::Destroy(grpc_call_element* elem,
                                    const grpc_call_final_info* /*final_info*/,
                                    grpc_closure* then_schedule_closure) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  // Save our ref to the CallStackDestructionBarrier until after our
  // dtor is invoked.
  RefCountedPtr<CallStackDestructionBarrier> call_stack_destruction_barrier =
      std::move(calld->call_stack_destruction_barrier_);
  calld->~CallData();
  // Now set the callback in the CallStackDestructionBarrier object,
  // right before we release our ref to it (implicitly upon returning).
  // The callback will be invoked when the CallStackDestructionBarrier
  // is destroyed.
  call_stack_destruction_barrier->set_on_call_stack_destruction(
      then_schedule_closure);
}

void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->StartTransportStreamOpBatch(batch);
}

void RetryFilter::CallData::SetPollent(grpc_call_element* elem,
                                       grpc_polling_entity* pollent) {
  auto* calld = static_cast<CallData*>(elem->call_data);
  calld->pollent_ = pollent;
}

//
// CallData implementation
//

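// Looks up the retry policy parsed from the service config, if any.  An
// illustrative (assumed, not from this file) service config fragment that
// would yield a non-null RetryMethodConfig here:
//   "methodConfig": [{
//     "name": [{"service": "my.pkg.MyService"}],
//     "retryPolicy": {
//       "maxAttempts": 4,
//       "initialBackoff": "0.1s",
//       "maxBackoff": "1s",
//       "backoffMultiplier": 2,
//       "retryableStatusCodes": ["UNAVAILABLE"]
//     }
//   }]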
const RetryMethodConfig* GetRetryPolicy(
    const grpc_call_context_element* context) {
  if (context == nullptr) return nullptr;
  auto* svc_cfg_call_data = static_cast<ServiceConfigCallData*>(
      context[GRPC_CONTEXT_SERVICE_CONFIG_CALL_DATA].value);
  if (svc_cfg_call_data == nullptr) return nullptr;
  return static_cast<const RetryMethodConfig*>(
      svc_cfg_call_data->GetMethodParsedConfig(
          RetryServiceConfigParser::ParserIndex()));
}

RetryFilter::CallData::CallData(RetryFilter* chand,
                                const grpc_call_element_args& args)
    : chand_(chand),
      retry_throttle_data_(chand->retry_throttle_data_),
      retry_policy_(GetRetryPolicy(args.context)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(retry_policy_ == nullptr
                                       ? 0
                                       : retry_policy_->initial_backoff())
              .set_multiplier(retry_policy_ == nullptr
                                  ? 0
                                  : retry_policy_->backoff_multiplier())
              .set_jitter(RETRY_BACKOFF_JITTER)
              .set_max_backoff(
                  retry_policy_ == nullptr ? 0 : retry_policy_->max_backoff())),
      path_(grpc_slice_ref_internal(args.path)),
      call_start_time_(args.start_time),
      deadline_(args.deadline),
      arena_(args.arena),
      owning_call_(args.call_stack),
      call_combiner_(args.call_combiner),
      call_context_(args.context),
      call_stack_destruction_barrier_(
          arena_->New<CallStackDestructionBarrier>()),
      pending_send_initial_metadata_(false),
      pending_send_message_(false),
      pending_send_trailing_metadata_(false),
      retry_committed_(false),
      last_attempt_got_server_pushback_(false) {}

RetryFilter::CallData::~CallData() {
  grpc_slice_unref_internal(path_);
  // Make sure there are no remaining pending batches.
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    GPR_ASSERT(pending_batches_[i].batch == nullptr);
  }
}

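// Entry point for batches from the surface.  There are three paths:
// (1) if the call is already committed to a single LB call, delegate
// directly; (2) on cancellation, commit and propagate the cancellation;
// (3) otherwise, add the batch to pending_batches_ and (re)start
// retriable batches on the current call attempt.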
void RetryFilter::CallData::StartTransportStreamOpBatch(
    grpc_transport_stream_op_batch* batch) {
  // If we have an LB call, delegate to the LB call.
  if (committed_call_ != nullptr) {
    // Note: This will release the call combiner.
    committed_call_->StartTransportStreamOpBatch(batch);
    return;
  }
  // Handle cancellation.
  if (GPR_UNLIKELY(batch->cancel_stream)) {
    grpc_error_handle cancel_error = batch->payload->cancel_stream.cancel_error;
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: cancelled from surface: %s", chand_,
              this, grpc_error_std_string(cancel_error).c_str());
    }
    // If we have a current call attempt, commit the call, then send
    // the cancellation down to that attempt.  When the call fails, it
    // will not be retried, because we have committed it here.
    if (call_attempt_ != nullptr) {
      RetryCommit(call_attempt_.get());
      // Note: This will release the call combiner.
      call_attempt_->lb_call()->StartTransportStreamOpBatch(batch);
      return;
    }
    // Fail pending batches.
    PendingBatchesFail(GRPC_ERROR_REF(cancel_error));
    // Note: This will release the call combiner.
    grpc_transport_stream_op_batch_finish_with_failure(
        batch, GRPC_ERROR_REF(cancel_error), call_combiner_);
    return;
  }
  // Add the batch to the pending list.
  PendingBatch* pending = PendingBatchesAdd(batch);
  if (call_attempt_ == nullptr) {
    // If this is the first batch and retries are already committed
    // (e.g., if this batch put the call above the buffer size limit), then
    // immediately create an LB call and delegate the batch to it.  This
    // avoids the overhead of unnecessarily allocating a CallAttempt
    // object or caching any of the send op data.
    if (num_attempts_completed_ == 0 && retry_committed_) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: retry committed before first attempt; "
                "creating LB call",
                chand_, this);
      }
      PendingBatchClear(pending);
      committed_call_ = CreateLoadBalancedCall();
      committed_call_->StartTransportStreamOpBatch(batch);
      return;
    }
    // We do not yet have a call attempt, so create one.
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: creating call attempt", chand_,
              this);
    }
    CreateCallAttempt();
    return;
  }
  // Send batches to call attempt.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: starting batch on attempt=%p lb_call=%p",
            chand_, this, call_attempt_.get(), call_attempt_->lb_call());
  }
  call_attempt_->StartRetriableBatches();
}

RefCountedPtr<ClientChannel::LoadBalancedCall>
RetryFilter::CallData::CreateLoadBalancedCall() {
  grpc_call_element_args args = {owning_call_, nullptr,          call_context_,
                                 path_,        call_start_time_, deadline_,
                                 arena_,       call_combiner_};
  return chand_->client_channel_->CreateLoadBalancedCall(
      args, pollent_,
      // This callback holds a ref to the CallStackDestructionBarrier
      // object until the LB call is destroyed.
      call_stack_destruction_barrier_->MakeLbCallDestructionClosure(this));
}

void RetryFilter::CallData::CreateCallAttempt() {
  call_attempt_.reset(arena_->New<CallAttempt>(this));
  call_attempt_->StartRetriableBatches();
  // TODO(roth): When implementing hedging, change this to start a timer
  // for the next hedging attempt.
}

namespace {

void StartBatchInCallCombiner(void* arg, grpc_error_handle /*ignored*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  auto* lb_call = static_cast<ClientChannel::LoadBalancedCall*>(
      batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  lb_call->StartTransportStreamOpBatch(batch);
}

}  // namespace

void RetryFilter::CallData::AddClosureForBatch(
    grpc_transport_stream_op_batch* batch, CallCombinerClosureList* closures) {
  batch->handler_private.extra_arg = call_attempt_->lb_call();
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: starting batch on LB call: %s",
            chand_, this, grpc_transport_stream_op_batch_string(batch).c_str());
  }
  closures->Add(&batch->handler_private.closure, GRPC_ERROR_NONE,
                "start_batch_on_lb_call");
}

//
// send op data caching
//

void RetryFilter::CallData::MaybeCacheSendOpsForBatch(PendingBatch* pending) {
  if (pending->send_ops_cached) return;
  pending->send_ops_cached = true;
  grpc_transport_stream_op_batch* batch = pending->batch;
  // Save a copy of metadata for send_initial_metadata ops.
  if (batch->send_initial_metadata) {
    seen_send_initial_metadata_ = true;
    GPR_ASSERT(send_initial_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_initial_metadata =
        batch->payload->send_initial_metadata.send_initial_metadata;
    send_initial_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_initial_metadata->list.count));
    grpc_metadata_batch_copy(send_initial_metadata, &send_initial_metadata_,
                             send_initial_metadata_storage_);
    send_initial_metadata_flags_ =
        batch->payload->send_initial_metadata.send_initial_metadata_flags;
    peer_string_ = batch->payload->send_initial_metadata.peer_string;
  }
  // Set up cache for send_message ops.
  if (batch->send_message) {
    ByteStreamCache* cache = arena_->New<ByteStreamCache>(
        std::move(batch->payload->send_message.send_message));
    send_messages_.push_back(cache);
  }
  // Save metadata batch for send_trailing_metadata ops.
  if (batch->send_trailing_metadata) {
    seen_send_trailing_metadata_ = true;
    GPR_ASSERT(send_trailing_metadata_storage_ == nullptr);
    grpc_metadata_batch* send_trailing_metadata =
        batch->payload->send_trailing_metadata.send_trailing_metadata;
    send_trailing_metadata_storage_ =
        static_cast<grpc_linked_mdelem*>(arena_->Alloc(
            sizeof(grpc_linked_mdelem) * send_trailing_metadata->list.count));
    grpc_metadata_batch_copy(send_trailing_metadata, &send_trailing_metadata_,
                             send_trailing_metadata_storage_);
  }
}

void RetryFilter::CallData::FreeCachedSendInitialMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_initial_metadata",
            chand_, this);
  }
  grpc_metadata_batch_destroy(&send_initial_metadata_);
}

void RetryFilter::CallData::FreeCachedSendMessage(size_t idx) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: destroying send_messages[%" PRIuPTR "]", chand_,
            this, idx);
  }
  send_messages_[idx]->Destroy();
}

void RetryFilter::CallData::FreeCachedSendTrailingMetadata() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: destroying send_trailing_metadata",
            chand_, this);
  }
  grpc_metadata_batch_destroy(&send_trailing_metadata_);
}

void RetryFilter::CallData::FreeAllCachedSendOpData() {
  if (seen_send_initial_metadata_) {
    FreeCachedSendInitialMetadata();
  }
  for (size_t i = 0; i < send_messages_.size(); ++i) {
    FreeCachedSendMessage(i);
  }
  if (seen_send_trailing_metadata_) {
    FreeCachedSendTrailingMetadata();
  }
}

//
// pending_batches management
//

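// Maps a batch to its slot in pending_batches_.  This works because the
// surface never has more than one batch containing a given op type in
// flight on a call at a time, so keying each slot off the batch's
// "leading" send or recv op is sufficient to keep pending batches distinct.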
size_t RetryFilter::CallData::GetBatchIndex(
    grpc_transport_stream_op_batch* batch) {
  if (batch->send_initial_metadata) return 0;
  if (batch->send_message) return 1;
  if (batch->send_trailing_metadata) return 2;
  if (batch->recv_initial_metadata) return 3;
  if (batch->recv_message) return 4;
  if (batch->recv_trailing_metadata) return 5;
  GPR_UNREACHABLE_CODE(return (size_t)-1);
}

// This is called via the call combiner, so access to calld is synchronized.
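// In addition to queueing the batch, this tracks how much send-op data has
// been buffered for possible replay.  For example (illustrative numbers):
// a batch whose send_message holds a 10 KiB payload adds 10 KiB to
// bytes_buffered_for_retry_; once the running total exceeds the per-RPC
// retry buffer size configured on the channel, the call is committed and
// no further retries will be attempted.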
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchesAdd(
    grpc_transport_stream_op_batch* batch) {
  const size_t idx = GetBatchIndex(batch);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: adding pending batch at index %" PRIuPTR,
            chand_, this, idx);
  }
  PendingBatch* pending = &pending_batches_[idx];
  GPR_ASSERT(pending->batch == nullptr);
  pending->batch = batch;
  pending->send_ops_cached = false;
  // Update state in calld about pending batches.
  // Also check if the batch takes us over the retry buffer limit.
  // Note: We don't check the size of trailing metadata here, because
  // gRPC clients do not send trailing metadata.
  if (batch->send_initial_metadata) {
    pending_send_initial_metadata_ = true;
    bytes_buffered_for_retry_ += grpc_metadata_batch_size(
        batch->payload->send_initial_metadata.send_initial_metadata);
  }
  if (batch->send_message) {
    pending_send_message_ = true;
    bytes_buffered_for_retry_ +=
        batch->payload->send_message.send_message->length();
  }
  if (batch->send_trailing_metadata) {
    pending_send_trailing_metadata_ = true;
  }
  if (GPR_UNLIKELY(bytes_buffered_for_retry_ >
                   chand_->per_rpc_retry_buffer_size_)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO,
              "chand=%p calld=%p: exceeded retry buffer size, committing",
              chand_, this);
    }
    RetryCommit(call_attempt_.get());
  }
  return pending;
}

void RetryFilter::CallData::PendingBatchClear(PendingBatch* pending) {
  if (pending->batch->send_initial_metadata) {
    pending_send_initial_metadata_ = false;
  }
  if (pending->batch->send_message) {
    pending_send_message_ = false;
  }
  if (pending->batch->send_trailing_metadata) {
    pending_send_trailing_metadata_ = false;
  }
  pending->batch = nullptr;
}

void RetryFilter::CallData::MaybeClearPendingBatch(PendingBatch* pending) {
  grpc_transport_stream_op_batch* batch = pending->batch;
  // We clear the pending batch if all of its callbacks have been
  // scheduled and reset to nullptr.
  if (batch->on_complete == nullptr &&
      (!batch->recv_initial_metadata ||
       batch->payload->recv_initial_metadata.recv_initial_metadata_ready ==
           nullptr) &&
      (!batch->recv_message ||
       batch->payload->recv_message.recv_message_ready == nullptr) &&
      (!batch->recv_trailing_metadata ||
       batch->payload->recv_trailing_metadata.recv_trailing_metadata_ready ==
           nullptr)) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
      gpr_log(GPR_INFO, "chand=%p calld=%p: clearing pending batch", chand_,
              this);
    }
    PendingBatchClear(pending);
  }
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryFilter::CallData::FailPendingBatchInCallCombiner(
    void* arg, grpc_error_handle error) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  CallData* call = static_cast<CallData*>(batch->handler_private.extra_arg);
  // Note: This will release the call combiner.
  grpc_transport_stream_op_batch_finish_with_failure(
      batch, GRPC_ERROR_REF(error), call->call_combiner_);
}

// This is called via the call combiner, so access to calld is synchronized.
void RetryFilter::CallData::PendingBatchesFail(grpc_error_handle error) {
  GPR_ASSERT(error != GRPC_ERROR_NONE);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    size_t num_batches = 0;
    for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
      if (pending_batches_[i].batch != nullptr) ++num_batches;
    }
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: failing %" PRIuPTR " pending batches: %s",
            chand_, this, num_batches, grpc_error_std_string(error).c_str());
  }
  CallCombinerClosureList closures;
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr) {
      batch->handler_private.extra_arg = this;
      GRPC_CLOSURE_INIT(&batch->handler_private.closure,
                        FailPendingBatchInCallCombiner, batch,
                        grpc_schedule_on_exec_ctx);
      closures.Add(&batch->handler_private.closure, GRPC_ERROR_REF(error),
                   "PendingBatchesFail");
      PendingBatchClear(pending);
    }
  }
  closures.RunClosuresWithoutYielding(call_combiner_);
  GRPC_ERROR_UNREF(error);
}

template <typename Predicate>
RetryFilter::CallData::PendingBatch* RetryFilter::CallData::PendingBatchFind(
    const char* log_message, Predicate predicate) {
  for (size_t i = 0; i < GPR_ARRAY_SIZE(pending_batches_); ++i) {
    PendingBatch* pending = &pending_batches_[i];
    grpc_transport_stream_op_batch* batch = pending->batch;
    if (batch != nullptr && predicate(batch)) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
        gpr_log(GPR_INFO,
                "chand=%p calld=%p: %s pending batch at index %" PRIuPTR,
                chand_, this, log_message, i);
      }
      return pending;
    }
  }
  return nullptr;
}

//
// retry code
//

void RetryFilter::CallData::RetryCommit(CallAttempt* call_attempt) {
  if (retry_committed_) return;
  retry_committed_ = true;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO, "chand=%p calld=%p: committing retries", chand_, this);
  }
  if (call_attempt != nullptr) {
    call_attempt->FreeCachedSendOpDataAfterCommit();
  }
}

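// Schedules the next attempt.  Absent server pushback, the delay comes from
// exponential backoff; e.g. (illustrative numbers, assuming
// initialBackoff=100ms, backoffMultiplier=2, maxBackoff=1s), successive
// retries would be delayed roughly 100ms, 200ms, 400ms, ... capped at 1s,
// with jitter applied by the BackOff helper.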
void RetryFilter::CallData::DoRetry(grpc_millis server_pushback_ms) {
  // Reset call attempt.
  call_attempt_.reset();
  // Compute backoff delay.
  grpc_millis next_attempt_time;
  if (server_pushback_ms >= 0) {
    next_attempt_time = ExecCtx::Get()->Now() + server_pushback_ms;
    last_attempt_got_server_pushback_ = true;
  } else {
    if (num_attempts_completed_ == 1 || last_attempt_got_server_pushback_) {
      last_attempt_got_server_pushback_ = false;
    }
    next_attempt_time = retry_backoff_.NextAttemptTime();
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_retry_trace)) {
    gpr_log(GPR_INFO,
            "chand=%p calld=%p: retrying failed call in %" PRId64 " ms", chand_,
            this, next_attempt_time - ExecCtx::Get()->Now());
  }
  // Schedule retry after computed delay.
  GRPC_CLOSURE_INIT(&retry_closure_, OnRetryTimer, this, nullptr);
  GRPC_CALL_STACK_REF(owning_call_, "OnRetryTimer");
  MutexLock lock(&timer_mu_);
  canceller_ = new Canceller(this);
  grpc_timer_init(&retry_timer_, next_attempt_time, &retry_closure_);
}

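// Timer callback for the retry delay.  canceller_ is the synchronization
// point with cancellation: whichever side clears it first under timer_mu_
// "wins", so a timer that has been cancelled never starts a new attempt.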
void RetryFilter::CallData::OnRetryTimer(void* arg, grpc_error_handle error) {
  auto* calld = static_cast<CallData*>(arg);
  if (error == GRPC_ERROR_NONE) {
    bool start_attempt = false;
    {
      MutexLock lock(&calld->timer_mu_);
      if (calld->canceller_ != nullptr) {
        calld->canceller_ = nullptr;
        start_attempt = true;
      }
    }
    if (start_attempt) calld->CreateCallAttempt();
  }
  GRPC_CALL_STACK_UNREF(calld->owning_call_, "OnRetryTimer");
}

}  // namespace

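// Filter vtable.  The entries follow the field order of grpc_channel_filter:
// the per-call hooks and call data size first, then the per-channel hooks,
// channel data size, and the filter's name.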
const grpc_channel_filter kRetryFilterVtable = {
    RetryFilter::CallData::StartTransportStreamOpBatch,
    RetryFilter::StartTransportOp,
    sizeof(RetryFilter::CallData),
    RetryFilter::CallData::Init,
    RetryFilter::CallData::SetPollent,
    RetryFilter::CallData::Destroy,
    sizeof(RetryFilter),
    RetryFilter::Init,
    RetryFilter::Destroy,
    RetryFilter::GetChannelInfo,
    "retry_filter",
};

}  // namespace grpc_core