1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
20
21 #include <grpc/support/port_platform.h>
22 #include <grpc/support/time.h>
23
24 #include <memory>
25 #include <utility>
26
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/time/time.h"
30 #include "absl/types/optional.h"
31 #include "src/core/lib/debug/trace.h"
32
// Per-thread flag reported by TimerManager::IsTimerManagerThread(). Each
// thread gets its own copy, which starts out false.
// NOTE(review): nothing visible in this file ever sets it to true - confirm
// whether that is intentional.
static thread_local bool g_timer_thread = false;
34
35 namespace grpc_event_engine {
36 namespace experimental {
37
RunSomeTimers(std::vector<experimental::EventEngine::Closure * > timers)38 void TimerManager::RunSomeTimers(
39 std::vector<experimental::EventEngine::Closure*> timers) {
40 for (auto* timer : timers) {
41 thread_pool_->Run(timer);
42 }
43 }
44
45 // wait until 'next' (or forever if there is already a timed waiter in the pool)
46 // returns true if the thread should continue executing (false if it should
47 // shutdown)
WaitUntil(grpc_core::Timestamp next)48 bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
49 grpc_core::MutexLock lock(&mu_);
50 if (shutdown_) return false;
51 // If kicked_ is true at this point, it means there was a kick from the timer
52 // system that the timer-manager threads here missed. We cannot trust 'next'
53 // here any longer (since there might be an earlier deadline). So if kicked_
54 // is true at this point, we should quickly exit this and get the next
55 // deadline from the timer system
56 if (!kicked_) {
57 cv_wait_.WaitWithTimeout(&mu_,
58 absl::Milliseconds((next - host_.Now()).millis()));
59 ++wakeups_;
60 }
61 kicked_ = false;
62 return true;
63 }
64
MainLoop()65 void TimerManager::MainLoop() {
66 grpc_core::Timestamp next = grpc_core::Timestamp::InfFuture();
67 absl::optional<std::vector<experimental::EventEngine::Closure*>>
68 check_result = timer_list_->TimerCheck(&next);
69 CHECK(check_result.has_value())
70 << "ERROR: More than one MainLoop is running.";
71 bool timers_found = !check_result->empty();
72 if (timers_found) {
73 RunSomeTimers(std::move(*check_result));
74 }
75 thread_pool_->Run([this, next, timers_found]() {
76 if (!timers_found && !WaitUntil(next)) {
77 main_loop_exit_signal_->Notify();
78 return;
79 }
80 MainLoop();
81 });
82 }
83
IsTimerManagerThread()84 bool TimerManager::IsTimerManagerThread() { return g_timer_thread; }
85
TimerManager(std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)86 TimerManager::TimerManager(
87 std::shared_ptr<grpc_event_engine::experimental::ThreadPool> thread_pool)
88 : host_(this), thread_pool_(std::move(thread_pool)) {
89 timer_list_ = std::make_unique<TimerList>(&host_);
90 main_loop_exit_signal_.emplace();
91 thread_pool_->Run([this]() { MainLoop(); });
92 }
93
Now()94 grpc_core::Timestamp TimerManager::Host::Now() {
95 return grpc_core::Timestamp::FromTimespecRoundDown(
96 gpr_now(GPR_CLOCK_MONOTONIC));
97 }
98
TimerInit(Timer * timer,grpc_core::Timestamp deadline,experimental::EventEngine::Closure * closure)99 void TimerManager::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
100 experimental::EventEngine::Closure* closure) {
101 if (GRPC_TRACE_FLAG_ENABLED(timer)) {
102 grpc_core::MutexLock lock(&mu_);
103 if (shutdown_) {
104 LOG(ERROR) << "WARNING: TimerManager::" << this
105 << ": scheduling Closure::" << closure
106 << " after TimerManager has been shut down.";
107 }
108 }
109 timer_list_->TimerInit(timer, deadline, closure);
110 }
111
TimerCancel(Timer * timer)112 bool TimerManager::TimerCancel(Timer* timer) {
113 return timer_list_->TimerCancel(timer);
114 }
115
Shutdown()116 void TimerManager::Shutdown() {
117 {
118 grpc_core::MutexLock lock(&mu_);
119 if (shutdown_) return;
120 GRPC_TRACE_VLOG(timer, 2) << "TimerManager::" << this << " shutting down";
121 shutdown_ = true;
122 // Wait on the main loop to exit.
123 cv_wait_.Signal();
124 }
125 main_loop_exit_signal_->WaitForNotification();
126 GRPC_TRACE_VLOG(timer, 2) << "TimerManager::" << this << " shutdown complete";
127 }
128
~TimerManager()129 TimerManager::~TimerManager() { Shutdown(); }
130
Kick()131 void TimerManager::Host::Kick() { timer_manager_->Kick(); }
132
Kick()133 void TimerManager::Kick() {
134 grpc_core::MutexLock lock(&mu_);
135 kicked_ = true;
136 cv_wait_.Signal();
137 }
138
RestartPostFork()139 void TimerManager::RestartPostFork() {
140 grpc_core::MutexLock lock(&mu_);
141 CHECK(GPR_LIKELY(shutdown_));
142 GRPC_TRACE_VLOG(timer, 2)
143 << "TimerManager::" << this << " restarting after shutdown";
144 shutdown_ = false;
145 main_loop_exit_signal_.emplace();
146 thread_pool_->Run([this]() { MainLoop(); });
147 }
148
PrepareFork()149 void TimerManager::PrepareFork() { Shutdown(); }
PostforkParent()150 void TimerManager::PostforkParent() { RestartPostFork(); }
PostforkChild()151 void TimerManager::PostforkChild() { RestartPostFork(); }
152
153 } // namespace experimental
154 } // namespace grpc_event_engine
155