//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include "src/core/lib/event_engine/posix_engine/timer.h"

#include <grpc/support/cpu.h>
#include <grpc/support/port_platform.h>

#include <algorithm>
#include <atomic>
#include <limits>
#include <utility>

#include "src/core/lib/event_engine/posix_engine/timer_heap.h"
#include "src/core/util/time.h"
#include "src/core/util/useful.h"

namespace grpc_event_engine {
namespace experimental {

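// kInvalidHeapIndex marks a timer that lives on a shard's overflow list
// rather than in its heap.  The remaining constants shape the deadline
// window used when refilling a shard's heap: the window is the time-averaged
// timer deadline scaled by kAddDeadlineScale, clamped to
// [kMinQueueWindowDuration, kMaxQueueWindowDuration] seconds.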
static const size_t kInvalidHeapIndex = std::numeric_limits<size_t>::max();
static const double kAddDeadlineScale = 0.33;
static const double kMinQueueWindowDuration = 0.01;
static const double kMaxQueueWindowDuration = 1.0;

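// Returns the shard's earliest heap deadline, or queue_deadline_cap plus one
// Duration::Epsilon() when the heap is empty.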
grpc_core::Timestamp TimerList::Shard::ComputeMinDeadline() {
  return heap.is_empty()
             ? queue_deadline_cap + grpc_core::Duration::Epsilon()
             : grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
                   heap.Top()->deadline);
}

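// Seed the shard's deadline statistics; with kAddDeadlineScale applied, the
// initial refill window works out to roughly one second.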
TimerList::Shard::Shard() : stats(1.0 / kAddDeadlineScale, 0.1, 0.5) {}

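// Creates up to 32 timer shards (two per CPU core), caps each shard's
// deadline window at the current time, and seeds the shard queue that keeps
// shards ordered by their minimum deadline.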
TimerList::TimerList(TimerListHost* host)
    : host_(host),
      num_shards_(grpc_core::Clamp(2 * gpr_cpu_num_cores(), 1u, 32u)),
      min_timer_(host_->Now().milliseconds_after_process_epoch()),
      shards_(new Shard[num_shards_]),
      shard_queue_(new Shard*[num_shards_]) {
  for (size_t i = 0; i < num_shards_; i++) {
    Shard& shard = shards_[i];
    shard.queue_deadline_cap =
        grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
            min_timer_.load(std::memory_order_relaxed));
    shard.shard_queue_index = i;
    shard.list.next = shard.list.prev = &shard.list;
    shard.min_deadline = shard.ComputeMinDeadline();
    shard_queue_[i] = &shard;
  }
}

namespace {
// Appends 'timer' to the doubly-linked list whose sentinel node is 'head'.
void ListJoin(Timer* head, Timer* timer) {
  timer->next = head;
  timer->prev = head->prev;
  timer->next->prev = timer->prev->next = timer;
}

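// Unlinks 'timer' from the doubly-linked list it is currently on.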
void ListRemove(Timer* timer) {
  timer->next->prev = timer->prev;
  timer->prev->next = timer->next;
}
}  // namespace

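// Swaps the shards at positions first_shard_queue_index and
// first_shard_queue_index + 1 of shard_queue_ and fixes up their cached
// queue indices.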
void TimerList::SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index) {
  Shard* temp;
  temp = shard_queue_[first_shard_queue_index];
  shard_queue_[first_shard_queue_index] =
      shard_queue_[first_shard_queue_index + 1];
  shard_queue_[first_shard_queue_index + 1] = temp;
  shard_queue_[first_shard_queue_index]->shard_queue_index =
      first_shard_queue_index;
  shard_queue_[first_shard_queue_index + 1]->shard_queue_index =
      first_shard_queue_index + 1;
}

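// Restores shard_queue_ ordering after 'shard' changes its min_deadline:
// bubbles the shard towards the front while it is earlier than its
// predecessor, then towards the back while it is later than its successor.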
void TimerList::NoteDeadlineChange(Shard* shard) {
  while (shard->shard_queue_index > 0 &&
         shard->min_deadline <
             shard_queue_[shard->shard_queue_index - 1]->min_deadline) {
    SwapAdjacentShardsInQueue(shard->shard_queue_index - 1);
  }
  while (shard->shard_queue_index < num_shards_ - 1 &&
         shard->min_deadline >
             shard_queue_[shard->shard_queue_index + 1]->min_deadline) {
    SwapAdjacentShardsInQueue(shard->shard_queue_index);
  }
}

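// Schedules 'timer' so that 'closure' becomes runnable once 'deadline' has
// passed.  The timer is hashed onto a shard; if its deadline falls inside
// the shard's current window it goes straight into the shard's heap,
// otherwise it is parked on the overflow list until RefillHeap picks it up.
// If it becomes the earliest timer overall, min_timer_ is updated and the
// host is kicked so the poller can shorten its wait.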
void TimerList::TimerInit(Timer* timer, grpc_core::Timestamp deadline,
                          experimental::EventEngine::Closure* closure) {
  bool is_first_timer = false;
  Shard* shard = &shards_[grpc_core::HashPointer(timer, num_shards_)];
  timer->closure = closure;
  timer->deadline = deadline.milliseconds_after_process_epoch();

#ifndef NDEBUG
  timer->hash_table_next = nullptr;
#endif

  {
    grpc_core::MutexLock lock(&shard->mu);
    timer->pending = true;
    grpc_core::Timestamp now = host_->Now();
    if (deadline <= now) {
      deadline = now;
    }

    shard->stats.AddSample((deadline - now).millis() / 1000.0);

    if (deadline < shard->queue_deadline_cap) {
      is_first_timer = shard->heap.Add(timer);
    } else {
      timer->heap_index = kInvalidHeapIndex;
      ListJoin(&shard->list, timer);
    }
  }

  // The deadline may have decreased, so we need to adjust the main queue.
  // Note that there is a potential racy unlocked region here: multiple
  // TimerInit calls could be reordered at this point, but the < test below
  // ensures that we err on the side of caution.  There could also be a race
  // with TimerCheck, which might beat us to the lock.  In that case the
  // timer we just added may already have run by the time we hold the lock,
  // but that too is a safe error.  Finally, it's possible that the
  // TimerCheck that intervened failed to trigger the new timer because the
  // min_deadline hadn't yet been reduced; in that case the timer simply
  // waits for the next TimerCheck.
  if (is_first_timer) {
    grpc_core::MutexLock lock(&mu_);
    if (deadline < shard->min_deadline) {
      grpc_core::Timestamp old_min_deadline = shard_queue_[0]->min_deadline;
      shard->min_deadline = deadline;
      NoteDeadlineChange(shard);
      if (shard->shard_queue_index == 0 && deadline < old_min_deadline) {
        min_timer_.store(deadline.milliseconds_after_process_epoch(),
                         std::memory_order_relaxed);
        host_->Kick();
      }
    }
  }
}

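// Attempts to cancel 'timer'.  Returns true if the timer was still pending
// and has been removed from its shard (its closure will not run); returns
// false if it already fired or was already cancelled.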
bool TimerList::TimerCancel(Timer* timer) {
  Shard* shard = &shards_[grpc_core::HashPointer(timer, num_shards_)];
  grpc_core::MutexLock lock(&shard->mu);

  if (timer->pending) {
    timer->pending = false;
    if (timer->heap_index == kInvalidHeapIndex) {
      ListRemove(timer);
    } else {
      shard->heap.Remove(timer);
    }
    return true;
  }

  return false;
}

// Rebalances the timer shard by computing a new 'queue_deadline_cap' and
// moving all relevant timers in shard->list (i.e. timers with deadlines
// earlier than 'queue_deadline_cap') into shard->heap.
// Returns true if shard->heap has at least one element.
bool TimerList::Shard::RefillHeap(grpc_core::Timestamp now) {
  // Compute the new queue window width and bound it by the limits:
  double computed_deadline_delta = stats.UpdateAverage() * kAddDeadlineScale;
  double deadline_delta =
      grpc_core::Clamp(computed_deadline_delta, kMinQueueWindowDuration,
                       kMaxQueueWindowDuration);
  Timer *timer, *next;

  // Compute the new cap and put all timers under it into the heap:
  queue_deadline_cap = std::max(now, queue_deadline_cap) +
                       grpc_core::Duration::FromSecondsAsDouble(deadline_delta);

  for (timer = list.next; timer != &list; timer = next) {
    next = timer->next;
    auto timer_deadline =
        grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
            timer->deadline);

    if (timer_deadline < queue_deadline_cap) {
      ListRemove(timer);
      heap.Add(timer);
    }
  }
  return !heap.is_empty();
}

// This pops the next non-cancelled timer with deadline <= now from the
// queue, or returns NULL if there isn't one.
Timer* TimerList::Shard::PopOne(grpc_core::Timestamp now) {
  Timer* timer;
  for (;;) {
    if (heap.is_empty()) {
      if (now < queue_deadline_cap) return nullptr;
      if (!RefillHeap(now)) return nullptr;
    }
    timer = heap.Top();
    auto timer_deadline =
        grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
            timer->deadline);
    if (timer_deadline > now) return nullptr;
    timer->pending = false;
    heap.Pop();
    return timer;
  }
}

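// Drains every timer in the shard whose deadline is at or before 'now',
// appending their closures to 'out', and reports the shard's new minimum
// deadline via 'new_min_deadline'.  Runs under the shard's mutex.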
void TimerList::Shard::PopTimers(
    grpc_core::Timestamp now, grpc_core::Timestamp* new_min_deadline,
    std::vector<experimental::EventEngine::Closure*>* out) {
  grpc_core::MutexLock lock(&mu);
  while (Timer* timer = PopOne(now)) {
    out->push_back(timer->closure);
  }
  *new_min_deadline = ComputeMinDeadline();
}

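// Collects the closures of all timers that are due at or before 'now'.
// Shards are drained from the front of shard_queue_ (earliest minimum
// deadline first) until the front shard's deadline lies in the future;
// 'next', if provided, is lowered to the earliest remaining deadline when
// that is sooner.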
std::vector<experimental::EventEngine::Closure*> TimerList::FindExpiredTimers(
    grpc_core::Timestamp now, grpc_core::Timestamp* next) {
  grpc_core::Timestamp min_timer =
      grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
          min_timer_.load(std::memory_order_relaxed));

  std::vector<experimental::EventEngine::Closure*> done;
  if (now < min_timer) {
    if (next != nullptr) *next = std::min(*next, min_timer);
    return done;
  }

  grpc_core::MutexLock lock(&mu_);

  while (shard_queue_[0]->min_deadline < now ||
         (now != grpc_core::Timestamp::InfFuture() &&
          shard_queue_[0]->min_deadline == now)) {
    grpc_core::Timestamp new_min_deadline;

    // For efficiency, we pop as many available timers as we can from the
    // shard.  This may violate perfect timer deadline ordering, but that
    // shouldn't be a big deal because we don't make ordering guarantees.
    shard_queue_[0]->PopTimers(now, &new_min_deadline, &done);

    // A TimerInit() on the shard could intervene here, adding a new timer
    // that is earlier than new_min_deadline.  However, TimerInit() will
    // block on the mutex before it can update the shard's min_deadline, so
    // this call will complete first and the TimerInit() will then reduce the
    // min_deadline (perhaps unnecessarily).
    shard_queue_[0]->min_deadline = new_min_deadline;
    NoteDeadlineChange(shard_queue_[0]);
  }

  if (next) {
    *next = std::min(*next, shard_queue_[0]->min_deadline);
  }

  min_timer_.store(
      shard_queue_[0]->min_deadline.milliseconds_after_process_epoch(),
      std::memory_order_relaxed);

  return done;
}

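// Public entry point for the timer poller.  Returns an empty vector when no
// timer can be due yet (the cached minimum deadline is still in the future),
// the expired closures when this thread wins the checker lock, or
// absl::nullopt when another thread is already checking timers.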
absl::optional<std::vector<experimental::EventEngine::Closure*>>
TimerList::TimerCheck(grpc_core::Timestamp* next) {
  // prelude
  grpc_core::Timestamp now = host_->Now();

  // Consult the cached minimum deadline first: if nothing can be due yet we
  // avoid taking any locks in the common case.
  grpc_core::Timestamp min_timer =
      grpc_core::Timestamp::FromMillisecondsAfterProcessEpoch(
          min_timer_.load(std::memory_order_relaxed));

  if (now < min_timer) {
    if (next != nullptr) {
      *next = std::min(*next, min_timer);
    }
    return std::vector<experimental::EventEngine::Closure*>();
  }

  if (!checker_mu_.TryLock()) return absl::nullopt;
  std::vector<experimental::EventEngine::Closure*> run =
      FindExpiredTimers(now, next);
  checker_mu_.Unlock();

  return std::move(run);
}

}  // namespace experimental
}  // namespace grpc_event_engine