1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/event_engine/posix_engine/timer.h"
20
21 #include <grpc/support/cpu.h>
22 #include <grpc/support/port_platform.h>
23
24 #include <algorithm>
25 #include <atomic>
26 #include <limits>
27 #include <utility>
28
29 #include "src/core/lib/event_engine/posix_engine/timer_heap.h"
30 #include "src/core/util/time.h"
31 #include "src/core/util/useful.h"
32
33 namespace grpc_event_engine {
34 namespace experimental {
35
36 static const size_t kInvalidHeapIndex = std::numeric_limits<size_t>::max();
37 static const double kAddDeadlineScale = 0.33;
38 static const double kMinQueueWindowDuration = 0.01;
39 static const double kMaxQueueWindowDuration = 1.0;
40
ComputeMinDeadline()41 grpc_core::Timestamp TimerList::Shard::ComputeMinDeadline() {
42 return heap.is_empty()
43 ? queue_deadline_cap + grpc_core::Duration::Epsilon()
44 : grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
45 heap.Top()->deadline);
46 }
47
Shard()48 TimerList::Shard::Shard() : stats(1.0 / kAddDeadlineScale, 0.1, 0.5) {}
49
TimerList(TimerListHost * host)50 TimerList::TimerList(TimerListHost* host)
51 : host_(host),
52 num_shards_(grpc_core::Clamp(2 * gpr_cpu_num_cores(), 1u, 32u)),
53 min_timer_(host_->Now().milliseconds_after_process_epoch()),
54 shards_(new Shard[num_shards_]),
55 shard_queue_(new Shard*[num_shards_]) {
56 for (size_t i = 0; i < num_shards_; i++) {
57 Shard& shard = shards_[i];
58 shard.queue_deadline_cap =
59 grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
60 min_timer_.load(std::memory_order_relaxed));
61 shard.shard_queue_index = i;
62 shard.list.next = shard.list.prev = &shard.list;
63 shard.min_deadline = shard.ComputeMinDeadline();
64 shard_queue_[i] = &shard;
65 }
66 }
67
68 namespace {
69 // returns true if the first element in the list
ListJoin(Timer * head,Timer * timer)70 void ListJoin(Timer* head, Timer* timer) {
71 timer->next = head;
72 timer->prev = head->prev;
73 timer->next->prev = timer->prev->next = timer;
74 }
75
ListRemove(Timer * timer)76 void ListRemove(Timer* timer) {
77 timer->next->prev = timer->prev;
78 timer->prev->next = timer->next;
79 }
80 } // namespace
81
SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index)82 void TimerList::SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index) {
83 Shard* temp;
84 temp = shard_queue_[first_shard_queue_index];
85 shard_queue_[first_shard_queue_index] =
86 shard_queue_[first_shard_queue_index + 1];
87 shard_queue_[first_shard_queue_index + 1] = temp;
88 shard_queue_[first_shard_queue_index]->shard_queue_index =
89 first_shard_queue_index;
90 shard_queue_[first_shard_queue_index + 1]->shard_queue_index =
91 first_shard_queue_index + 1;
92 }
93
NoteDeadlineChange(Shard * shard)94 void TimerList::NoteDeadlineChange(Shard* shard) {
95 while (shard->shard_queue_index > 0 &&
96 shard->min_deadline <
97 shard_queue_[shard->shard_queue_index - 1]->min_deadline) {
98 SwapAdjacentShardsInQueue(shard->shard_queue_index - 1);
99 }
100 while (shard->shard_queue_index < num_shards_ - 1 &&
101 shard->min_deadline >
102 shard_queue_[shard->shard_queue_index + 1]->min_deadline) {
103 SwapAdjacentShardsInQueue(shard->shard_queue_index);
104 }
105 }
106
TimerInit(Timer * timer,grpc_core::Timestamp deadline,experimental::EventEngine::Closure * closure)107 void TimerList::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
108 experimental::EventEngine::Closure* closure) {
109 bool is_first_timer = false;
110 Shard* shard = &shards_[grpc_core::HashPointer(timer, num_shards_)];
111 timer->closure = closure;
112 timer->deadline = deadline.milliseconds_after_process_epoch();
113
114 #ifndef NDEBUG
115 timer->hash_table_next = nullptr;
116 #endif
117
118 {
119 grpc_core::MutexLock lock(&shard->mu);
120 timer->pending = true;
121 grpc_core::Timestamp now = host_->Now();
122 if (deadline <= now) {
123 deadline = now;
124 }
125
126 shard->stats.AddSample((deadline - now).millis() / 1000.0);
127
128 if (deadline < shard->queue_deadline_cap) {
129 is_first_timer = shard->heap.Add(timer);
130 } else {
131 timer->heap_index = kInvalidHeapIndex;
132 ListJoin(&shard->list, timer);
133 }
134 }
135
136 // Deadline may have decreased, we need to adjust the main queue. Note
137 // that there is a potential racy unlocked region here. There could be a
138 // reordering of multiple TimerInit calls, at this point, but the < test
139 // below should ensure that we err on the side of caution. There could
140 // also be a race with TimerCheck, which might beat us to the lock. In
141 // that case, it is possible that the timer that we added will have already
142 // run by the time we hold the lock, but that too is a safe error.
143 // Finally, it's possible that the TimerCheck that intervened failed to
144 // trigger the new timer because the min_deadline hadn't yet been reduced.
145 // In that case, the timer will simply have to wait for the next
146 // TimerCheck.
147 if (is_first_timer) {
148 grpc_core::MutexLock lock(&mu_);
149 if (deadline < shard->min_deadline) {
150 grpc_core::Timestamp old_min_deadline = shard_queue_[0]->min_deadline;
151 shard->min_deadline = deadline;
152 NoteDeadlineChange(shard);
153 if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
154 min_timer_.store(deadline.milliseconds_after_process_epoch(),
155 std::memory_order_relaxed);
156 host_->Kick();
157 }
158 }
159 }
160 }
161
TimerCancel(Timer * timer)162 bool TimerList::TimerCancel(Timer* timer) {
163 Shard* shard = &shards_[grpc_core::HashPointer(timer, num_shards_)];
164 grpc_core::MutexLock lock(&shard->mu);
165
166 if (timer->pending) {
167 timer->pending = false;
168 if (timer->heap_index == kInvalidHeapIndex) {
169 ListRemove(timer);
170 } else {
171 shard->heap.Remove(timer);
172 }
173 return true;
174 }
175
176 return false;
177 }
178
179 // Rebalances the timer shard by computing a new 'queue_deadline_cap' and moving
180 // all relevant timers in shard->list (i.e timers with deadlines earlier than
181 // 'queue_deadline_cap') into into shard->heap.
182 // Returns 'true' if shard->heap has at least ONE element
RefillHeap(grpc_core::Timestamp now)183 bool TimerList::Shard::RefillHeap(grpc_core::Timestamp now) {
184 // Compute the new queue window width and bound by the limits:
185 double computed_deadline_delta = stats.UpdateAverage() * kAddDeadlineScale;
186 double deadline_delta =
187 grpc_core::Clamp(computed_deadline_delta, kMinQueueWindowDuration,
188 kMaxQueueWindowDuration);
189 Timer *timer, *next;
190
191 // Compute the new cap and put all timers under it into the queue:
192 queue_deadline_cap = std::max(now, queue_deadline_cap) +
193 grpc_core::Duration::FromSecondsAsDouble(deadline_delta);
194
195 for (timer = list.next; timer != &list; timer = next) {
196 next = timer->next;
197 auto timer_deadline =
198 grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
199 timer->deadline);
200
201 if (timer_deadline < queue_deadline_cap) {
202 ListRemove(timer);
203 heap.Add(timer);
204 }
205 }
206 return !heap.is_empty();
207 }
208
209 // This pops the next non-cancelled timer with deadline <= now from the
210 // queue, or returns NULL if there isn't one.
PopOne(grpc_core::Timestamp now)211 Timer* TimerList::Shard::PopOne(grpc_core::Timestamp now) {
212 Timer* timer;
213 for (;;) {
214 if (heap.is_empty()) {
215 if (now < queue_deadline_cap) return nullptr;
216 if (!RefillHeap(now)) return nullptr;
217 }
218 timer = heap.Top();
219 auto timer_deadline =
220 grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
221 timer->deadline);
222 if (timer_deadline > now) return nullptr;
223 timer->pending = false;
224 heap.Pop();
225 return timer;
226 }
227 }
228
PopTimers(grpc_core::Timestamp now,grpc_core::Timestamp * new_min_deadline,std::vector<experimental::EventEngine::Closure * > * out)229 void TimerList::Shard::PopTimers(
230 grpc_core::Timestamp now, grpc_core::Timestamp* new_min_deadline,
231 std::vector<experimental::EventEngine::Closure*>* out) {
232 grpc_core::MutexLock lock(&mu);
233 while (Timer* timer = PopOne(now)) {
234 out->push_back(timer->closure);
235 }
236 *new_min_deadline = ComputeMinDeadline();
237 }
238
FindExpiredTimers(grpc_core::Timestamp now,grpc_core::Timestamp * next)239 std::vector<experimental::EventEngine::Closure*> TimerList::FindExpiredTimers(
240 grpc_core::Timestamp now, grpc_core::Timestamp* next) {
241 grpc_core::Timestamp min_timer =
242 grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
243 min_timer_.load(std::memory_order_relaxed));
244
245 std::vector<experimental::EventEngine::Closure*> done;
246 if (now < min_timer) {
247 if (next != nullptr) *next = std::min(*next, min_timer);
248 return done;
249 }
250
251 grpc_core::MutexLock lock(&mu_);
252
253 while (shard_queue_[0]->min_deadline < now ||
254 (now != grpc_core::Timestamp::InfFuture() &&
255 shard_queue_[0]->min_deadline == now)) {
256 grpc_core::Timestamp new_min_deadline;
257
258 // For efficiency, we pop as many available timers as we can from the
259 // shard. This may violate perfect timer deadline ordering, but that
260 // shouldn't be a big deal because we don't make ordering guarantees.
261 shard_queue_[0]->PopTimers(now, &new_min_deadline, &done);
262
263 // An TimerInit() on the shard could intervene here, adding a new
264 // timer that is earlier than new_min_deadline. However,
265 // TimerInit() will block on the mutex before it can call
266 // set_min_deadline, so this one will complete first and then the Addtimer
267 // will reduce the min_deadline (perhaps unnecessarily).
268 shard_queue_[0]->min_deadline = new_min_deadline;
269 NoteDeadlineChange(shard_queue_[0]);
270 }
271
272 if (next) {
273 *next = std::min(*next, shard_queue_[0]->min_deadline);
274 }
275
276 min_timer_.store(
277 shard_queue_[0]->min_deadline.milliseconds_after_process_epoch(),
278 std::memory_order_relaxed);
279
280 return done;
281 }
282
283 absl::optional<std::vector<experimental::EventEngine::Closure*>>
TimerCheck(grpc_core::Timestamp * next)284 TimerList::TimerCheck(grpc_core::Timestamp* next) {
285 // prelude
286 grpc_core::Timestamp now = host_->Now();
287
288 // fetch from a thread-local first: this avoids contention on a globally
289 // mutable cacheline in the common case
290 grpc_core::Timestamp min_timer =
291 grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
292 min_timer_.load(std::memory_order_relaxed));
293
294 if (now < min_timer) {
295 if (next != nullptr) {
296 *next = std::min(*next, min_timer);
297 }
298 return std::vector<experimental::EventEngine::Closure*>();
299 }
300
301 if (!checker_mu_.TryLock()) return absl::nullopt;
302 std::vector<experimental::EventEngine::Closure*> run =
303 FindExpiredTimers(now, next);
304 checker_mu_.Unlock();
305
306 return std::move(run);
307 }
308
309 } // namespace experimental
310 } // namespace grpc_event_engine
311