• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "src/core/client_channel/retry_interceptor.h"
16 
17 #include "src/core/lib/promise/cancel_callback.h"
18 #include "src/core/lib/promise/for_each.h"
19 #include "src/core/lib/promise/map.h"
20 #include "src/core/lib/promise/sleep.h"
21 #include "src/core/service_config/service_config_call_data.h"
22 
23 namespace grpc_core {
24 
25 namespace {
GetMaxPerRpcRetryBufferSize(const ChannelArgs & args)26 size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
27   // By default, we buffer 256 KiB per RPC for retries.
28   // TODO(roth): Do we have any data to suggest a better value?
29   static constexpr int kDefaultPerRpcRetryBufferSize = (256 << 10);
30   return Clamp(args.GetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE)
31                    .value_or(kDefaultPerRpcRetryBufferSize),
32                0, INT_MAX);
33 }
34 }  // namespace
35 
36 namespace retry_detail {
37 
RetryState(const internal::RetryMethodConfig * retry_policy,RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)38 RetryState::RetryState(
39     const internal::RetryMethodConfig* retry_policy,
40     RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)
41     : retry_policy_(retry_policy),
42       retry_throttle_data_(std::move(retry_throttle_data)),
43       retry_backoff_(
44           BackOff::Options()
45               .set_initial_backoff(retry_policy_ == nullptr
46                                        ? Duration::Zero()
47                                        : retry_policy_->initial_backoff())
48               .set_multiplier(retry_policy_ == nullptr
49                                   ? 0
50                                   : retry_policy_->backoff_multiplier())
51               // This value was picked arbitrarily.  It can be changed if
52               // there is any even moderately compelling reason to do so.
53               .set_jitter(0.2)
54               .set_max_backoff(retry_policy_ == nullptr
55                                    ? Duration::Zero()
56                                    : retry_policy_->max_backoff())) {}
57 
ShouldRetry(const ServerMetadata & md,bool committed,absl::FunctionRef<std::string ()> lazy_attempt_debug_string)58 absl::optional<Duration> RetryState::ShouldRetry(
59     const ServerMetadata& md, bool committed,
60     absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
61   // If no retry policy, don't retry.
62   if (retry_policy_ == nullptr) {
63     GRPC_TRACE_LOG(retry, INFO)
64         << lazy_attempt_debug_string() << " no retry policy";
65     return absl::nullopt;
66   }
67   const auto status = md.get(GrpcStatusMetadata());
68   if (status.has_value()) {
69     if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
70       if (retry_throttle_data_ != nullptr) {
71         retry_throttle_data_->RecordSuccess();
72       }
73       GRPC_TRACE_LOG(retry, INFO)
74           << lazy_attempt_debug_string() << " call succeeded";
75       return absl::nullopt;
76     }
77     // Status is not OK.  Check whether the status is retryable.
78     if (!retry_policy_->retryable_status_codes().Contains(*status)) {
79       GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() << ": status "
80                                   << grpc_status_code_to_string(*status)
81                                   << " not configured as retryable";
82       return absl::nullopt;
83     }
84   }
85   // Record the failure and check whether retries are throttled.
86   // Note that it's important for this check to come after the status
87   // code check above, since we should only record failures whose statuses
88   // match the configured retryable status codes, so that we don't count
89   // things like failures due to malformed requests (INVALID_ARGUMENT).
90   // Conversely, it's important for this to come before the remaining
91   // checks, so that we don't fail to record failures due to other factors.
92   if (retry_throttle_data_ != nullptr &&
93       !retry_throttle_data_->RecordFailure()) {
94     GRPC_TRACE_LOG(retry, INFO)
95         << lazy_attempt_debug_string() << " retries throttled";
96     return absl::nullopt;
97   }
98   // Check whether the call is committed.
99   if (committed) {
100     GRPC_TRACE_LOG(retry, INFO)
101         << lazy_attempt_debug_string() << " retries already committed";
102     return absl::nullopt;
103   }
104   // Check whether we have retries remaining.
105   ++num_attempts_completed_;
106   if (num_attempts_completed_ >= retry_policy_->max_attempts()) {
107     GRPC_TRACE_LOG(retry, INFO)
108         << lazy_attempt_debug_string() << " exceeded "
109         << retry_policy_->max_attempts() << " retry attempts";
110     return absl::nullopt;
111   }
112   // Check server push-back.
113   const auto server_pushback = md.get(GrpcRetryPushbackMsMetadata());
114   if (server_pushback.has_value() && server_pushback < Duration::Zero()) {
115     GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string()
116                                 << " not retrying due to server push-back";
117     return absl::nullopt;
118   }
119   // We should retry.
120   Duration next_attempt_timeout;
121   if (server_pushback.has_value()) {
122     CHECK_GE(*server_pushback, Duration::Zero());
123     next_attempt_timeout = *server_pushback;
124     retry_backoff_.Reset();
125   } else {
126     next_attempt_timeout = retry_backoff_.NextAttemptDelay();
127   }
128   GRPC_TRACE_LOG(retry, INFO)
129       << lazy_attempt_debug_string() << " server push-back: retry in "
130       << next_attempt_timeout;
131   return next_attempt_timeout;
132 }
133 
134 absl::StatusOr<RefCountedPtr<internal::ServerRetryThrottleData>>
ServerRetryThrottleDataFromChannelArgs(const ChannelArgs & args)135 ServerRetryThrottleDataFromChannelArgs(const ChannelArgs& args) {
136   // Get retry throttling parameters from service config.
137   auto* service_config = args.GetObject<ServiceConfig>();
138   if (service_config == nullptr) return nullptr;
139   const auto* config = static_cast<const internal::RetryGlobalConfig*>(
140       service_config->GetGlobalParsedConfig(
141           internal::RetryServiceConfigParser::ParserIndex()));
142   if (config == nullptr) return nullptr;
143   // Get server name from target URI.
144   auto server_uri = args.GetString(GRPC_ARG_SERVER_URI);
145   if (!server_uri.has_value()) {
146     return GRPC_ERROR_CREATE(
147         "server URI channel arg missing or wrong type in client channel "
148         "filter");
149   }
150   absl::StatusOr<URI> uri = URI::Parse(*server_uri);
151   if (!uri.ok() || uri->path().empty()) {
152     return GRPC_ERROR_CREATE("could not extract server name from target URI");
153   }
154   std::string server_name(absl::StripPrefix(uri->path(), "/"));
155   // Get throttling config for server_name.
156   return internal::ServerRetryThrottleMap::Get()->GetDataForServer(
157       server_name, config->max_milli_tokens(), config->milli_token_ratio());
158 }
159 
160 }  // namespace retry_detail
161 
162 ////////////////////////////////////////////////////////////////////////////////
163 // RetryInterceptor
164 
Create(const ChannelArgs & args,const FilterArgs &)165 absl::StatusOr<RefCountedPtr<RetryInterceptor>> RetryInterceptor::Create(
166     const ChannelArgs& args, const FilterArgs&) {
167   auto retry_throttle_data =
168       retry_detail::ServerRetryThrottleDataFromChannelArgs(args);
169   if (!retry_throttle_data.ok()) {
170     return retry_throttle_data.status();
171   }
172   return MakeRefCounted<RetryInterceptor>(args,
173                                           std::move(*retry_throttle_data));
174 }
175 
// Constructs the interceptor.
// - Reads the per-RPC retry buffer limit from `args`.
// - Caches the retry service-config parser index, which GetRetryPolicy() uses
//   for per-call lookups.
// - Takes ownership of the (possibly null) retry-throttle data.
RetryInterceptor(const ChannelArgs & args,RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)176 RetryInterceptor::RetryInterceptor(
177     const ChannelArgs& args,
178     RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)
179     : per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
180       service_config_parser_index_(
181           internal::RetryServiceConfigParser::ParserIndex()),
182       retry_throttle_data_(std::move(retry_throttle_data)) {}
183 
InterceptCall(UnstartedCallHandler unstarted_call_handler)184 void RetryInterceptor::InterceptCall(
185     UnstartedCallHandler unstarted_call_handler) {
186   auto call_handler = unstarted_call_handler.StartCall();
187   auto* arena = call_handler.arena();
188   auto call = arena->MakeRefCounted<Call>(RefAsSubclass<RetryInterceptor>(),
189                                           std::move(call_handler));
190   call->StartAttempt();
191   call->Start();
192 }
193 
GetRetryPolicy()194 const internal::RetryMethodConfig* RetryInterceptor::GetRetryPolicy() {
195   auto* svc_cfg_call_data = MaybeGetContext<ServiceConfigCallData>();
196   if (svc_cfg_call_data == nullptr) return nullptr;
197   return static_cast<const internal::RetryMethodConfig*>(
198       svc_cfg_call_data->GetMethodParsedConfig(service_config_parser_index_));
199 }
200 
201 ////////////////////////////////////////////////////////////////////////////////
202 // RetryInterceptor::Call
203 
// Per-call retry state.  Holds the client-facing call handler and a ref to the
// owning interceptor.  Initializes the RetryState from the call's retry policy
// (if any) and the interceptor's retry-throttle data.
Call(RefCountedPtr<RetryInterceptor> interceptor,CallHandler call_handler)204 RetryInterceptor::Call::Call(RefCountedPtr<RetryInterceptor> interceptor,
205                              CallHandler call_handler)
206     : call_handler_(std::move(call_handler)),
207       interceptor_(std::move(interceptor)),
      // retry_state_ reads the member interceptor_ (the parameter has already
      // been moved from), so interceptor_ must be declared before retry_state_
      // in the class definition.
