1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/event_engine/event_engine.h>
16
17 #include <algorithm>
18 #include <atomic>
19 #include <chrono>
20 #include <cstdint>
21 #include <memory>
22 #include <random>
23 #include <thread>
24 #include <utility>
25 #include <vector>
26
27 #include "absl/base/thread_annotations.h"
28 #include "absl/functional/any_invocable.h"
29 #include "absl/functional/bind_front.h"
30 #include "absl/functional/function_ref.h"
31 #include "absl/log/log.h"
32 #include "absl/time/clock.h"
33 #include "absl/time/time.h"
34 #include "gmock/gmock.h"
35 #include "gtest/gtest.h"
36 #include "src/core/lib/event_engine/time_util.h"
37 #include "src/core/util/sync.h"
38 #include "test/core/event_engine/test_suite/event_engine_test_framework.h"
39
40 using ::testing::ElementsAre;
41 using namespace std::chrono_literals;
42
namespace grpc_event_engine {
namespace experimental {

// Deliberately a no-op.
// NOTE(review): presumably referenced from the test-suite setup so that the
// tests in this translation unit get linked in -- confirm against the caller.
void InitTimerTests() {
}

}  // namespace experimental
}  // namespace grpc_event_engine
50
51 class EventEngineTimerTest : public EventEngineTest {
52 public:
53 void ScheduleCheckCB(std::chrono::steady_clock::time_point when,
54 std::atomic<int>* call_count,
55 std::atomic<int>* fail_count, int total_expected);
56
57 protected:
WaitForSignalled(absl::Duration timeout)58 void WaitForSignalled(absl::Duration timeout)
59 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
60 absl::Time deadline = absl::Now() + timeout;
61 while (!signaled_) {
62 timeout = deadline - absl::Now();
63 ASSERT_GT(timeout, absl::ZeroDuration());
64 cv_.WaitWithTimeout(&mu_, timeout);
65 }
66 }
67
68 grpc_core::Mutex mu_;
69 grpc_core::CondVar cv_;
70 bool signaled_ ABSL_GUARDED_BY(mu_) = false;
71 };
72
// A zero-delay timer must fire within the bounded wait below.
TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) {
  auto engine = this->NewEventEngine();
  // Sets the flag under the fixture mutex and wakes the waiting test body.
  auto notify = [this]() {
    grpc_core::MutexLock lock(&mu_);
    signaled_ = true;
    cv_.Signal();
  };
  grpc_core::MutexLock lock(&mu_);
  engine->RunAfter(0ms, std::move(notify));
  WaitForSignalled(absl::Seconds(5));
}
83
// A timer scheduled a day out can be cancelled, and Cancel reports success.
TEST_F(EventEngineTimerTest, SupportsCancellation) {
  auto engine = this->NewEventEngine();
  const auto handle = engine->RunAfter(std::chrono::hours(24), [] {});
  ASSERT_TRUE(engine->Cancel(handle));
}
89
// After a successful Cancel, the callback must not have run by the time the
// engine has been destroyed.
TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) {
  // Sets the flag if it is ever invoked; the test expects that it is not.
  auto mark_signaled = [this]() {
    grpc_core::MutexLock lock(&mu_);
    signaled_ = true;
  };
  {
    auto engine = this->NewEventEngine();
    auto handle = engine->RunAfter(24h, std::move(mark_signaled));
    ASSERT_TRUE(engine->Cancel(handle));
  }
  // The engine is deleted, and all closures should have been flushed
  grpc_core::MutexLock lock(&mu_);
  ASSERT_FALSE(signaled_);
}
103
// Schedules a 3000ms timer followed by a 0ms timer and checks that the 0ms
// callback is recorded first.
TEST_F(EventEngineTimerTest, TimersRespectScheduleOrdering) {
  // Note: this is a brittle test if the first call to `RunAfter` takes longer
  // than the second callback's wait time.
  std::vector<uint8_t> ordered;
  uint8_t completed = 0;
  // Shared callback body: logs `tag`, bumps the completion counter, and wakes
  // the test body. All state is touched under `mu_`.
  auto record = [&](uint8_t tag) {
    grpc_core::MutexLock lock(&mu_);
    ordered.push_back(tag);
    ++completed;
    cv_.Signal();
  };
  grpc_core::MutexLock lock(&mu_);
  {
    auto engine = this->NewEventEngine();
    engine->RunAfter(3000ms, [&record]() { record(2); });
    engine->RunAfter(0ms, [&record]() { record(1); });
    // Ensure both callbacks have run.
    for (; completed != 2;) {
      cv_.WaitWithTimeout(&mu_, absl::Milliseconds(8));
    }
  }
  // The engine is deleted, and all closures should have been flushed beforehand
  ASSERT_THAT(ordered, ElementsAre(1, 2));
}
132
// Cancelling a timer whose callback has already run must return false.
TEST_F(EventEngineTimerTest, CancellingExecutedCallbackIsNoopAndReturnsFalse) {
  auto engine = this->NewEventEngine();
  // Sets the flag under the fixture mutex and wakes the waiting test body.
  auto notify = [this]() {
    grpc_core::MutexLock lock(&mu_);
    signaled_ = true;
    cv_.Signal();
  };
  grpc_core::MutexLock lock(&mu_);
  auto handle = engine->RunAfter(0ms, std::move(notify));
  WaitForSignalled(absl::Seconds(10));
  // The callback has run, and now we'll try to cancel it.
  ASSERT_FALSE(engine->Cancel(handle));
}
145
ScheduleCheckCB(std::chrono::steady_clock::time_point when,std::atomic<int> * call_count,std::atomic<int> * fail_count,int total_expected)146 void EventEngineTimerTest::ScheduleCheckCB(
147 std::chrono::steady_clock::time_point when, std::atomic<int>* call_count,
148 std::atomic<int>* fail_count, int total_expected) {
149 auto now = std::chrono::steady_clock::now();
150 EXPECT_LE(when, now) << "Callback was run "
151 << grpc_event_engine::experimental::Milliseconds(when -
152 now)
153 << " ms too early: ";
154 if (when > now) ++(*fail_count);
155 if (++(*call_count) == total_expected) {
156 grpc_core::MutexLock lock(&mu_);
157 signaled_ = true;
158 cv_.Signal();
159 }
160 }
161
// Stress test: several threads concurrently schedule timers with random
// delays in [timeout_min_seconds, timeout_max_seconds). Each callback checks
// that it did not run before its deadline (see ScheduleCheckCB). The test
// passes when every callback has run and none ran early.
TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
  auto engine = this->NewEventEngine();
  constexpr int thread_count = 10;
  constexpr int call_count_per_thread = 100;
  // Total number of timers scheduled across all threads.
  constexpr int total_call_count = thread_count * call_count_per_thread;
  constexpr float timeout_min_seconds = 1;
  constexpr float timeout_max_seconds = 10;
  std::atomic<int> call_count{0};
  std::atomic<int> failed_call_count{0};
  std::vector<std::thread> threads;
  threads.reserve(thread_count);
  for (int thread_n = 0; thread_n < thread_count; ++thread_n) {
    threads.emplace_back([&]() {
      std::random_device rd;
      std::mt19937 gen(rd());
      std::uniform_real_distribution<> dis(timeout_min_seconds,
                                           timeout_max_seconds);
      for (int call_n = 0; call_n < call_count_per_thread; ++call_n) {
        const auto dur = static_cast<int64_t>(1e9 * dis(gen));
        // The deadline is sampled before RunAfter is called, so a correct
        // engine can never invoke the callback before it.
        auto deadline =
            std::chrono::steady_clock::now() + std::chrono::nanoseconds(dur);
        engine->RunAfter(
            std::chrono::nanoseconds(dur),
            absl::bind_front(&EventEngineTimerTest::ScheduleCheckCB, this,
                             deadline, &call_count, &failed_call_count,
                             total_call_count));
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  grpc_core::MutexLock lock(&mu_);
  // Loop on the flag to protect against spurious wakeups.
  while (!signaled_) {
    cv_.Wait(&mu_);
  }
  if (failed_call_count.load() != 0) {
    // Report against the number of timers scheduled. The previous expression
    // multiplied by the runtime `call_count` counter and overstated the total.
    VLOG(2) << "failed timer count: " << failed_call_count.load() << " of "
            << total_call_count;
  }
  ASSERT_EQ(0, failed_call_count.load());
}
204
205 // Common implementation for the Run and RunAfter test variants below
206 // Calls run_fn multiple times, and will get stuck if the implementation does a
207 // blocking inline execution of the closure. This test will timeout on failure.
ImmediateRunTestInternal(absl::FunctionRef<void (absl::AnyInvocable<void ()>)> run_fn,grpc_core::Mutex & mu,grpc_core::CondVar & cv)208 void ImmediateRunTestInternal(
209 absl::FunctionRef<void(absl::AnyInvocable<void()>)> run_fn,
210 grpc_core::Mutex& mu, grpc_core::CondVar& cv) {
211 constexpr int num_concurrent_runs = 32;
212 constexpr int num_iterations = 100;
213 constexpr absl::Duration run_timeout = absl::Seconds(60);
214 std::atomic<int> waiters{0};
215 std::atomic<int> execution_count{0};
216 auto cb = [&mu, &cv, &run_timeout, &waiters, &execution_count]() {
217 waiters.fetch_add(1);
218 grpc_core::MutexLock lock(&mu);
219 EXPECT_FALSE(cv.WaitWithTimeout(&mu, run_timeout))
220 << "callback timed out waiting.";
221 execution_count.fetch_add(1);
222 };
223 for (int i = 0; i < num_iterations; i++) {
224 waiters.store(0);
225 execution_count.store(0);
226 for (int run = 0; run < num_concurrent_runs; run++) {
227 run_fn(cb);
228 }
229 while (waiters.load() != num_concurrent_runs) {
230 absl::SleepFor(absl::Milliseconds(33));
231 }
232 cv.SignalAll();
233 while (execution_count.load() != num_concurrent_runs) {
234 absl::SleepFor(absl::Milliseconds(33));
235 }
236 }
237 }
238
239 // TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
240 // ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread)241 TEST_F(EventEngineTimerTest,
242 DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread) {
243 auto engine = this->NewEventEngine();
244 ImmediateRunTestInternal(
245 [&engine](absl::AnyInvocable<void()> cb) { engine->Run(std::move(cb)); },
246 mu_, cv_);
247 }
248
249 // TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
250 // ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread)251 TEST_F(EventEngineTimerTest,
252 DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread) {
253 auto engine = this->NewEventEngine();
254 ImmediateRunTestInternal(
255 [&engine](absl::AnyInvocable<void()> cb) {
256 engine->RunAfter(std::chrono::seconds(0), std::move(cb));
257 },
258 mu_, cv_);
259 }
260