1 //
2 // Copyright 2021 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "src/core/ext/filters/fault_injection/fault_injection_filter.h"
18
19 #include <grpc/status.h>
20 #include <grpc/support/port_platform.h>
21 #include <stdint.h>
22
23 #include <algorithm>
24 #include <atomic>
25 #include <functional>
26 #include <string>
27 #include <type_traits>
28 #include <utility>
29
30 #include "absl/log/log.h"
31 #include "absl/meta/type_traits.h"
32 #include "absl/status/status.h"
33 #include "absl/status/statusor.h"
34 #include "absl/strings/numbers.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/strings/string_view.h"
37 #include "absl/types/optional.h"
38 #include "src/core/config/core_configuration.h"
39 #include "src/core/ext/filters/fault_injection/fault_injection_service_config_parser.h"
40 #include "src/core/lib/channel/channel_stack.h"
41 #include "src/core/lib/channel/status_util.h"
42 #include "src/core/lib/debug/trace.h"
43 #include "src/core/lib/promise/context.h"
44 #include "src/core/lib/promise/sleep.h"
45 #include "src/core/lib/promise/try_seq.h"
46 #include "src/core/lib/transport/metadata_batch.h"
47 #include "src/core/lib/transport/transport.h"
48 #include "src/core/service_config/service_config_call_data.h"
49 #include "src/core/util/time.h"
50
51 namespace grpc_core {
52
53 const NoInterceptor FaultInjectionFilter::Call::OnServerInitialMetadata;
54 const NoInterceptor FaultInjectionFilter::Call::OnServerTrailingMetadata;
55 const NoInterceptor FaultInjectionFilter::Call::OnClientToServerMessage;
56 const NoInterceptor FaultInjectionFilter::Call::OnClientToServerHalfClose;
57 const NoInterceptor FaultInjectionFilter::Call::OnServerToClientMessage;
58 const NoInterceptor FaultInjectionFilter::Call::OnFinalize;
59
60 namespace {
61
// Process-wide number of faults currently active. FaultHandle increments and
// decrements it; InjectionDecision::HaveActiveFaultsQuota() compares it with
// the policy's max_faults.
std::atomic<uint32_t> g_active_faults{0};
// Kept trivially destructible (presumably so the global is safe to touch
// during static destruction).
static_assert(
    std::is_trivially_destructible<decltype(g_active_faults)>::value,
    "the active fault counter needs to have a trivially destructible type");
66
67 template <typename T>
AsInt(absl::string_view s)68 auto AsInt(absl::string_view s) -> absl::optional<T> {
69 T x;
70 if (absl::SimpleAtoi(s, &x)) return x;
71 return absl::nullopt;
72 }
73
UnderFraction(absl::InsecureBitGen * rand_generator,const uint32_t numerator,const uint32_t denominator)74 inline bool UnderFraction(absl::InsecureBitGen* rand_generator,
75 const uint32_t numerator,
76 const uint32_t denominator) {
77 if (numerator <= 0) return false;
78 if (numerator >= denominator) return true;
79 // Generate a random number in [0, denominator).
80 const uint32_t random_number =
81 absl::Uniform(absl::IntervalClosedOpen, *rand_generator, 0u, denominator);
82 return random_number < numerator;
83 }
84
85 // Tracks an active faults lifetime.
86 // Increments g_active_faults when created, and decrements it when destroyed.
87 class FaultHandle {
88 public:
FaultHandle(bool active)89 explicit FaultHandle(bool active) : active_(active) {
90 if (active) {
91 g_active_faults.fetch_add(1, std::memory_order_relaxed);
92 }
93 }
~FaultHandle()94 ~FaultHandle() {
95 if (active_) {
96 g_active_faults.fetch_sub(1, std::memory_order_relaxed);
97 }
98 }
99 FaultHandle(const FaultHandle&) = delete;
100 FaultHandle& operator=(const FaultHandle&) = delete;
FaultHandle(FaultHandle && other)101 FaultHandle(FaultHandle&& other) noexcept
102 : active_(std::exchange(other.active_, false)) {}
operator =(FaultHandle && other)103 FaultHandle& operator=(FaultHandle&& other) noexcept {
104 std::swap(active_, other.active_);
105 return *this;
106 }
107
108 private:
109 bool active_;
110 };
111
112 } // namespace
113
114 class FaultInjectionFilter::InjectionDecision {
115 public:
InjectionDecision(uint32_t max_faults,Duration delay_time,absl::optional<absl::Status> abort_request)116 InjectionDecision(uint32_t max_faults, Duration delay_time,
117 absl::optional<absl::Status> abort_request)
118 : max_faults_(max_faults),
119 delay_time_(delay_time),
120 abort_request_(abort_request) {}
121
122 std::string ToString() const;
123 Timestamp DelayUntil();
124 absl::Status MaybeAbort() const;
125
126 private:
127 bool HaveActiveFaultsQuota() const;
128
129 uint32_t max_faults_;
130 Duration delay_time_;
131 absl::optional<absl::Status> abort_request_;
132 FaultHandle active_fault_{false};
133 };
134
135 absl::StatusOr<std::unique_ptr<FaultInjectionFilter>>
Create(const ChannelArgs &,ChannelFilter::Args filter_args)136 FaultInjectionFilter::Create(const ChannelArgs&,
137 ChannelFilter::Args filter_args) {
138 return std::make_unique<FaultInjectionFilter>(filter_args);
139 }
140
FaultInjectionFilter(ChannelFilter::Args filter_args)141 FaultInjectionFilter::FaultInjectionFilter(ChannelFilter::Args filter_args)
142 : index_(filter_args.instance_id()),
143 service_config_parser_index_(
144 FaultInjectionServiceConfigParser::ParserIndex()) {}
145
146 // Construct a promise for one call.
OnClientInitialMetadata(ClientMetadata & md,FaultInjectionFilter * filter)147 ArenaPromise<absl::Status> FaultInjectionFilter::Call::OnClientInitialMetadata(
148 ClientMetadata& md, FaultInjectionFilter* filter) {
149 auto decision = filter->MakeInjectionDecision(md);
150 GRPC_TRACE_LOG(fault_injection_filter, INFO)
151 << "chand=" << this << ": Fault injection triggered "
152 << decision.ToString();
153 auto delay = decision.DelayUntil();
154 return TrySeq(Sleep(delay), [decision = std::move(decision)]() {
155 return decision.MaybeAbort();
156 });
157 }
158
// Decides, for one call, which faults (delay and/or abort) to inject.
//
// The policy comes from the per-method service config, selected by this
// filter instance's index_. Configured header names in the policy let request
// metadata override or restrict the policy:
//   - abort code:       only consulted when the policy's abort_code is OK;
//                       an unparseable value maps to GRPC_STATUS_UNKNOWN.
//   - abort/delay %:    can only lower the policy's numerator (std::min). An
//                       unparseable value becomes uint32_t max via
//                       value_or(-1), which leaves the policy value as is.
//   - delay:            only consulted when the policy's delay is zero. It is
//                       read as milliseconds; negative or unparseable values
//                       become 0.
// The delay and abort dice are then rolled independently under mu_.
// The returned decision has a zero delay / empty abort for faults not chosen.
FaultInjectionFilter::InjectionDecision
FaultInjectionFilter::MakeInjectionDecision(
    const ClientMetadata& initial_metadata) {
  // Fetch the fault injection policy from the service config. index_ selects
  // which of the method's policies belongs to this filter instance.
  auto* service_config_call_data = GetContext<ServiceConfigCallData>();
  auto* method_params = static_cast<FaultInjectionMethodParsedConfig*>(
      service_config_call_data->GetMethodParsedConfig(
          service_config_parser_index_));
  const FaultInjectionMethodParsedConfig::FaultInjectionPolicy* fi_policy =
      nullptr;
  if (method_params != nullptr) {
    fi_policy = method_params->fault_injection_policy(index_);
  }

  // Shouldn't ever be null, but just in case, return a no-op decision.
  if (fi_policy == nullptr) {
    return InjectionDecision(/*max_faults=*/0, /*delay_time=*/Duration::Zero(),
                             /*abort_request=*/absl::nullopt);
  }

  // Working copies of the policy values, possibly overridden by headers below.
  grpc_status_code abort_code = fi_policy->abort_code;
  uint32_t abort_percentage_numerator = fi_policy->abort_percentage_numerator;
  uint32_t delay_percentage_numerator = fi_policy->delay_percentage_numerator;
  Duration delay = fi_policy->delay;

  // Update the policy with values in initial metadata.
  if (!fi_policy->abort_code_header.empty() ||
      !fi_policy->abort_percentage_header.empty() ||
      !fi_policy->delay_header.empty() ||
      !fi_policy->delay_percentage_header.empty()) {
    // Scratch storage passed to every GetStringValue() call. The returned
    // `value` presumably may refer into it, so each `value` is fully consumed
    // before the next lookup reuses the buffer. Keep this ordering.
    std::string buffer;
    if (!fi_policy->abort_code_header.empty() && abort_code == GRPC_STATUS_OK) {
      auto value = initial_metadata.GetStringValue(fi_policy->abort_code_header,
                                                   &buffer);
      if (value.has_value()) {
        // Out-of-range handling is left to grpc_status_code_from_int()
        // (defined elsewhere).
        grpc_status_code_from_int(
            AsInt<int>(*value).value_or(GRPC_STATUS_UNKNOWN), &abort_code);
      }
    }
    if (!fi_policy->abort_percentage_header.empty()) {
      auto value = initial_metadata.GetStringValue(
          fi_policy->abort_percentage_header, &buffer);
      if (value.has_value()) {
        // A header may only reduce the configured abort percentage.
        abort_percentage_numerator = std::min(
            AsInt<uint32_t>(*value).value_or(-1), abort_percentage_numerator);
      }
    }
    if (!fi_policy->delay_header.empty() && delay == Duration::Zero()) {
      auto value =
          initial_metadata.GetStringValue(fi_policy->delay_header, &buffer);
      if (value.has_value()) {
        delay = Duration::Milliseconds(
            std::max(AsInt<int64_t>(*value).value_or(0), int64_t{0}));
      }
    }
    if (!fi_policy->delay_percentage_header.empty()) {
      auto value = initial_metadata.GetStringValue(
          fi_policy->delay_percentage_header, &buffer);
      if (value.has_value()) {
        // A header may only reduce the configured delay percentage.
        delay_percentage_numerator = std::min(
            AsInt<uint32_t>(*value).value_or(-1), delay_percentage_numerator);
      }
    }
  }
  // Roll the dice. Each fault is first "requested" (non-zero delay /
  // non-OK code), then kept only if its random draw succeeds.
  bool delay_request = delay != Duration::Zero();
  bool abort_request = abort_code != GRPC_STATUS_OK;
  if (delay_request || abort_request) {
    // The filter-owned random generators are only used while holding mu_.
    MutexLock lock(&mu_);
    if (delay_request) {
      delay_request =
          UnderFraction(&delay_rand_generator_, delay_percentage_numerator,
                        fi_policy->delay_percentage_denominator);
    }
    if (abort_request) {
      abort_request =
          UnderFraction(&abort_rand_generator_, abort_percentage_numerator,
                        fi_policy->abort_percentage_denominator);
    }
  }

  // The abort status reuses the numeric grpc_status_code as an
  // absl::StatusCode and carries the policy's abort_message.
  return InjectionDecision(
      fi_policy->max_faults, delay_request ? delay : Duration::Zero(),
      abort_request ? absl::optional<absl::Status>(absl::Status(
                          static_cast<absl::StatusCode>(abort_code),
                          fi_policy->abort_message))
                    : absl::nullopt);
}
248
HaveActiveFaultsQuota() const249 bool FaultInjectionFilter::InjectionDecision::HaveActiveFaultsQuota() const {
250 return g_active_faults.load(std::memory_order_acquire) < max_faults_;
251 }
252
DelayUntil()253 Timestamp FaultInjectionFilter::InjectionDecision::DelayUntil() {
254 if (delay_time_ != Duration::Zero() && HaveActiveFaultsQuota()) {
255 active_fault_ = FaultHandle{true};
256 return Timestamp::Now() + delay_time_;
257 }
258 return Timestamp::InfPast();
259 }
260
MaybeAbort() const261 absl::Status FaultInjectionFilter::InjectionDecision::MaybeAbort() const {
262 if (abort_request_.has_value() &&
263 (delay_time_ != Duration::Zero() || HaveActiveFaultsQuota())) {
264 return abort_request_.value();
265 }
266 return absl::OkStatus();
267 }
268
ToString() const269 std::string FaultInjectionFilter::InjectionDecision::ToString() const {
270 return absl::StrCat("delay=", delay_time_ != Duration::Zero(),
271 " abort=", abort_request_.has_value());
272 }
273
274 const grpc_channel_filter FaultInjectionFilter::kFilter =
275 MakePromiseBasedFilter<FaultInjectionFilter, FilterEndpoint::kClient>();
276
FaultInjectionFilterRegister(CoreConfiguration::Builder * builder)277 void FaultInjectionFilterRegister(CoreConfiguration::Builder* builder) {
278 FaultInjectionServiceConfigParser::Register(builder);
279 }
280
281 } // namespace grpc_core
282