• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TIMER_H
20 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TIMER_H
21 
22 #include <grpc/event_engine/event_engine.h>
23 #include <grpc/support/port_platform.h>
24 #include <stddef.h>
25 
26 #include <atomic>
27 #include <cstdint>
28 #include <memory>
29 #include <vector>
30 
31 #include "absl/base/thread_annotations.h"
32 #include "absl/types/optional.h"
33 #include "src/core/lib/event_engine/posix_engine/timer_heap.h"
34 #include "src/core/util/sync.h"
35 #include "src/core/util/time.h"
36 #include "src/core/util/time_averaged_stats.h"
37 
38 namespace grpc_event_engine {
39 namespace experimental {
40 
41 struct Timer {
42   int64_t deadline;
43   // kInvalidHeapIndex if not in heap.
44   size_t heap_index;
45   bool pending;
46   struct Timer* next;
47   struct Timer* prev;
48   experimental::EventEngine::Closure* closure;
49 #ifndef NDEBUG
50   struct Timer* hash_table_next;
51 #endif
52 
53   grpc_event_engine::experimental::EventEngine::TaskHandle task_handle;
54 };
55 
56 // Dependency injection: allow tests and/or TimerManager to inject
57 // their own implementations of Now, Kick.
58 class TimerListHost {
59  public:
60   // Return the current timestamp.
61   // Abstracted so that tests can be run deterministically.
62   virtual grpc_core::Timestamp Now() = 0;
63   // Wake up a thread to check for timers.
64   virtual void Kick() = 0;
65 
66  protected:
67   ~TimerListHost() = default;
68 };
69 
70 class TimerList {
71  public:
72   explicit TimerList(TimerListHost* host);
73 
74   TimerList(const TimerList&) = delete;
75   TimerList& operator=(const TimerList&) = delete;
76 
77   // Initialize a Timer.
78   // When expired, the closure will be run. If the timer is canceled, the
79   // closure will not be run. Behavior is undefined for a deadline of
80   // grpc_core::Timestamp::InfFuture().
81   void TimerInit(Timer* timer, grpc_core::Timestamp deadline,
82                  experimental::EventEngine::Closure* closure);
83 
84   // Cancel a Timer.
85   // Returns false if the timer cannot be canceled. This will happen if the
86   // timer has already fired, or if its closure is currently running. The
87   // closure is guaranteed to run eventually if this method returns false.
88   // Otherwise, this returns true, and the closure will not be run.
89   GRPC_MUST_USE_RESULT bool TimerCancel(Timer* timer);
90 
91   // Check for timers to be run, and return them.
92   // Return nullopt if timers could not be checked due to contention with
93   // another thread checking.
94   // Return a vector of closures that *must* be run otherwise.
95   // If next is non-null, TRY to update *next with the next running timer
96   // IF that timer occurs before *next current value.
97   // *next is never guaranteed to be updated on any given execution; however,
98   // with high probability at least one thread in the system will see an update
99   // at any time slice.
100   absl::optional<std::vector<experimental::EventEngine::Closure*>> TimerCheck(
101       grpc_core::Timestamp* next);
102 
103  private:
104   // A "timer shard". Contains a 'heap' and a 'list' of timers. All timers with
105   // deadlines earlier than 'queue_deadline_cap' are maintained in the heap and
106   // others are maintained in the list (unordered). This helps to keep the
107   // number of elements in the heap low.
108   //
109   // The 'queue_deadline_cap' gets recomputed periodically based on the timer
110   // stats maintained in 'stats' and the relevant timers are then moved from the
111   // 'list' to 'heap'.
112   //
113   struct Shard {
114     Shard();
115 
116     grpc_core::Timestamp ComputeMinDeadline() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu);
117     bool RefillHeap(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu);
118     Timer* PopOne(grpc_core::Timestamp now) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu);
119     void PopTimers(grpc_core::Timestamp now,
120                    grpc_core::Timestamp* new_min_deadline,
121                    std::vector<experimental::EventEngine::Closure*>* out)
122         ABSL_LOCKS_EXCLUDED(mu);
123 
124     grpc_core::Mutex mu;
125     grpc_core::TimeAveragedStats stats ABSL_GUARDED_BY(mu);
126     // All and only timers with deadlines < this will be in the heap.
127     grpc_core::Timestamp queue_deadline_cap ABSL_GUARDED_BY(mu);
128     // The deadline of the next timer due in this shard.
129     grpc_core::Timestamp min_deadline ABSL_GUARDED_BY(&TimerList::mu_);
130     // Index of this timer_shard in the g_shard_queue.
131     uint32_t shard_queue_index ABSL_GUARDED_BY(&TimerList::mu_);
132     // This holds all timers with deadlines < queue_deadline_cap. Timers in this
133     // list have the top bit of their deadline set to 0.
134     TimerHeap heap ABSL_GUARDED_BY(mu);
135     // This holds timers whose deadline is >= queue_deadline_cap.
136     Timer list ABSL_GUARDED_BY(mu);
137   };
138 
139   void SwapAdjacentShardsInQueue(uint32_t first_shard_queue_index)
140       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
141   void NoteDeadlineChange(Shard* shard) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
142   std::vector<experimental::EventEngine::Closure*> FindExpiredTimers(
143       grpc_core::Timestamp now, grpc_core::Timestamp* next);
144 
145   TimerListHost* const host_;
146   const size_t num_shards_;
147   grpc_core::Mutex mu_;
148   // The deadline of the next timer due across all timer shards
149   std::atomic<uint64_t> min_timer_;
150   // Allow only one FindExpiredTimers at once (used as a TryLock, protects no
151   // fields but ensures limits on concurrency)
152   grpc_core::Mutex checker_mu_;
153   // Array of timer shards. Whenever a timer (Timer *) is added, its address
154   // is hashed to select the timer shard to add the timer to
155   const std::unique_ptr<Shard[]> shards_;
156   // Maintains a sorted list of timer shards (sorted by their min_deadline, i.e
157   // the deadline of the next timer in each shard).
158   const std::unique_ptr<Shard*[]> shard_queue_ ABSL_GUARDED_BY(mu_);
159 };
160 
161 }  // namespace experimental
162 }  // namespace grpc_event_engine
163 
164 #endif  // GRPC_SRC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_TIMER_H
165