1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // TODO(ctiller): Add a unit test suite for these filters once it's practical to
16 // mock transport operations.
17
18 #include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h"
19
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/support/port_platform.h>
22
23 #include <functional>
24 #include <utility>
25
26 #include "absl/base/thread_annotations.h"
27 #include "absl/meta/type_traits.h"
28 #include "absl/random/random.h"
29 #include "absl/status/statusor.h"
30 #include "absl/types/optional.h"
31 #include "src/core/config/core_configuration.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/channel/promise_based_filter.h"
34 #include "src/core/lib/debug/trace.h"
35 #include "src/core/lib/experiments/experiments.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/error.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
40 #include "src/core/lib/promise/loop.h"
41 #include "src/core/lib/promise/poll.h"
42 #include "src/core/lib/promise/promise.h"
43 #include "src/core/lib/promise/sleep.h"
44 #include "src/core/lib/promise/try_seq.h"
45 #include "src/core/lib/resource_quota/arena.h"
46 #include "src/core/lib/surface/channel_stack_type.h"
47 #include "src/core/lib/transport/http2_errors.h"
48 #include "src/core/lib/transport/metadata_batch.h"
49 #include "src/core/util/debug_location.h"
50 #include "src/core/util/no_destruct.h"
51 #include "src/core/util/orphanable.h"
52 #include "src/core/util/per_cpu.h"
53 #include "src/core/util/status_helper.h"
54 #include "src/core/util/sync.h"
55
56 namespace grpc_core {
57
58 namespace {
59
60 constexpr Duration kDefaultIdleTimeout = Duration::Minutes(30);
61
62 // If these settings change, make sure that we are not sending a GOAWAY for
63 // inproc transport, since a GOAWAY to inproc ends up destroying the transport.
64 const auto kDefaultMaxConnectionAge = Duration::Infinity();
65 const auto kDefaultMaxConnectionAgeGrace = Duration::Infinity();
66 const auto kDefaultMaxConnectionIdle = Duration::Infinity();
67 const auto kMaxConnectionAgeJitter = 0.1;
68
69 } // namespace
70
GetClientIdleTimeout(const ChannelArgs & args)71 Duration GetClientIdleTimeout(const ChannelArgs& args) {
72 return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS)
73 .value_or(kDefaultIdleTimeout);
74 }
75
76 struct LegacyMaxAgeFilter::Config {
77 Duration max_connection_age;
78 Duration max_connection_idle;
79 Duration max_connection_age_grace;
80
enablegrpc_core::LegacyMaxAgeFilter::Config81 bool enable() const {
82 return max_connection_age != Duration::Infinity() ||
83 max_connection_idle != Duration::Infinity();
84 }
85
86 // A random jitter of +/-10% will be added to MAX_CONNECTION_AGE and
87 // MAX_CONNECTION_IDLE to spread out reconnection storms.
FromChannelArgsgrpc_core::LegacyMaxAgeFilter::Config88 static Config FromChannelArgs(const ChannelArgs& args) {
89 const Duration args_max_age =
90 args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_MS)
91 .value_or(kDefaultMaxConnectionAge);
92 const Duration args_max_idle =
93 args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
94 .value_or(kDefaultMaxConnectionIdle);
95 const Duration args_max_age_grace =
96 args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)
97 .value_or(kDefaultMaxConnectionAgeGrace);
98 // generate a random number between 1 - kMaxConnectionAgeJitter and
99 // 1 + kMaxConnectionAgeJitter
100 struct BitGen {
101 Mutex mu;
102 absl::BitGen bit_gen ABSL_GUARDED_BY(mu);
103 double MakeUniformDouble(double min, double max) {
104 MutexLock lock(&mu);
105 return absl::Uniform(bit_gen, min, max);
106 }
107 };
108 static NoDestruct<PerCpu<BitGen>> bit_gen(PerCpuOptions().SetMaxShards(8));
109 const double multiplier = bit_gen->this_cpu().MakeUniformDouble(
110 1.0 - kMaxConnectionAgeJitter, 1.0 + kMaxConnectionAgeJitter);
111 // GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
112 // will not be cast to int implicitly before the comparison.
113 return Config{args_max_age * multiplier, args_max_idle * multiplier,
114 args_max_age_grace};
115 }
116 };
117
118 // We need access to the channel stack here to send a goaway - but that access
119 // is deprecated and will be removed when call-v3 is fully enabled. This filter
120 // will be removed at that time also, so just disable the deprecation warning
121 // for now.
122 ABSL_INTERNAL_DISABLE_DEPRECATED_DECLARATION_WARNING
123 absl::StatusOr<std::unique_ptr<LegacyClientIdleFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args filter_args)124 LegacyClientIdleFilter::Create(const ChannelArgs& args,
125 ChannelFilter::Args filter_args) {
126 return std::make_unique<LegacyClientIdleFilter>(filter_args.channel_stack(),
127 GetClientIdleTimeout(args));
128 }
129
Create(const ChannelArgs & args,ChannelFilter::Args filter_args)130 absl::StatusOr<std::unique_ptr<LegacyMaxAgeFilter>> LegacyMaxAgeFilter::Create(
131 const ChannelArgs& args, ChannelFilter::Args filter_args) {
132 return std::make_unique<LegacyMaxAgeFilter>(filter_args.channel_stack(),
133 Config::FromChannelArgs(args));
134 }
135 ABSL_INTERNAL_RESTORE_DEPRECATED_DECLARATION_WARNING
136
Shutdown()137 void LegacyMaxAgeFilter::Shutdown() {
138 max_age_activity_.Reset();
139 LegacyChannelIdleFilter::Shutdown();
140 }
141
PostInit()142 void LegacyMaxAgeFilter::PostInit() {
143 struct StartupClosure {
144 RefCountedPtr<grpc_channel_stack> channel_stack;
145 LegacyMaxAgeFilter* filter;
146 grpc_closure closure;
147 };
148 auto run_startup = [](void* p, grpc_error_handle) {
149 auto* startup = static_cast<StartupClosure*>(p);
150 // Trigger idle timer
151 startup->filter->IncreaseCallCount();
152 startup->filter->DecreaseCallCount();
153 grpc_transport_op* op = grpc_make_transport_op(nullptr);
154 op->start_connectivity_watch.reset(
155 new ConnectivityWatcher(startup->filter));
156 op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
157 grpc_channel_next_op(
158 grpc_channel_stack_element(startup->channel_stack.get(), 0), op);
159 delete startup;
160 };
161 auto* startup =
162 new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}};
163 GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr);
164 ExecCtx::Run(DEBUG_LOCATION, &startup->closure, absl::OkStatus());
165
166 auto channel_stack = this->channel_stack()->Ref();
167
168 // Start the max age timer
169 if (max_connection_age_ != Duration::Infinity()) {
170 auto arena = SimpleArenaAllocator(0)->MakeArena();
171 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
172 channel_stack->EventEngine());
173 max_age_activity_.Set(MakeActivity(
174 TrySeq(
175 // First sleep until the max connection age
176 Sleep(Timestamp::Now() + max_connection_age_),
177 // Then send a goaway.
178 [this] {
179 GRPC_CHANNEL_STACK_REF(this->channel_stack(),
180 "max_age send_goaway");
181 // Jump out of the activity to send the goaway.
182 auto fn = [](void* arg, grpc_error_handle) {
183 auto* channel_stack = static_cast<grpc_channel_stack*>(arg);
184 grpc_transport_op* op = grpc_make_transport_op(nullptr);
185 op->goaway_error = grpc_error_set_int(
186 GRPC_ERROR_CREATE("max_age"),
187 StatusIntProperty::kHttp2Error, GRPC_HTTP2_NO_ERROR);
188 grpc_channel_element* elem =
189 grpc_channel_stack_element(channel_stack, 0);
190 elem->filter->start_transport_op(elem, op);
191 GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway");
192 };
193 ExecCtx::Run(
194 DEBUG_LOCATION,
195 GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr),
196 absl::OkStatus());
197 return Immediate(absl::OkStatus());
198 },
199 // Sleep for the grace period
200 [this] {
201 return Sleep(Timestamp::Now() + max_connection_age_grace_);
202 }),
203 ExecCtxWakeupScheduler(),
204 [channel_stack, this](absl::Status status) {
205 // OnDone -- close the connection if the promise completed
206 // successfully.
207 // (if it did not, it was cancelled)
208 if (status.ok()) CloseChannel("max connection age");
209 },
210 std::move(arena)));
211 }
212 }
213
214 // Construct a promise for one call.
MakeCallPromise(CallArgs call_args,NextPromiseFactory next_promise_factory)215 ArenaPromise<ServerMetadataHandle> LegacyChannelIdleFilter::MakeCallPromise(
216 CallArgs call_args, NextPromiseFactory next_promise_factory) {
217 using Decrementer =
218 std::unique_ptr<LegacyChannelIdleFilter, CallCountDecreaser>;
219 IncreaseCallCount();
220 return ArenaPromise<ServerMetadataHandle>(
221 [decrementer = Decrementer(this),
222 next = next_promise_factory(std::move(call_args))]() mutable
223 -> Poll<ServerMetadataHandle> { return next(); });
224 }
225
StartTransportOp(grpc_transport_op * op)226 bool LegacyChannelIdleFilter::StartTransportOp(grpc_transport_op* op) {
227 // Catch the disconnect_with_error transport op.
228 if (!op->disconnect_with_error.ok()) Shutdown();
229 // Pass the op to the next filter.
230 return false;
231 }
232
Shutdown()233 void LegacyChannelIdleFilter::Shutdown() {
234 // IncreaseCallCount() introduces a phony call and prevent the timer from
235 // being reset by other threads.
236 IncreaseCallCount();
237 activity_.Reset();
238 }
239
IncreaseCallCount()240 void LegacyChannelIdleFilter::IncreaseCallCount() {
241 idle_filter_state_->IncreaseCallCount();
242 }
243
DecreaseCallCount()244 void LegacyChannelIdleFilter::DecreaseCallCount() {
245 if (idle_filter_state_->DecreaseCallCount()) {
246 // If there are no more calls in progress, start the idle timer.
247 StartIdleTimer();
248 }
249 }
250
StartIdleTimer()251 void LegacyChannelIdleFilter::StartIdleTimer() {
252 GRPC_TRACE_LOG(client_idle_filter, INFO)
253 << "(client idle filter) timer has started";
254 auto idle_filter_state = idle_filter_state_;
255 // Hold a ref to the channel stack for the timer callback.
256 auto channel_stack = channel_stack_->Ref();
257 auto timeout = client_idle_timeout_;
258 auto promise = Loop([timeout, idle_filter_state]() {
259 return TrySeq(Sleep(Timestamp::Now() + timeout),
260 [idle_filter_state]() -> Poll<LoopCtl<absl::Status>> {
261 if (idle_filter_state->CheckTimer()) {
262 return Continue{};
263 } else {
264 return absl::OkStatus();
265 }
266 });
267 });
268 auto arena = SimpleArenaAllocator()->MakeArena();
269 arena->SetContext<grpc_event_engine::experimental::EventEngine>(
270 channel_stack_->EventEngine());
271 activity_.Set(MakeActivity(
272 std::move(promise), ExecCtxWakeupScheduler{},
273 [channel_stack, this](absl::Status status) {
274 if (status.ok()) CloseChannel("connection idle");
275 },
276 std::move(arena)));
277 }
278
CloseChannel(absl::string_view reason)279 void LegacyChannelIdleFilter::CloseChannel(absl::string_view reason) {
280 auto* op = grpc_make_transport_op(nullptr);
281 op->disconnect_with_error = grpc_error_set_int(
282 GRPC_ERROR_CREATE(reason), StatusIntProperty::ChannelConnectivityState,
283 GRPC_CHANNEL_IDLE);
284 // Pass the transport op down to the channel stack.
285 auto* elem = grpc_channel_stack_element(channel_stack_, 0);
286 elem->filter->start_transport_op(elem, op);
287 }
288
289 const grpc_channel_filter LegacyClientIdleFilter::kFilter =
290 MakePromiseBasedFilter<LegacyClientIdleFilter, FilterEndpoint::kClient>();
291 const grpc_channel_filter LegacyMaxAgeFilter::kFilter =
292 MakePromiseBasedFilter<LegacyMaxAgeFilter, FilterEndpoint::kServer>();
293
RegisterLegacyChannelIdleFilters(CoreConfiguration::Builder * builder)294 void RegisterLegacyChannelIdleFilters(CoreConfiguration::Builder* builder) {
295 builder->channel_init()
296 ->RegisterV2Filter<LegacyClientIdleFilter>(GRPC_CLIENT_CHANNEL)
297 .ExcludeFromMinimalStack()
298 .If([](const ChannelArgs& channel_args) {
299 return GetClientIdleTimeout(channel_args) != Duration::Infinity();
300 });
301 builder->channel_init()
302 ->RegisterV2Filter<LegacyMaxAgeFilter>(GRPC_SERVER_CHANNEL)
303 .ExcludeFromMinimalStack()
304 .If([](const ChannelArgs& channel_args) {
305 return LegacyMaxAgeFilter::Config::FromChannelArgs(channel_args)
306 .enable();
307 });
308 }
309
LegacyMaxAgeFilter(grpc_channel_stack * channel_stack,const Config & max_age_config)310 LegacyMaxAgeFilter::LegacyMaxAgeFilter(grpc_channel_stack* channel_stack,
311 const Config& max_age_config)
312 : LegacyChannelIdleFilter(channel_stack,
313 max_age_config.max_connection_idle),
314 max_connection_age_(max_age_config.max_connection_age),
315 max_connection_age_grace_(max_age_config.max_connection_age_grace) {}
316
317 } // namespace grpc_core
318