• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_INTERCEPTOR_H
16 #define GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_INTERCEPTOR_H
17 
18 #include "src/core/call/request_buffer.h"
19 #include "src/core/client_channel/client_channel_args.h"
20 #include "src/core/client_channel/retry_service_config.h"
21 #include "src/core/client_channel/retry_throttle.h"
22 #include "src/core/filter/filter_args.h"
23 #include "src/core/lib/transport/interception_chain.h"
24 #include "src/core/util/backoff.h"
25 
26 namespace grpc_core {
27 
28 namespace retry_detail {
29 class RetryState {
30  public:
31   RetryState(
32       const internal::RetryMethodConfig* retry_policy,
33       RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data);
34 
35   // if nullopt --> commit & don't retry
36   // if duration --> retry after duration
37   absl::optional<Duration> ShouldRetry(
38       const ServerMetadata& md, bool committed,
39       absl::FunctionRef<std::string()> lazy_attempt_debug_string);
num_attempts_completed()40   int num_attempts_completed() const { return num_attempts_completed_; }
41 
42   template <typename Sink>
AbslStringify(Sink & sink,const RetryState & state)43   friend void AbslStringify(Sink& sink, const RetryState& state) {
44     sink.Append(absl::StrCat(
45         "policy:{",
46         state.retry_policy_ != nullptr ? absl::StrCat(*state.retry_policy_)
47                                        : "none",
48         "} throttle:", state.retry_throttle_data_ != nullptr,
49         " attempts:", state.num_attempts_completed_));
50   }
51 
52  private:
53   const internal::RetryMethodConfig* const retry_policy_;
54   RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
55   int num_attempts_completed_ = 0;
56   BackOff retry_backoff_;
57 };
58 
59 absl::StatusOr<RefCountedPtr<internal::ServerRetryThrottleData>>
60 ServerRetryThrottleDataFromChannelArgs(const ChannelArgs& args);
61 }  // namespace retry_detail
62 
63 class RetryInterceptor : public Interceptor {
64  public:
65   RetryInterceptor(
66       const ChannelArgs& args,
67       RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data);
68 
69   static absl::StatusOr<RefCountedPtr<RetryInterceptor>> Create(
70       const ChannelArgs& args, const FilterArgs&);
71 
Orphaned()72   void Orphaned() override {}
73 
74  protected:
75   void InterceptCall(UnstartedCallHandler unstarted_call_handler) override;
76 
77  private:
78   class Attempt;
79 
80   class Call final
81       : public RefCounted<Call, NonPolymorphicRefCount, UnrefCallDtor> {
82    public:
83     Call(RefCountedPtr<RetryInterceptor> interceptor, CallHandler call_handler);
84 
85     void StartAttempt();
86     void Start();
87 
request_buffer()88     RequestBuffer* request_buffer() { return &request_buffer_; }
call_handler()89     CallHandler* call_handler() { return &call_handler_; }
interceptor()90     RetryInterceptor* interceptor() { return interceptor_.get(); }
91     // if nullopt --> commit & don't retry
92     // if duration --> retry after duration
ShouldRetry(const ServerMetadata & md,absl::FunctionRef<std::string ()> lazy_attempt_debug_string)93     absl::optional<Duration> ShouldRetry(
94         const ServerMetadata& md,
95         absl::FunctionRef<std::string()> lazy_attempt_debug_string) {
96       return retry_state_.ShouldRetry(md, request_buffer_.committed(),
97                                       lazy_attempt_debug_string);
98     }
num_attempts_completed()99     int num_attempts_completed() const {
100       return retry_state_.num_attempts_completed();
101     }
RemoveAttempt(Attempt * attempt)102     void RemoveAttempt(Attempt* attempt) {
103       if (current_attempt_ == attempt) current_attempt_ = nullptr;
104     }
IsCurrentAttempt(Attempt * attempt)105     bool IsCurrentAttempt(Attempt* attempt) {
106       CHECK(attempt != nullptr);
107       return current_attempt_ == attempt;
108     }
109 
110     std::string DebugTag();
111 
112    private:
113     void MaybeCommit(size_t buffered);
114     auto ClientToBuffer();
115 
116     RequestBuffer request_buffer_;
117     CallHandler call_handler_;
118     RefCountedPtr<RetryInterceptor> interceptor_;
119     Attempt* current_attempt_ = nullptr;
120     retry_detail::RetryState retry_state_;
121   };
122 
123   class Attempt final
124       : public RefCounted<Attempt, NonPolymorphicRefCount, UnrefCallDtor> {
125    public:
126     explicit Attempt(RefCountedPtr<Call> call);
127     ~Attempt();
128 
129     void Start();
130     void Cancel();
131     GRPC_MUST_USE_RESULT bool Commit(SourceLocation whence = {});
reader()132     RequestBuffer::Reader* reader() { return &reader_; }
133 
134     std::string DebugTag() const;
135 
136    private:
137     auto ClientToServer();
138     auto ServerToClient();
139     auto ServerToClientGotInitialMetadata(ServerMetadataHandle md);
140     auto ServerToClientGotTrailersOnlyResponse();
141 
142     RefCountedPtr<Call> call_;
143     RequestBuffer::Reader reader_;
144     CallInitiator initiator_;
145     bool committed_ = false;
146   };
147 
148   const internal::RetryMethodConfig* GetRetryPolicy();
149 
150   const size_t per_rpc_retry_buffer_size_;
151   const size_t service_config_parser_index_;
152   const RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_;
153 };
154 
155 }  // namespace grpc_core
156 
157 #endif  // GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_INTERCEPTOR_H
158