• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2021 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
#include "src/core/ext/filters/fault_injection/fault_injection_filter.h"

#include <grpc/status.h>
#include <grpc/support/port_platform.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <functional>
#include <limits>
#include <string>
#include <type_traits>
#include <utility>

#include "absl/log/log.h"
#include "absl/meta/type_traits.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "src/core/config/core_configuration.h"
#include "src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h"
#include "src/core/lib/channel/channel_stack.h"
#include "src/core/lib/channel/status_util.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/context.h"
#include "src/core/lib/promise/sleep.h"
#include "src/core/lib/promise/try_seq.h"
#include "src/core/lib/transport/metadata_batch.h"
#include "src/core/lib/transport/transport.h"
#include "src/core/service_config/service_config_call_data.h"
#include "src/core/util/time.h"
50 
51 namespace grpc_core {
52 
53 const NoInterceptor FaultInjectionFilter::Call::OnServerInitialMetadata;
54 const NoInterceptor FaultInjectionFilter::Call::OnServerTrailingMetadata;
55 const NoInterceptor FaultInjectionFilter::Call::OnClientToServerMessage;
56 const NoInterceptor FaultInjectionFilter::Call::OnClientToServerHalfClose;
57 const NoInterceptor FaultInjectionFilter::Call::OnServerToClientMessage;
58 const NoInterceptor FaultInjectionFilter::Call::OnFinalize;
59 
60 namespace {
61 
// Process-wide number of faults currently in progress. Each decision compares
// it against its policy's max_faults before claiming a slot. The type is
// required to be trivially destructible (presumably so the counter stays valid
// during static destruction / shutdown — confirm).
std::atomic<uint32_t> g_active_faults{0};
static_assert(std::is_trivially_destructible<decltype(g_active_faults)>::value,
              "the active fault counter needs to have a trivially destructible type");
66 
67 template <typename T>
AsInt(absl::string_view s)68 auto AsInt(absl::string_view s) -> absl::optional<T> {
69   T x;
70   if (absl::SimpleAtoi(s, &x)) return x;
71   return absl::nullopt;
72 }
73 
UnderFraction(absl::InsecureBitGen * rand_generator,const uint32_t numerator,const uint32_t denominator)74 inline bool UnderFraction(absl::InsecureBitGen* rand_generator,
75                           const uint32_t numerator,
76                           const uint32_t denominator) {
77   if (numerator <= 0) return false;
78   if (numerator >= denominator) return true;
79   // Generate a random number in [0, denominator).
80   const uint32_t random_number =
81       absl::Uniform(absl::IntervalClosedOpen, *rand_generator, 0u, denominator);
82   return random_number < numerator;
83 }
84 
85 // Tracks an active faults lifetime.
86 // Increments g_active_faults when created, and decrements it when destroyed.
87 class FaultHandle {
88  public:
FaultHandle(bool active)89   explicit FaultHandle(bool active) : active_(active) {
90     if (active) {
91       g_active_faults.fetch_add(1, std::memory_order_relaxed);
92     }
93   }
~FaultHandle()94   ~FaultHandle() {
95     if (active_) {
96       g_active_faults.fetch_sub(1, std::memory_order_relaxed);
97     }
98   }
99   FaultHandle(const FaultHandle&) = delete;
100   FaultHandle& operator=(const FaultHandle&) = delete;
FaultHandle(FaultHandle && other)101   FaultHandle(FaultHandle&& other) noexcept
102       : active_(std::exchange(other.active_, false)) {}
operator =(FaultHandle && other)103   FaultHandle& operator=(FaultHandle&& other) noexcept {
104     std::swap(active_, other.active_);
105     return *this;
106   }
107 
108  private:
109   bool active_;
110 };
111 
112 }  // namespace
113 
114 class FaultInjectionFilter::InjectionDecision {
115  public:
InjectionDecision(uint32_t max_faults,Duration delay_time,absl::optional<absl::Status> abort_request)116   InjectionDecision(uint32_t max_faults, Duration delay_time,
117                     absl::optional<absl::Status> abort_request)
118       : max_faults_(max_faults),
119         delay_time_(delay_time),
120         abort_request_(abort_request) {}
121 
122   std::string ToString() const;
123   Timestamp DelayUntil();
124   absl::Status MaybeAbort() const;
125 
126  private:
127   bool HaveActiveFaultsQuota() const;
128 
129   uint32_t max_faults_;
130   Duration delay_time_;
131   absl::optional<absl::Status> abort_request_;
132   FaultHandle active_fault_{false};
133 };
134 
135 absl::StatusOr<std::unique_ptr<FaultInjectionFilter>>
Create(const ChannelArgs &,ChannelFilter::Args filter_args)136 FaultInjectionFilter::Create(const ChannelArgs&,
137                              ChannelFilter::Args filter_args) {
138   return std::make_unique<FaultInjectionFilter>(filter_args);
139 }
140 
FaultInjectionFilter(ChannelFilter::Args filter_args)141 FaultInjectionFilter::FaultInjectionFilter(ChannelFilter::Args filter_args)
142     : index_(filter_args.instance_id()),
143       service_config_parser_index_(
144           FaultInjectionServiceConfigParser::ParserIndex()) {}
145 
146 // Construct a promise for one call.
OnClientInitialMetadata(ClientMetadata & md,FaultInjectionFilter * filter)147 ArenaPromise<absl::Status> FaultInjectionFilter::Call::OnClientInitialMetadata(
148     ClientMetadata& md, FaultInjectionFilter* filter) {
149   auto decision = filter->MakeInjectionDecision(md);
150   GRPC_TRACE_LOG(fault_injection_filter, INFO)
151       << "chand=" << this << ": Fault injection triggered "
152       << decision.ToString();
153   auto delay = decision.DelayUntil();
154   return TrySeq(Sleep(delay), [decision = std::move(decision)]() {
155     return decision.MaybeAbort();
156   });
157 }
158 
159 FaultInjectionFilter::InjectionDecision
MakeInjectionDecision(const ClientMetadata & initial_metadata)160 FaultInjectionFilter::MakeInjectionDecision(
161     const ClientMetadata& initial_metadata) {
162   // Fetch the fault injection policy from the service config, based on the
163   // relative index for which policy should this CallData use.
164   auto* service_config_call_data = GetContext<ServiceConfigCallData>();
165   auto* method_params = static_cast<FaultInjectionMethodParsedConfig*>(
166       service_config_call_data->GetMethodParsedConfig(
167           service_config_parser_index_));
168   const FaultInjectionMethodParsedConfig::FaultInjectionPolicy* fi_policy =
169       nullptr;
170   if (method_params != nullptr) {
171     fi_policy = method_params->fault_injection_policy(index_);
172   }
173 
174   // Shouldn't ever be null, but just in case, return a no-op decision.
175   if (fi_policy == nullptr) {
176     return InjectionDecision(/*max_faults=*/0, /*delay_time=*/Duration::Zero(),
177                              /*abort_request=*/absl::nullopt);
178   }
179 
180   grpc_status_code abort_code = fi_policy->abort_code;
181   uint32_t abort_percentage_numerator = fi_policy->abort_percentage_numerator;
182   uint32_t delay_percentage_numerator = fi_policy->delay_percentage_numerator;
183   Duration delay = fi_policy->delay;
184 
185   // Update the policy with values in initial metadata.
186   if (!fi_policy->abort_code_header.empty() ||
187       !fi_policy->abort_percentage_header.empty() ||
188       !fi_policy->delay_header.empty() ||
189       !fi_policy->delay_percentage_header.empty()) {
190     std::string buffer;
191     if (!fi_policy->abort_code_header.empty() && abort_code == GRPC_STATUS_OK) {
192       auto value = initial_metadata.GetStringValue(fi_policy->abort_code_header,
193                                                    &buffer);
194       if (value.has_value()) {
195         grpc_status_code_from_int(
196             AsInt<int>(*value).value_or(GRPC_STATUS_UNKNOWN), &abort_code);
197       }
198     }
199     if (!fi_policy->abort_percentage_header.empty()) {
200       auto value = initial_metadata.GetStringValue(
201           fi_policy->abort_percentage_header, &buffer);
202       if (value.has_value()) {
203         abort_percentage_numerator = std::min(
204             AsInt<uint32_t>(*value).value_or(-1), abort_percentage_numerator);
205       }
206     }
207     if (!fi_policy->delay_header.empty() && delay == Duration::Zero()) {
208       auto value =
209           initial_metadata.GetStringValue(fi_policy->delay_header, &buffer);
210       if (value.has_value()) {
211         delay = Duration::Milliseconds(
212             std::max(AsInt<int64_t>(*value).value_or(0), int64_t{0}));
213       }
214     }
215     if (!fi_policy->delay_percentage_header.empty()) {
216       auto value = initial_metadata.GetStringValue(
217           fi_policy->delay_percentage_header, &buffer);
218       if (value.has_value()) {
219         delay_percentage_numerator = std::min(
220             AsInt<uint32_t>(*value).value_or(-1), delay_percentage_numerator);
221       }
222     }
223   }
224   // Roll the dice
225   bool delay_request = delay != Duration::Zero();
226   bool abort_request = abort_code != GRPC_STATUS_OK;
227   if (delay_request || abort_request) {
228     MutexLock lock(&mu_);
229     if (delay_request) {
230       delay_request =
231           UnderFraction(&delay_rand_generator_, delay_percentage_numerator,
232                         fi_policy->delay_percentage_denominator);
233     }
234     if (abort_request) {
235       abort_request =
236           UnderFraction(&abort_rand_generator_, abort_percentage_numerator,
237                         fi_policy->abort_percentage_denominator);
238     }
239   }
240 
241   return InjectionDecision(
242       fi_policy->max_faults, delay_request ? delay : Duration::Zero(),
243       abort_request ? absl::optional<absl::Status>(absl::Status(
244                           static_cast<absl::StatusCode>(abort_code),
245                           fi_policy->abort_message))
246                     : absl::nullopt);
247 }
248 
HaveActiveFaultsQuota() const249 bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota() const {
250   return g_active_faults.load(std::memory_order_acquire) < max_faults_;
251 }
252 
DelayUntil()253 Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() {
254   if (delay_time_ != Duration::Zero() && HaveActiveFaultsQuota()) {
255     active_fault_ = FaultHandle{true};
256     return Timestamp::Now() + delay_time_;
257   }
258   return Timestamp::InfPast();
259 }
260 
MaybeAbort() const261 absl::Status FaultInjectionFilter::InjectionDecision::MaybeAbort() const {
262   if (abort_request_.has_value() &&
263       (delay_time_ != Duration::Zero() || HaveActiveFaultsQuota())) {
264     return abort_request_.value();
265   }
266   return absl::OkStatus();
267 }
268 
ToString() const269 std::string FaultInjectionFilter::InjectionDecision::ToString() const {
270   return absl::StrCat("delay=", delay_time_ != Duration::Zero(),
271                       " abort=", abort_request_.has_value());
272 }
273 
274 const grpc_channel_filter FaultInjectionFilter::kFilter =
275     MakePromiseBasedFilter<FaultInjectionFilter, FilterEndpoint::kClient>();
276 
FaultInjectionFilterRegister(CoreConfiguration::Builder * builder)277 void FaultInjectionFilterRegister(CoreConfiguration::Builder* builder) {
278   FaultInjectionServiceConfigParser::Register(builder);
279 }
280 
281 }  // namespace grpc_core
282