• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
20 
21 #include <grpc/support/port_platform.h>
22 #include <grpc/support/time.h>
23 
24 #include <memory>
25 #include <utility>
26 
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/time/time.h"
30 #include "absl/types/optional.h"
31 #include "src/core/lib/debug/trace.h"
32 
33 static thread_local bool g_timer_thread;
34 
35 namespace grpc_event_engine {
36 namespace experimental {
37 
RunSomeTimers(std::vector<experimental::EventEngine::Closure * > timers)38 void TimerManager::RunSomeTimers(
39     std::vector<experimental::EventEngine::Closure*> timers) {
40   for (auto* timer : timers) {
41     thread_pool_->Run(timer);
42   }
43 }
44 
45 // wait until 'next' (or forever if there is already a timed waiter in the pool)
46 // returns true if the thread should continue executing (false if it should
47 // shutdown)
WaitUntil(grpc_core::Timestamp next)48 bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
49   grpc_core::MutexLock lock(&mu_);
50   if (shutdown_) return false;
51   // If kicked_ is true at this point, it means there was a kick from the timer
52   // system that the timer-manager threads here missed. We cannot trust 'next'
53   // here any longer (since there might be an earlier deadline). So if kicked_
54   // is true at this point, we should quickly exit this and get the next
55   // deadline from the timer system
56   if (!kicked_) {
57     cv_wait_.WaitWithTimeout(&mu_,
58                              absl::Milliseconds((next - host_.Now()).millis()));
59     ++wakeups_;
60   }
61   kicked_ = false;
62   return true;
63 }
64 
MainLoop()65 void TimerManager::MainLoop() {
66   grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
67   absl::optional<std::vector<experimental::EventEngine::Closure*>>
68       check_result = timer_list_->TimerCheck(&next);
69   CHECK(check_result.has_value())
70       << "ERROR: More than one MainLoop is running.";
71   bool timers_found = !check_result->empty();
72   if (timers_found) {
73     RunSomeTimers(std::move(*check_result));
74   }
75   thread_pool_->Run([this, next, timers_found]() {
76     if (!timers_found && !WaitUntil(next)) {
77       main_loop_exit_signal_->Notify();
78       return;
79     }
80     MainLoop();
81   });
82 }
83 
IsTimerManagerThread()84 bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
85 
TimerManager(std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)86 TimerManager::TimerManager(
87     std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
88     : host_(this), thread_pool_(std::move(thread_pool)) {
89   timer_list_ = std::make_unique<TimerList>(&host_);
90   main_loop_exit_signal_.emplace();
91   thread_pool_->Run([this]() { MainLoop(); });
92 }
93 
Now()94 grpc_core::Timestamp TimerManager::Host::Now() {
95   return grpc_core::Timestamp::FromTimespecRoundDown(
96       gpr_now(GPR_CLOCK_MONOTONIC));
97 }
98 
TimerInit(Timer * timer,grpc_core::Timestamp deadline,experimental::EventEngine::Closure * closure)99 void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
100                              experimental::EventEngine::Closure* closure) {
101   if (GRPC_TRACE_FLAG_ENABLED(timer)) {
102     grpc_core::MutexLock lock(&mu_);
103     if (shutdown_) {
104       LOG(ERROR) << "WARNING: TimerManager::" << this
105                  << ": scheduling Closure::" << closure
106                  << " after TimerManager has been shut down.";
107     }
108   }
109   timer_list_->TimerInit(timer, deadline, closure);
110 }
111 
TimerCancel(Timer * timer)112 bool TimerManager::TimerCancel(Timer* timer) {
113   return timer_list_->TimerCancel(timer);
114 }
115 
Shutdown()116 void TimerManager::Shutdown() {
117   {
118     grpc_core::MutexLock lock(&mu_);
119     if (shutdown_) return;
120     GRPC_TRACE_VLOG(timer, 2) << "TimerManager::" << this << " shutting down";
121     shutdown_ = true;
122     // Wait on the main loop to exit.
123     cv_wait_.Signal();
124   }
125   main_loop_exit_signal_->WaitForNotification();
126   GRPC_TRACE_VLOG(timer, 2) << "TimerManager::" << this << " shutdown complete";
127 }
128 
~TimerManager()129 TimerManager::~TimerManager() { Shutdown(); }
130 
Kick()131 void TimerManager::Host::Kick() { timer_manager_->Kick(); }
132 
Kick()133 void TimerManager::Kick() {
134   grpc_core::MutexLock lock(&mu_);
135   kicked_ = true;
136   cv_wait_.Signal();
137 }
138 
RestartPostFork()139 void TimerManager::RestartPostFork() {
140   grpc_core::MutexLock lock(&mu_);
141   CHECK(GPR_LIKELY(shutdown_));
142   GRPC_TRACE_VLOG(timer, 2)
143       << "TimerManager::" << this << " restarting after shutdown";
144   shutdown_ = false;
145   main_loop_exit_signal_.emplace();
146   thread_pool_->Run([this]() { MainLoop(); });
147 }
148 
PrepareFork()149 void TimerManager::PrepareFork() { Shutdown(); }
PostforkParent()150 void TimerManager::PostforkParent() { RestartPostFork(); }
PostforkChild()151 void TimerManager::PostforkChild() { RestartPostFork(); }
152 
153 }  // namespace experimental
154 }  // namespace grpc_event_engine
155