208       retry_state_(interceptor_->GetRetryPolicy(),
209                    interceptor_->retry_throttle_data_) {
210   GRPC_TRACE_LOG(retry, INFO)
211       << DebugTag() << " retry call created: " << retry_state_;
212 }
213 
// Builds the promise that copies the client's outgoing data into
// request_buffer_: first the client initial metadata, then each client
// message.  After every push it calls MaybeCommit() with the buffered size
// reported by the buffer, which MaybeCommit() compares against
// per_rpc_retry_buffer_size_.
ClientToBuffer()214 auto RetryInterceptor::Call::ClientToBuffer() {
215   return TrySeq(
216       call_handler_.PullClientInitialMetadata(),
217       [self = Ref()](ClientMetadataHandle metadata) mutable {
218         GRPC_TRACE_LOG(retry, INFO)
219             << self->DebugTag()
220             << " got client initial metadata: " << metadata->DebugString();
221         return self->request_buffer_.PushClientInitialMetadata(
222             std::move(metadata));
223       },
224       [self = Ref()](size_t buffered) {
225         self->MaybeCommit(buffered);
          // Forward every subsequent client message into the buffer, checking
          // the commit threshold after each one.
226         return ForEach(
227             MessagesFrom(self->call_handler_), [self](MessageHandle message) {
228               GRPC_TRACE_LOG(retry, INFO)
229                   << self->DebugTag() << " got client message "
230                   << message->DebugString();
231               return TrySeq(
232                   self->request_buffer_.PushMessage(std::move(message)),
233                   [self](size_t buffered) {
234                     self->MaybeCommit(buffered);
235                     return absl::OkStatus();
236                   });
237             });
238       });
239 }
240 
Start()241 void RetryInterceptor::Call::Start() {
242   call_handler_.SpawnGuarded("client_to_buffer", [self = Ref()]() {
243     return OnCancel(Map(self->ClientToBuffer(),
244                         [self](absl::Status status) {
245                           if (status.ok()) {
246                             self->request_buffer_.FinishSends();
247                           } else {
248                             self->request_buffer_.Cancel(status);
249                           }
250                           return status;
251                         }),
252                     [self]() { self->request_buffer_.Cancel(); });
253   });
254 }
255 
StartAttempt()256 void RetryInterceptor::Call::StartAttempt() {
257   if (current_attempt_ != nullptr) {
258     current_attempt_->Cancel();
259   }
260   auto current_attempt = call_handler_.arena()->MakeRefCounted<Attempt>(Ref());
261   current_attempt_ = current_attempt.get();
262   current_attempt->Start();
263 }
264 
MaybeCommit(size_t buffered)265 void RetryInterceptor::Call::MaybeCommit(size_t buffered) {
266   GRPC_TRACE_LOG(retry, INFO) << DebugTag() << " buffered:" << buffered << "/"
267                               << interceptor_->per_rpc_retry_buffer_size_;
268   if (buffered >= interceptor_->per_rpc_retry_buffer_size_) {
269     std::ignore = current_attempt_->Commit();
270   }
271 }
272 
DebugTag()273 std::string RetryInterceptor::Call::DebugTag() {
274   return absl::StrFormat("%s call:%p", Activity::current()->DebugTag(), this);
275 }
276 
277 ////////////////////////////////////////////////////////////////////////////////
278 // RetryInterceptor::Attempt
279 
// One attempt at the call.  Holds a ref to the owning Call and a reader over
// the call's request buffer, from which the attempt replays client data.
// reader_ is initialized from call_, so call_ must be declared before reader_
// in the class definition.
Attempt(RefCountedPtr<Call> call)280 RetryInterceptor::Attempt::Attempt(RefCountedPtr<Call> call)
281     : call_(std::move(call)), reader_(call_->request_buffer()) {
282   GRPC_TRACE_LOG(retry, INFO) << DebugTag() << " retry attempt created";
283 }
284 
~Attempt()285 RetryInterceptor::Attempt::~Attempt() { call_->RemoveAttempt(this); }
286 
// Handles server initial metadata for this attempt.  Receiving initial
// metadata commits the attempt.
// - If the commit succeeds, the promise forwards the initial metadata, every
//   server message and the trailing metadata from the child call (initiator_)
//   to the client-facing call handler.
// - If the commit fails (this is no longer the current attempt), the promise
//   resolves to CancelledError and `md` is dropped.
ServerToClientGotInitialMetadata(ServerMetadataHandle md)287 auto RetryInterceptor::Attempt::ServerToClientGotInitialMetadata(
288     ServerMetadataHandle md) {
289   GRPC_TRACE_LOG(retry, INFO)
290       << DebugTag() << " get server initial metadata " << md->DebugString();
291   const bool committed = Commit();
  // NOTE(review): the branch lambdas capture `md` and `this` by reference.
  // That is only safe if If() invokes the selected branch factory
  // synchronously during construction; confirm against If's implementation.
292   return If(
293       committed,
294       [&]() {
295         call_->call_handler()->SpawnPushServerInitialMetadata(std::move(md));
296         return Seq(ForEach(MessagesFrom(initiator_),
297                            [call = call_](MessageHandle message) {
298                              GRPC_TRACE_LOG(retry, INFO)
299                                  << call->DebugTag() << " got server message "
300                                  << message->DebugString();
301                              return call->call_handler()->SpawnPushMessage(
302                                  std::move(message));
303                            }),
304                    initiator_.PullServerTrailingMetadata(),
305                    [call = call_](ServerMetadataHandle md) {
306                      GRPC_TRACE_LOG(retry, INFO)
307                          << call->DebugTag()
308                          << " got server trailing metadata: "
309                          << md->DebugString();
310                      call->call_handler()->SpawnPushServerTrailingMetadata(
311                          std::move(md));
312                      return absl::OkStatus();
313                    });
314       },
315       [&]() { return []() { return absl::CancelledError(); }; });
316 }
317 
// Handles a trailers-only response (no server initial metadata) for this
// attempt.  The trailing metadata is passed to Call::ShouldRetry().
// - If a retry delay is returned, the promise sleeps for that delay and then
//   starts a new attempt via Call::StartAttempt().
// - Otherwise the attempt tries to commit.  On success it forwards the
//   trailing metadata to the client; if the commit fails it resolves to
//   CancelledError.
ServerToClientGotTrailersOnlyResponse()318 auto RetryInterceptor::Attempt::ServerToClientGotTrailersOnlyResponse() {
319   GRPC_TRACE_LOG(retry, INFO) << DebugTag() << " got trailers only response";
320   return Seq(
321       initiator_.PullServerTrailingMetadata(),
322       [self = Ref()](ServerMetadataHandle md) {
323         GRPC_TRACE_LOG(retry, INFO)
324             << self->DebugTag()
325             << " got server trailing metadata: " << md->DebugString();
326         auto delay = self->call_->ShouldRetry(
327             *md,
328             [self = self.get()]() -> std::string { return self->DebugTag(); });
329         return If(
330             delay.has_value(),
331             [self, delay]() {
                // The Sleep's completion status is ignored; the next attempt
                // is started regardless.
332               return Map(Sleep(*delay), [call = self->call_](absl::Status) {
333                 call->StartAttempt();
334                 return absl::OkStatus();
335               });
336             },
            // `md` was already read by ShouldRetry() above; it is moved here
            // only for forwarding to the client.
337             [self, md = std::move(md)]() mutable {
338               if (!self->Commit()) return absl::CancelledError();
339               self->call_->call_handler()->SpawnPushServerTrailingMetadata(
340                   std::move(md));
341               return absl::OkStatus();
342             });
343       });
344 }
345 
// Pulls server initial metadata from the child call (initiator_).
// - If initial metadata arrives, the response is handled by
//   ServerToClientGotInitialMetadata().
// - If none arrives (a trailers-only response), it is handled by
//   ServerToClientGotTrailersOnlyResponse().
ServerToClient()346 auto RetryInterceptor::Attempt::ServerToClient() {
347   return TrySeq(
348       initiator_.PullServerInitialMetadata(),
349       [self = Ref()](absl::optional<ServerMetadataHandle> metadata) {
350         const bool has_md = metadata.has_value();
          // NOTE(review): the branches capture a raw `self.get()`.  This
          // relies on If() building the chosen branch while the outer `self`
          // ref is still held; confirm against If's implementation.
351         return If(
352             has_md,
353             [self = self.get(), md = std::move(metadata)]() mutable {
354               return self->ServerToClientGotInitialMetadata(std::move(*md));
355             },
356             [self = self.get()]() {
357               return self->ServerToClientGotTrailersOnlyResponse();
358             });
359       });
360 }
361 
Commit(SourceLocation whence)362 bool RetryInterceptor::Attempt::Commit(SourceLocation whence) {
363   if (committed_) return true;
364   GRPC_TRACE_LOG(retry, INFO) << DebugTag() << " commit attempt from "
365                               << whence.file() << ":" << whence.line();
366   if (!call_->IsCurrentAttempt(this)) return false;
367   committed_ = true;
368   call_->request_buffer()->Commit(reader());
369   return true;
370 }
371 
// Replays buffered client data to a new child call.  The promise:
// 1. Waits for client initial metadata from this attempt's request-buffer
//    reader.
// 2. Tags it with the number of completed attempts.
// 3. Creates the child call (initiator_) and registers it with the parent
//    call handler.
// 4. Spawns ServerToClient() on the child call.
// 5. Forwards every buffered client message to the child call.
ClientToServer()372 auto RetryInterceptor::Attempt::ClientToServer() {
373   return TrySeq(
374       reader_.PullClientInitialMetadata(),
375       [self = Ref()](ClientMetadataHandle metadata) {
376         int num_attempts_completed = self->call_->num_attempts_completed();
          // Set GrpcPreviousRpcAttemptsMetadata on retries.  On the first
          // attempt, remove any such entry already present in the metadata.
377         if (GPR_UNLIKELY(num_attempts_completed > 0)) {
378           metadata->Set(GrpcPreviousRpcAttemptsMetadata(),
379                         num_attempts_completed);
380         } else {
381           metadata->Remove(GrpcPreviousRpcAttemptsMetadata());
382         }
          // The child call is allocated from a ref to the parent call's arena.
383         self->initiator_ = self->call_->interceptor()->MakeChildCall(
384             std::move(metadata), self->call_->call_handler()->arena()->Ref());
385         self->call_->call_handler()->AddChildCall(self->initiator_);
386         self->initiator_.SpawnGuarded(
387             "server_to_client", [self]() { return self->ServerToClient(); });
388         return ForEach(
389             MessagesFrom(&self->reader_), [self](MessageHandle message) {
390               return self->initiator_.SpawnPushMessage(std::move(message));
391             });
392       });
393 }
394 
Start()395 void RetryInterceptor::Attempt::Start() {
396   call_->call_handler()->SpawnGuardedUntilCallCompletes(
397       "buffer_to_server", [self = Ref()]() { return self->ClientToServer(); });
398 }
399 
Cancel()400 void RetryInterceptor::Attempt::Cancel() { initiator_.SpawnCancel(); }
401 
DebugTag() const402 std::string RetryInterceptor::Attempt::DebugTag() const {
403   return absl::StrFormat("%s attempt:%p", call_->DebugTag(), this);
404 }
405 
406 }  // namespace grpc_core
407