• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // TODO(ctiller): Add a unit test suite for these filters once it's practical to
16 // mock transport operations.
17 
18 #include "src/core/ext/filters/channel_idle/legacy_channel_idle_filter.h"
19 
20 #include <grpc/impl/channel_arg_names.h>
21 #include <grpc/support/port_platform.h>
22 
23 #include <functional>
24 #include <utility>
25 
26 #include "absl/base/thread_annotations.h"
27 #include "absl/meta/type_traits.h"
28 #include "absl/random/random.h"
29 #include "absl/status/statusor.h"
30 #include "absl/types/optional.h"
31 #include "src/core/config/core_configuration.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/channel/promise_based_filter.h"
34 #include "src/core/lib/debug/trace.h"
35 #include "src/core/lib/experiments/experiments.h"
36 #include "src/core/lib/iomgr/closure.h"
37 #include "src/core/lib/iomgr/error.h"
38 #include "src/core/lib/iomgr/exec_ctx.h"
39 #include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
40 #include "src/core/lib/promise/loop.h"
41 #include "src/core/lib/promise/poll.h"
42 #include "src/core/lib/promise/promise.h"
43 #include "src/core/lib/promise/sleep.h"
44 #include "src/core/lib/promise/try_seq.h"
45 #include "src/core/lib/resource_quota/arena.h"
46 #include "src/core/lib/surface/channel_stack_type.h"
47 #include "src/core/lib/transport/http2_errors.h"
48 #include "src/core/lib/transport/metadata_batch.h"
49 #include "src/core/util/debug_location.h"
50 #include "src/core/util/no_destruct.h"
51 #include "src/core/util/orphanable.h"
52 #include "src/core/util/per_cpu.h"
53 #include "src/core/util/status_helper.h"
54 #include "src/core/util/sync.h"
55 
56 namespace grpc_core {
57 
58 namespace {
59 
60 constexpr Duration kDefaultIdleTimeout = Duration::Minutes(30);
61 
62 // If these settings change, make sure that we are not sending a GOAWAY for
63 // inproc transport, since a GOAWAY to inproc ends up destroying the transport.
64 const auto kDefaultMaxConnectionAge = Duration::Infinity();
65 const auto kDefaultMaxConnectionAgeGrace = Duration::Infinity();
66 const auto kDefaultMaxConnectionIdle = Duration::Infinity();
67 const auto kMaxConnectionAgeJitter = 0.1;
68 
69 }  // namespace
70 
GetClientIdleTimeout(const ChannelArgs & args)71 Duration GetClientIdleTimeout(const ChannelArgs& args) {
72   return args.GetDurationFromIntMillis(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS)
73       .value_or(kDefaultIdleTimeout);
74 }
75 
76 struct LegacyMaxAgeFilter::Config {
77   Duration max_connection_age;
78   Duration max_connection_idle;
79   Duration max_connection_age_grace;
80 
enablegrpc_core::LegacyMaxAgeFilter::Config81   bool enable() const {
82     return max_connection_age != Duration::Infinity() ||
83            max_connection_idle != Duration::Infinity();
84   }
85 
86   // A random jitter of +/-10% will be added to MAX_CONNECTION_AGE and
87   // MAX_CONNECTION_IDLE to spread out reconnection storms.
FromChannelArgsgrpc_core::LegacyMaxAgeFilter::Config88   static Config FromChannelArgs(const ChannelArgs& args) {
89     const Duration args_max_age =
90         args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_MS)
91             .value_or(kDefaultMaxConnectionAge);
92     const Duration args_max_idle =
93         args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_IDLE_MS)
94             .value_or(kDefaultMaxConnectionIdle);
95     const Duration args_max_age_grace =
96         args.GetDurationFromIntMillis(GRPC_ARG_MAX_CONNECTION_AGE_GRACE_MS)
97             .value_or(kDefaultMaxConnectionAgeGrace);
98     // generate a random number between 1 - kMaxConnectionAgeJitter and
99     // 1 + kMaxConnectionAgeJitter
100     struct BitGen {
101       Mutex mu;
102       absl::BitGen bit_gen ABSL_GUARDED_BY(mu);
103       double MakeUniformDouble(double min, double max) {
104         MutexLock lock(&mu);
105         return absl::Uniform(bit_gen, min, max);
106       }
107     };
108     static NoDestruct<PerCpu<BitGen>> bit_gen(PerCpuOptions().SetMaxShards(8));
109     const double multiplier = bit_gen->this_cpu().MakeUniformDouble(
110         1.0 - kMaxConnectionAgeJitter, 1.0 + kMaxConnectionAgeJitter);
111     // GRPC_MILLIS_INF_FUTURE - 0.5 converts the value to float, so that result
112     // will not be cast to int implicitly before the comparison.
113     return Config{args_max_age * multiplier, args_max_idle * multiplier,
114                   args_max_age_grace};
115   }
116 };
117 
118 // We need access to the channel stack here to send a goaway - but that access
119 // is deprecated and will be removed when call-v3 is fully enabled. This filter
120 // will be removed at that time also, so just disable the deprecation warning
121 // for now.
122 ABSL_INTERNAL_DISABLE_DEPRECATED_DECLARATION_WARNING
123 absl::StatusOr<std::unique_ptr<LegacyClientIdleFilter>>
Create(const ChannelArgs & args,ChannelFilter::Args filter_args)124 LegacyClientIdleFilter::Create(const ChannelArgs& args,
125                                ChannelFilter::Args filter_args) {
126   return std::make_unique<LegacyClientIdleFilter>(filter_args.channel_stack(),
127                                                   GetClientIdleTimeout(args));
128 }
129 
Create(const ChannelArgs & args,ChannelFilter::Args filter_args)130 absl::StatusOr<std::unique_ptr<LegacyMaxAgeFilter>> LegacyMaxAgeFilter::Create(
131     const ChannelArgs& args, ChannelFilter::Args filter_args) {
132   return std::make_unique<LegacyMaxAgeFilter>(filter_args.channel_stack(),
133                                               Config::FromChannelArgs(args));
134 }
135 ABSL_INTERNAL_RESTORE_DEPRECATED_DECLARATION_WARNING
136 
Shutdown()137 void LegacyMaxAgeFilter::Shutdown() {
138   max_age_activity_.Reset();
139   LegacyChannelIdleFilter::Shutdown();
140 }
141 
PostInit()142 void LegacyMaxAgeFilter::PostInit() {
143   struct StartupClosure {
144     RefCountedPtr<grpc_channel_stack> channel_stack;
145     LegacyMaxAgeFilter* filter;
146     grpc_closure closure;
147   };
148   auto run_startup = [](void* p, grpc_error_handle) {
149     auto* startup = static_cast<StartupClosure*>(p);
150     // Trigger idle timer
151     startup->filter->IncreaseCallCount();
152     startup->filter->DecreaseCallCount();
153     grpc_transport_op* op = grpc_make_transport_op(nullptr);
154     op->start_connectivity_watch.reset(
155         new ConnectivityWatcher(startup->filter));
156     op->start_connectivity_watch_state = GRPC_CHANNEL_IDLE;
157     grpc_channel_next_op(
158         grpc_channel_stack_element(startup->channel_stack.get(), 0), op);
159     delete startup;
160   };
161   auto* startup =
162       new StartupClosure{this->channel_stack()->Ref(), this, grpc_closure{}};
163   GRPC_CLOSURE_INIT(&startup->closure, run_startup, startup, nullptr);
164   ExecCtx::Run(DEBUG_LOCATION, &startup->closure, absl::OkStatus());
165 
166   auto channel_stack = this->channel_stack()->Ref();
167 
168   // Start the max age timer
169   if (max_connection_age_ != Duration::Infinity()) {
170     auto arena = SimpleArenaAllocator(0)->MakeArena();
171     arena->SetContext<grpc_event_engine::experimental::EventEngine>(
172         channel_stack->EventEngine());
173     max_age_activity_.Set(MakeActivity(
174         TrySeq(
175             // First sleep until the max connection age
176             Sleep(Timestamp::Now() + max_connection_age_),
177             // Then send a goaway.
178             [this] {
179               GRPC_CHANNEL_STACK_REF(this->channel_stack(),
180                                      "max_age send_goaway");
181               // Jump out of the activity to send the goaway.
182               auto fn = [](void* arg, grpc_error_handle) {
183                 auto* channel_stack = static_cast<grpc_channel_stack*>(arg);
184                 grpc_transport_op* op = grpc_make_transport_op(nullptr);
185                 op->goaway_error = grpc_error_set_int(
186                     GRPC_ERROR_CREATE("max_age"),
187                     StatusIntProperty::kHttp2Error, GRPC_HTTP2_NO_ERROR);
188                 grpc_channel_element* elem =
189                     grpc_channel_stack_element(channel_stack, 0);
190                 elem->filter->start_transport_op(elem, op);
191                 GRPC_CHANNEL_STACK_UNREF(channel_stack, "max_age send_goaway");
192               };
193               ExecCtx::Run(
194                   DEBUG_LOCATION,
195                   GRPC_CLOSURE_CREATE(fn, this->channel_stack(), nullptr),
196                   absl::OkStatus());
197               return Immediate(absl::OkStatus());
198             },
199             // Sleep for the grace period
200             [this] {
201               return Sleep(Timestamp::Now() + max_connection_age_grace_);
202             }),
203         ExecCtxWakeupScheduler(),
204         [channel_stack, this](absl::Status status) {
205           // OnDone -- close the connection if the promise completed
206           // successfully.
207           // (if it did not, it was cancelled)
208           if (status.ok()) CloseChannel("max connection age");
209         },
210         std::move(arena)));
211   }
212 }
213 
214 // Construct a promise for one call.
MakeCallPromise(CallArgs call_args,NextPromiseFactory next_promise_factory)215 ArenaPromise<ServerMetadataHandle> LegacyChannelIdleFilter::MakeCallPromise(
216     CallArgs call_args, NextPromiseFactory next_promise_factory) {
217   using Decrementer =
218       std::unique_ptr<LegacyChannelIdleFilter, CallCountDecreaser>;
219   IncreaseCallCount();
220   return ArenaPromise<ServerMetadataHandle>(
221       [decrementer = Decrementer(this),
222        next = next_promise_factory(std::move(call_args))]() mutable
223           -> Poll<ServerMetadataHandle> { return next(); });
224 }
225 
StartTransportOp(grpc_transport_op * op)226 bool LegacyChannelIdleFilter::StartTransportOp(grpc_transport_op* op) {
227   // Catch the disconnect_with_error transport op.
228   if (!op->disconnect_with_error.ok()) Shutdown();
229   // Pass the op to the next filter.
230   return false;
231 }
232 
Shutdown()233 void LegacyChannelIdleFilter::Shutdown() {
234   // IncreaseCallCount() introduces a phony call and prevent the timer from
235   // being reset by other threads.
236   IncreaseCallCount();
237   activity_.Reset();
238 }
239 
IncreaseCallCount()240 void LegacyChannelIdleFilter::IncreaseCallCount() {
241   idle_filter_state_->IncreaseCallCount();
242 }
243 
DecreaseCallCount()244 void LegacyChannelIdleFilter::DecreaseCallCount() {
245   if (idle_filter_state_->DecreaseCallCount()) {
246     // If there are no more calls in progress, start the idle timer.
247     StartIdleTimer();
248   }
249 }
250 
StartIdleTimer()251 void LegacyChannelIdleFilter::StartIdleTimer() {
252   GRPC_TRACE_LOG(client_idle_filter, INFO)
253       << "(client idle filter) timer has started";
254   auto idle_filter_state = idle_filter_state_;
255   // Hold a ref to the channel stack for the timer callback.
256   auto channel_stack = channel_stack_->Ref();
257   auto timeout = client_idle_timeout_;
258   auto promise = Loop([timeout, idle_filter_state]() {
259     return TrySeq(Sleep(Timestamp::Now() + timeout),
260                   [idle_filter_state]() -> Poll<LoopCtl<absl::Status>> {
261                     if (idle_filter_state->CheckTimer()) {
262                       return Continue{};
263                     } else {
264                       return absl::OkStatus();
265                     }
266                   });
267   });
268   auto arena = SimpleArenaAllocator()->MakeArena();
269   arena->SetContext<grpc_event_engine::experimental::EventEngine>(
270       channel_stack_->EventEngine());
271   activity_.Set(MakeActivity(
272       std::move(promise), ExecCtxWakeupScheduler{},
273       [channel_stack, this](absl::Status status) {
274         if (status.ok()) CloseChannel("connection idle");
275       },
276       std::move(arena)));
277 }
278 
CloseChannel(absl::string_view reason)279 void LegacyChannelIdleFilter::CloseChannel(absl::string_view reason) {
280   auto* op = grpc_make_transport_op(nullptr);
281   op->disconnect_with_error = grpc_error_set_int(
282       GRPC_ERROR_CREATE(reason), StatusIntProperty::ChannelConnectivityState,
283       GRPC_CHANNEL_IDLE);
284   // Pass the transport op down to the channel stack.
285   auto* elem = grpc_channel_stack_element(channel_stack_, 0);
286   elem->filter->start_transport_op(elem, op);
287 }
288 
289 const grpc_channel_filter LegacyClientIdleFilter::kFilter =
290     MakePromiseBasedFilter<LegacyClientIdleFilter, FilterEndpoint::kClient>();
291 const grpc_channel_filter LegacyMaxAgeFilter::kFilter =
292     MakePromiseBasedFilter<LegacyMaxAgeFilter, FilterEndpoint::kServer>();
293 
RegisterLegacyChannelIdleFilters(CoreConfiguration::Builder * builder)294 void RegisterLegacyChannelIdleFilters(CoreConfiguration::Builder* builder) {
295   builder->channel_init()
296       ->RegisterV2Filter<LegacyClientIdleFilter>(GRPC_CLIENT_CHANNEL)
297       .ExcludeFromMinimalStack()
298       .If([](const ChannelArgs& channel_args) {
299         return GetClientIdleTimeout(channel_args) != Duration::Infinity();
300       });
301   builder->channel_init()
302       ->RegisterV2Filter<LegacyMaxAgeFilter>(GRPC_SERVER_CHANNEL)
303       .ExcludeFromMinimalStack()
304       .If([](const ChannelArgs& channel_args) {
305         return LegacyMaxAgeFilter::Config::FromChannelArgs(channel_args)
306             .enable();
307       });
308 }
309 
LegacyMaxAgeFilter(grpc_channel_stack * channel_stack,const Config & max_age_config)310 LegacyMaxAgeFilter::LegacyMaxAgeFilter(grpc_channel_stack* channel_stack,
311                                        const Config& max_age_config)
312     : LegacyChannelIdleFilter(channel_stack,
313                               max_age_config.max_connection_idle),
314       max_connection_age_(max_age_config.max_connection_age),
315       max_connection_age_grace_(max_age_config.max_connection_age_grace) {}
316 
317 }  // namespace grpc_core
318