1 // Copyright 2024 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_INTERCEPTOR_H 16 #define GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_INTERCEPTOR_H 17 18 #include "src/core/call/request_buffer.h" 19 #include "src/core/client_channel/client_channel_args.h" 20 #include "src/core/client_channel/retry_service_config.h" 21 #include "src/core/client_channel/retry_throttle.h" 22 #include "src/core/filter/filter_args.h" 23 #include "src/core/lib/transport/interception_chain.h" 24 #include "src/core/util/backoff.h" 25 26 namespace grpc_core { 27 28 namespace retry_detail { 29 class RetryState { 30 public: 31 RetryState( 32 const internal::RetryMethodConfig* retry_policy, 33 RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data); 34 35 // if nullopt --> commit & don't retry 36 // if duration --> retry after duration 37 absl::optional<Duration> ShouldRetry( 38 const ServerMetadata& md, bool committed, 39 absl::FunctionRef<std::string()> lazy_attempt_debug_string); num_attempts_completed()40 int num_attempts_completed() const { return num_attempts_completed_; } 41 42 template <typename Sink> AbslStringify(Sink & sink,const RetryState & state)43 friend void AbslStringify(Sink& sink, const RetryState& state) { 44 sink.Append(absl::StrCat( 45 "policy:{", 46 state.retry_policy_ != nullptr ? absl::StrCat(*state.retry_policy_) 47 : "none", 48 "} throttle:", state.retry_throttle_data_ != nullptr, 49 " attempts:", state.num_attempts_completed_)); 50 } 51 52 private: 53 const internal::RetryMethodConfig* const retry_policy_; 54 RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_; 55 int num_attempts_completed_ = 0; 56 BackOff retry_backoff_; 57 }; 58 59 absl::StatusOr<RefCountedPtr<internal::ServerRetryThrottleData>> 60 ServerRetryThrottleDataFromChannelArgs(const ChannelArgs& args); 61 } // namespace retry_detail 62 63 class RetryInterceptor : public Interceptor { 64 public: 65 RetryInterceptor( 66 const ChannelArgs& args, 67 RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data); 68 69 static absl::StatusOr<RefCountedPtr<RetryInterceptor>> Create( 70 const ChannelArgs& args, const FilterArgs&); 71 Orphaned()72 void Orphaned() override {} 73 74 protected: 75 void InterceptCall(UnstartedCallHandler unstarted_call_handler) override; 76 77 private: 78 class Attempt; 79 80 class Call final 81 : public RefCounted<Call, NonPolymorphicRefCount, UnrefCallDtor> { 82 public: 83 Call(RefCountedPtr<RetryInterceptor> interceptor, CallHandler call_handler); 84 85 void StartAttempt(); 86 void Start(); 87 request_buffer()88 RequestBuffer* request_buffer() { return &request_buffer_; } call_handler()89 CallHandler* call_handler() { return &call_handler_; } interceptor()90 RetryInterceptor* interceptor() { return interceptor_.get(); } 91 // if nullopt --> commit & don't retry 92 // if duration --> retry after duration ShouldRetry(const ServerMetadata & md,absl::FunctionRef<std::string ()> lazy_attempt_debug_string)93 absl::optional<Duration> ShouldRetry( 94 const ServerMetadata& md, 95 absl::FunctionRef<std::string()> lazy_attempt_debug_string) { 96 return retry_state_.ShouldRetry(md, request_buffer_.committed(), 97 lazy_attempt_debug_string); 98 } num_attempts_completed()99 int num_attempts_completed() const { 100 return retry_state_.num_attempts_completed(); 101 } RemoveAttempt(Attempt * attempt)102 void RemoveAttempt(Attempt* attempt) { 103 if (current_attempt_ == attempt) current_attempt_ = nullptr; 104 } IsCurrentAttempt(Attempt * attempt)105 bool IsCurrentAttempt(Attempt* attempt) { 106 CHECK(attempt != nullptr); 107 return current_attempt_ == attempt; 108 } 109 110 std::string DebugTag(); 111 112 private: 113 void MaybeCommit(size_t buffered); 114 auto ClientToBuffer(); 115 116 RequestBuffer request_buffer_; 117 CallHandler call_handler_; 118 RefCountedPtr<RetryInterceptor> interceptor_; 119 Attempt* current_attempt_ = nullptr; 120 retry_detail::RetryState retry_state_; 121 }; 122 123 class Attempt final 124 : public RefCounted<Attempt, NonPolymorphicRefCount, UnrefCallDtor> { 125 public: 126 explicit Attempt(RefCountedPtr<Call> call); 127 ~Attempt(); 128 129 void Start(); 130 void Cancel(); 131 GRPC_MUST_USE_RESULT bool Commit(SourceLocation whence = {}); reader()132 RequestBuffer::Reader* reader() { return &reader_; } 133 134 std::string DebugTag() const; 135 136 private: 137 auto ClientToServer(); 138 auto ServerToClient(); 139 auto ServerToClientGotInitialMetadata(ServerMetadataHandle md); 140 auto ServerToClientGotTrailersOnlyResponse(); 141 142 RefCountedPtr<Call> call_; 143 RequestBuffer::Reader reader_; 144 CallInitiator initiator_; 145 bool committed_ = false; 146 }; 147 148 const internal::RetryMethodConfig* GetRetryPolicy(); 149 150 const size_t per_rpc_retry_buffer_size_; 151 const size_t service_config_parser_index_; 152 const RefCountedPtr<internal::ServerRetryThrottleData> retry_throttle_data_; 153 }; 154 155 } // namespace grpc_core 156 157 #endif // GRPC_SRC_CORE_CLIENT_CHANNEL_RETRY_INTERCEPTOR_H 158