// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15 #include "src/core/client_channel/retry_interceptor.h"
16
17 #include "src/core/lib/promise/cancel_callback.h"
18 #include "src/core/lib/promise/for_each.h"
19 #include "src/core/lib/promise/map.h"
20 #include "src/core/lib/promise/sleep.h"
21 #include "src/core/service_config/service_config_call_data.h"
22
23 namespace grpc_core {
24
25 namespace {
GetMaxPerRpcRetryBufferSize(const ChannelArgs & args)26 size_t GetMaxPerRpcRetryBufferSize(const ChannelArgs& args) {
27 // By default, we buffer 256 KiB per RPC for retries.
28 // TODO(roth): Do we have any data to suggest a better value?
29 static constexpr int kDefaultPerRpcRetryBufferSize = (256 << 10);
30 return Clamp(args.GetInt(GRPC_ARG_PER_RPC_RETRY_BUFFER_SIZE)
31 .value_or(kDefaultPerRpcRetryBufferSize),
32 0, INT_MAX);
33 }
34 } // namespace
35
36 namespace retry_detail {
37
// Initializes per-call retry state from the method's retry policy (may be
// null, meaning retries are disabled for this method) and the per-server
// throttle data shared across calls (may also be null).
RetryState::RetryState(
    const internal::RetryMethodConfig* retry_policy,
    RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)
    : retry_policy_(retry_policy),
      retry_throttle_data_(std::move(retry_throttle_data)),
      // With no policy the backoff is never consulted (ShouldRetry bails out
      // first); zeros just keep the BackOff object well-defined.
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(retry_policy_ == nullptr
                                       ? Duration::Zero()
                                       : retry_policy_->initial_backoff())
              .set_multiplier(retry_policy_ == nullptr
                                  ? 0
                                  : retry_policy_->backoff_multiplier())
              // This value was picked arbitrarily. It can be changed if
              // there is any even moderately compelling reason to do so.
              .set_jitter(0.2)
              .set_max_backoff(retry_policy_ == nullptr
                                   ? Duration::Zero()
                                   : retry_policy_->max_backoff())) {}
57
// Decides, from the attempt's trailing metadata `md`, whether the call
// should be retried.  Returns the delay to wait before starting the next
// attempt, or nullopt when no retry should happen (success, non-retryable
// status, throttled, already committed, attempts exhausted, or negative
// server push-back).  `committed` indicates the call is already committed
// to its current attempt.  `lazy_attempt_debug_string` is only invoked when
// retry tracing is enabled.
absl::optional<Duration> RetryState::ShouldRetry(
    const ServerMetadata& md, bool committed,
    absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
  // If no retry policy, don't retry.
  if (retry_policy_ == nullptr) {
    GRPC_TRACE_LOG(retry, INFO)
        << lazy_attempt_debug_string() << " no retry policy";
    return absl::nullopt;
  }
  const auto status = md.get(GrpcStatusMetadata());
  if (status.has_value()) {
    if (GPR_LIKELY(*status == GRPC_STATUS_OK)) {
      // Success: record it for the throttle and stop retrying.
      if (retry_throttle_data_ != nullptr) {
        retry_throttle_data_->RecordSuccess();
      }
      GRPC_TRACE_LOG(retry, INFO)
          << lazy_attempt_debug_string() << " call succeeded";
      return absl::nullopt;
    }
    // Status is not OK. Check whether the status is retryable.
    if (!retry_policy_->retryable_status_codes().Contains(*status)) {
      GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string() << ": status "
                                  << grpc_status_code_to_string(*status)
                                  << " not configured as retryable";
      return absl::nullopt;
    }
  }
  // Record the failure and check whether retries are throttled.
  // Note that it's important for this check to come after the status
  // code check above, since we should only record failures whose statuses
  // match the configured retryable status codes, so that we don't count
  // things like failures due to malformed requests (INVALID_ARGUMENT).
  // Conversely, it's important for this to come before the remaining
  // checks, so that we don't fail to record failures due to other factors.
  if (retry_throttle_data_ != nullptr &&
      !retry_throttle_data_->RecordFailure()) {
    GRPC_TRACE_LOG(retry, INFO)
        << lazy_attempt_debug_string() << " retries throttled";
    return absl::nullopt;
  }
  // Check whether the call is committed.
  if (committed) {
    GRPC_TRACE_LOG(retry, INFO)
        << lazy_attempt_debug_string() << " retries already committed";
    return absl::nullopt;
  }
  // Check whether we have retries remaining.
  ++num_attempts_completed_;
  if (num_attempts_completed_ >= retry_policy_->max_attempts()) {
    GRPC_TRACE_LOG(retry, INFO)
        << lazy_attempt_debug_string() << " exceeded "
        << retry_policy_->max_attempts() << " retry attempts";
    return absl::nullopt;
  }
  // Check server push-back.  A negative push-back is an explicit "do not
  // retry" signal from the server.
  const auto server_pushback = md.get(GrpcRetryPushbackMsMetadata());
  if (server_pushback.has_value() && server_pushback < Duration::Zero()) {
    GRPC_TRACE_LOG(retry, INFO) << lazy_attempt_debug_string()
                                << " not retrying due to server push-back";
    return absl::nullopt;
  }
  // We should retry.
  Duration next_attempt_timeout;
  if (server_pushback.has_value()) {
    CHECK_GE(*server_pushback, Duration::Zero());
    // Server push-back overrides our own schedule; reset the backoff so it
    // starts fresh after the pushed-back attempt.
    next_attempt_timeout = *server_pushback;
    retry_backoff_.Reset();
  } else {
    next_attempt_timeout = retry_backoff_.NextAttemptDelay();
  }
  // NOTE(review): this trace says "server push-back" even when the delay
  // came from our backoff schedule — potentially misleading in traces.
  GRPC_TRACE_LOG(retry, INFO)
      << lazy_attempt_debug_string() << " server push-back: retry in "
      << next_attempt_timeout;
  return next_attempt_timeout;
}
133
// Looks up (or creates) the per-server retry-throttle data configured by
// the service config found in `args`.  Returns nullptr (success, no
// throttling) when there is no service config or no retry-throttling
// global config; returns an error status when the server URI channel arg
// is missing or cannot be parsed.
absl::StatusOr<RefCountedPtr<internal::ServerRetryThrottleData>>
ServerRetryThrottleDataFromChannelArgs(const ChannelArgs& args) {
  // Get retry throttling parameters from service config.
  auto* service_config = args.GetObject<ServiceConfig>();
  if (service_config == nullptr) return nullptr;
  const auto* config = static_cast<const internal::RetryGlobalConfig*>(
      service_config->GetGlobalParsedConfig(
          internal::RetryServiceConfigParser::ParserIndex()));
  if (config == nullptr) return nullptr;
  // Get server name from target URI.
  auto server_uri = args.GetString(GRPC_ARG_SERVER_URI);
  if (!server_uri.has_value()) {
    return GRPC_ERROR_CREATE(
        "server URI channel arg missing or wrong type in client channel "
        "filter");
  }
  absl::StatusOr<URI> uri = URI::Parse(*server_uri);
  if (!uri.ok() || uri->path().empty()) {
    return GRPC_ERROR_CREATE("could not extract server name from target URI");
  }
  std::string server_name(absl::StripPrefix(uri->path(), "/"));
  // Get throttling config for server_name.  The map shares throttle state
  // across channels talking to the same server.
  return internal::ServerRetryThrottleMap::Get()->GetDataForServer(
      server_name, config->max_milli_tokens(), config->milli_token_ratio());
}
159
160 } // namespace retry_detail
161
162 ////////////////////////////////////////////////////////////////////////////////
163 // RetryInterceptor
164
Create(const ChannelArgs & args,const FilterArgs &)165 absl::StatusOr<RefCountedPtr<RetryInterceptor>> RetryInterceptor::Create(
166 const ChannelArgs& args, const FilterArgs&) {
167 auto retry_throttle_data =
168 retry_detail::ServerRetryThrottleDataFromChannelArgs(args);
169 if (!retry_throttle_data.ok()) {
170 return retry_throttle_data.status();
171 }
172 return MakeRefCounted<RetryInterceptor>(args,
173 std::move(*retry_throttle_data));
174 }
175
// Captures channel-level retry configuration: the per-RPC retry buffer
// limit, the service-config parser index used to locate per-method retry
// config, and the shared per-server throttle data (may be null).
RetryInterceptor::RetryInterceptor(
    const ChannelArgs& args,
    RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data)
    : per_rpc_retry_buffer_size_(GetMaxPerRpcRetryBufferSize(args)),
      service_config_parser_index_(
          internal::RetryServiceConfigParser::ParserIndex()),
      retry_throttle_data_(std::move(retry_throttle_data)) {}
183
InterceptCall(UnstartedCallHandler unstarted_call_handler)184 void RetryInterceptor::InterceptCall(
185 UnstartedCallHandler unstarted_call_handler) {
186 auto call_handler = unstarted_call_handler.StartCall();
187 auto* arena = call_handler.arena();
188 auto call = arena->MakeRefCounted<Call>(RefAsSubclass<RetryInterceptor>(),
189 std::move(call_handler));
190 call->StartAttempt();
191 call->Start();
192 }
193
GetRetryPolicy()194 const internal::RetryMethodConfig* RetryInterceptor::GetRetryPolicy() {
195 auto* svc_cfg_call_data = MaybeGetContext<ServiceConfigCallData>();
196 if (svc_cfg_call_data == nullptr) return nullptr;
197 return static_cast<const internal::RetryMethodConfig*>(
198 svc_cfg_call_data->GetMethodParsedConfig(service_config_parser_index_));
199 }
200
201 ////////////////////////////////////////////////////////////////////////////////
202 // RetryInterceptor::Call
203
// Binds the call to its interceptor and snapshots the per-method retry
// policy (resolved from the current call context) plus the channel's
// throttle data into retry_state_.
RetryInterceptor::Call::Call(RefCountedPtr<RetryInterceptor> interceptor,
                             CallHandler call_handler)
    : call_handler_(std::move(call_handler)),
      interceptor_(std::move(interceptor)),
      retry_state_(interceptor_->GetRetryPolicy(),
                   interceptor_->retry_throttle_data_) {
  GRPC_TRACE_LOG(retry, INFO)
      << DebugTag() << " retry call created: " << retry_state_;
}
213
// Returns a promise that pumps the client->server direction into
// request_buffer_ so it can be replayed on later attempts: first the client
// initial metadata, then each client message.  Every successful push
// reports the total bytes buffered; MaybeCommit uses that to commit the
// call to the current attempt once the buffer limit is reached.
auto RetryInterceptor::Call::ClientToBuffer() {
  return TrySeq(
      call_handler_.PullClientInitialMetadata(),
      [self = Ref()](ClientMetadataHandle metadata) mutable {
        GRPC_TRACE_LOG(retry, INFO)
            << self->DebugTag()
            << " got client initial metadata: " << metadata->DebugString();
        return self->request_buffer_.PushClientInitialMetadata(
            std::move(metadata));
      },
      [self = Ref()](size_t buffered) {
        self->MaybeCommit(buffered);
        // Then forward every client message into the buffer, checking the
        // buffered size after each push.
        return ForEach(
            MessagesFrom(self->call_handler_), [self](MessageHandle message) {
              GRPC_TRACE_LOG(retry, INFO)
                  << self->DebugTag() << " got client message "
                  << message->DebugString();
              return TrySeq(
                  self->request_buffer_.PushMessage(std::move(message)),
                  [self](size_t buffered) {
                    self->MaybeCommit(buffered);
                    return absl::OkStatus();
                  });
            });
      });
}
240
// Spawns the promise that mirrors the client's sends into the request
// buffer for the lifetime of the call.  On success the buffer is marked as
// finished sending; on failure or cancellation the buffer is cancelled so
// attempts reading from it observe the failure too.
void RetryInterceptor::Call::Start() {
  call_handler_.SpawnGuarded("client_to_buffer", [self = Ref()]() {
    return OnCancel(Map(self->ClientToBuffer(),
                        [self](absl::Status status) {
                          if (status.ok()) {
                            self->request_buffer_.FinishSends();
                          } else {
                            self->request_buffer_.Cancel(status);
                          }
                          return status;
                        }),
                    [self]() { self->request_buffer_.Cancel(); });
  });
}
255
StartAttempt()256 void RetryInterceptor::Call::StartAttempt() {
257 if (current_attempt_ != nullptr) {
258 current_attempt_->Cancel();
259 }
260 auto current_attempt = call_handler_.arena()->MakeRefCounted<Attempt>(Ref());
261 current_attempt_ = current_attempt.get();
262 current_attempt->Start();
263 }
264
MaybeCommit(size_t buffered)265 void RetryInterceptor::Call::MaybeCommit(size_t buffered) {
266 GRPC_TRACE_LOG(retry, INFO) << DebugTag() << " buffered:" << buffered << "/"
267 << interceptor_->per_rpc_retry_buffer_size_;
268 if (buffered >= interceptor_->per_rpc_retry_buffer_size_) {
269 std::ignore = current_attempt_->Commit();
270 }
271 }
272
DebugTag()273 std::string RetryInterceptor::Call::DebugTag() {
274 return absl::StrFormat("%s call:%p", Activity::current()->DebugTag(), this);
275 }
276
277 ////////////////////////////////////////////////////////////////////////////////
278 // RetryInterceptor::Attempt
279
// An Attempt replays the buffered request on a fresh child call.  The
// reader is bound to the Call's request buffer, positioned to replay from
// the beginning.
RetryInterceptor::Attempt::Attempt(RefCountedPtr<Call> call)
    : call_(std::move(call)), reader_(call_->request_buffer()) {
  GRPC_TRACE_LOG(retry, INFO) << DebugTag() << " retry attempt created";
}
284
// Detaches this attempt from its Call so the Call never holds a dangling
// current_attempt_ pointer.
RetryInterceptor::Attempt::~Attempt() { call_->RemoveAttempt(this); }
286
// Handles receipt of server initial metadata on this attempt.  Receiving
// initial metadata commits the call to this attempt; after that, server
// messages and trailing metadata are forwarded straight through to the
// client.  If Commit() fails (a different attempt is now current), the
// returned promise resolves with CANCELLED instead of forwarding anything.
auto RetryInterceptor::Attempt::ServerToClientGotInitialMetadata(
    ServerMetadataHandle md) {
  GRPC_TRACE_LOG(retry, INFO)
      << DebugTag() << " get server initial metadata " << md->DebugString();
  const bool committed = Commit();
  return If(
      committed,
      [&]() {
        call_->call_handler()->SpawnPushServerInitialMetadata(std::move(md));
        // Forward all server messages, then the trailing metadata.
        return Seq(ForEach(MessagesFrom(initiator_),
                           [call = call_](MessageHandle message) {
                             GRPC_TRACE_LOG(retry, INFO)
                                 << call->DebugTag() << " got server message "
                                 << message->DebugString();
                             return call->call_handler()->SpawnPushMessage(
                                 std::move(message));
                           }),
                   initiator_.PullServerTrailingMetadata(),
                   [call = call_](ServerMetadataHandle md) {
                     GRPC_TRACE_LOG(retry, INFO)
                         << call->DebugTag()
                         << " got server trailing metadata: "
                         << md->DebugString();
                     call->call_handler()->SpawnPushServerTrailingMetadata(
                         std::move(md));
                     return absl::OkStatus();
                   });
      },
      [&]() { return []() { return absl::CancelledError(); }; });
}
317
// Handles a trailers-only response (no server initial metadata) on this
// attempt.  Pulls the trailing metadata and asks the Call whether to retry:
// if yes, sleeps for the computed delay and starts a new attempt; if no,
// commits this attempt and forwards the trailing metadata to the client
// (or resolves CANCELLED if another attempt is already current).
auto RetryInterceptor::Attempt::ServerToClientGotTrailersOnlyResponse() {
  GRPC_TRACE_LOG(retry, INFO) << DebugTag() << " got trailers only response";
  return Seq(
      initiator_.PullServerTrailingMetadata(),
      [self = Ref()](ServerMetadataHandle md) {
        GRPC_TRACE_LOG(retry, INFO)
            << self->DebugTag()
            << " got server trailing metadata: " << md->DebugString();
        auto delay = self->call_->ShouldRetry(
            *md,
            [self = self.get()]() -> std::string { return self->DebugTag(); });
        return If(
            delay.has_value(),
            [self, delay]() {
              // Wait out the backoff/push-back delay, then start the next
              // attempt.  Sleep's status is ignored here.
              return Map(Sleep(*delay), [call = self->call_](absl::Status) {
                call->StartAttempt();
                return absl::OkStatus();
              });
            },
            [self, md = std::move(md)]() mutable {
              if (!self->Commit()) return absl::CancelledError();
              self->call_->call_handler()->SpawnPushServerTrailingMetadata(
                  std::move(md));
              return absl::OkStatus();
            });
      });
}
345
// Top-level server->client promise for one attempt.  Distinguishes a
// normal response (initial metadata present) from a trailers-only response
// and dispatches to the matching handler; only the trailers-only path can
// lead to a retry.
auto RetryInterceptor::Attempt::ServerToClient() {
  return TrySeq(
      initiator_.PullServerInitialMetadata(),
      [self = Ref()](absl::optional<ServerMetadataHandle> metadata) {
        const bool has_md = metadata.has_value();
        return If(
            has_md,
            [self = self.get(), md = std::move(metadata)]() mutable {
              return self->ServerToClientGotInitialMetadata(std::move(*md));
            },
            [self = self.get()]() {
              return self->ServerToClientGotTrailersOnlyResponse();
            });
      });
}
361
// Commits the overall call to this attempt.  Returns true if this attempt
// is (or already was) the committed one; returns false when a different
// attempt is now current, in which case this attempt must not forward
// results to the client.
bool RetryInterceptor::Attempt::Commit(SourceLocation whence) {
  if (committed_) return true;
  GRPC_TRACE_LOG(retry, INFO) << DebugTag() << " commit attempt from "
                              << whence.file() << ":" << whence.line();
  // Only the currently-active attempt may commit the call.
  if (!call_->IsCurrentAttempt(this)) return false;
  committed_ = true;
  // NOTE(review): presumably Commit(reader()) lets the buffer release data
  // this attempt's reader has already consumed — confirm against
  // RequestBuffer's contract.
  call_->request_buffer()->Commit(reader());
  return true;
}
371
// Replays the buffered request on a new child call: pulls the buffered
// client initial metadata, stamps it with the number of previously
// completed attempts (omitted on the first attempt), creates the child
// call, spawns the server->client pump for it, and then streams buffered
// client messages into the child call as they become available.
auto RetryInterceptor::Attempt::ClientToServer() {
  return TrySeq(
      reader_.PullClientInitialMetadata(),
      [self = Ref()](ClientMetadataHandle metadata) {
        int num_attempts_completed = self->call_->num_attempts_completed();
        if (GPR_UNLIKELY(num_attempts_completed > 0)) {
          metadata->Set(GrpcPreviousRpcAttemptsMetadata(),
                        num_attempts_completed);
        } else {
          // First attempt: make sure no stale value leaks through.
          metadata->Remove(GrpcPreviousRpcAttemptsMetadata());
        }
        self->initiator_ = self->call_->interceptor()->MakeChildCall(
            std::move(metadata), self->call_->call_handler()->arena()->Ref());
        self->call_->call_handler()->AddChildCall(self->initiator_);
        self->initiator_.SpawnGuarded(
            "server_to_client", [self]() { return self->ServerToClient(); });
        return ForEach(
            MessagesFrom(&self->reader_), [self](MessageHandle message) {
              return self->initiator_.SpawnPushMessage(std::move(message));
            });
      });
}
394
// Spawns this attempt's request-replay promise; it runs until the overall
// call completes.
void RetryInterceptor::Attempt::Start() {
  call_->call_handler()->SpawnGuardedUntilCallCompletes(
      "buffer_to_server", [self = Ref()]() { return self->ClientToServer(); });
}
399
// Cancels this attempt's child call via its initiator.
// NOTE(review): may run before ClientToServer has assigned initiator_ —
// assumes SpawnCancel on a default-constructed initiator is safe; confirm.
void RetryInterceptor::Attempt::Cancel() { initiator_.SpawnCancel(); }
401
DebugTag() const402 std::string RetryInterceptor::Attempt::DebugTag() const {
403 return absl::StrFormat("%s attempt:%p", call_->DebugTag(), this);
404 }
405
406 } // namespace grpc_core
407