• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/event_engine/event_engine.h>
16 
17 #include <algorithm>
18 #include <atomic>
19 #include <chrono>
20 #include <cstdint>
21 #include <memory>
22 #include <random>
23 #include <thread>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/base/thread_annotations.h"
28 #include "absl/functional/any_invocable.h"
29 #include "absl/functional/bind_front.h"
30 #include "absl/functional/function_ref.h"
31 #include "absl/log/log.h"
32 #include "absl/time/clock.h"
33 #include "absl/time/time.h"
34 #include "gmock/gmock.h"
35 #include "gtest/gtest.h"
36 #include "src/core/lib/event_engine/time_util.h"
37 #include "src/core/util/sync.h"
38 #include "test/core/event_engine/test_suite/event_engine_test_framework.h"
39 
40 using ::testing::ElementsAre;
41 using namespace std::chrono_literals;
42 
43 namespace grpc_event_engine {
44 namespace experimental {
45 
InitTimerTests()46 void InitTimerTests() {}
47 
48 }  // namespace experimental
49 }  // namespace grpc_event_engine
50 
51 class EventEngineTimerTest : public EventEngineTest {
52  public:
53   void ScheduleCheckCB(std::chrono::steady_clock::time_point when,
54                        std::atomic<int>* call_count,
55                        std::atomic<int>* fail_count, int total_expected);
56 
57  protected:
WaitForSignalled(absl::Duration timeout)58   void WaitForSignalled(absl::Duration timeout)
59       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
60     absl::Time deadline = absl::Now() + timeout;
61     while (!signaled_) {
62       timeout = deadline - absl::Now();
63       ASSERT_GT(timeout, absl::ZeroDuration());
64       cv_.WaitWithTimeout(&mu_, timeout);
65     }
66   }
67 
68   grpc_core::Mutex mu_;
69   grpc_core::CondVar cv_;
70   bool signaled_ ABSL_GUARDED_BY(mu_) = false;
71 };
72 
TEST_F(EventEngineTimerTest,ImmediateCallbackIsExecutedQuickly)73 TEST_F(EventEngineTimerTest, ImmediateCallbackIsExecutedQuickly) {
74   auto engine = this->NewEventEngine();
75   grpc_core::MutexLock lock(&mu_);
76   engine->RunAfter(0ms, [this]() {
77     grpc_core::MutexLock lock(&mu_);
78     signaled_ = true;
79     cv_.Signal();
80   });
81   WaitForSignalled(absl::Seconds(5));
82 }
83 
TEST_F(EventEngineTimerTest,SupportsCancellation)84 TEST_F(EventEngineTimerTest, SupportsCancellation) {
85   auto engine = this->NewEventEngine();
86   auto handle = engine->RunAfter(24h, []() {});
87   ASSERT_TRUE(engine->Cancel(handle));
88 }
89 
TEST_F(EventEngineTimerTest,CancelledCallbackIsNotExecuted)90 TEST_F(EventEngineTimerTest, CancelledCallbackIsNotExecuted) {
91   {
92     auto engine = this->NewEventEngine();
93     auto handle = engine->RunAfter(24h, [this]() {
94       grpc_core::MutexLock lock(&mu_);
95       signaled_ = true;
96     });
97     ASSERT_TRUE(engine->Cancel(handle));
98   }
99   // The engine is deleted, and all closures should have been flushed
100   grpc_core::MutexLock lock(&mu_);
101   ASSERT_FALSE(signaled_);
102 }
103 
TEST_F(EventEngineTimerTest,TimersRespectScheduleOrdering)104 TEST_F(EventEngineTimerTest, TimersRespectScheduleOrdering) {
105   // Note: this is a brittle test if the first call to `RunAfter` takes longer
106   // than the second callback's wait time.
107   std::vector<uint8_t> ordered;
108   uint8_t count = 0;
109   grpc_core::MutexLock lock(&mu_);
110   {
111     auto engine = this->NewEventEngine();
112     engine->RunAfter(3000ms, [&]() {
113       grpc_core::MutexLock lock(&mu_);
114       ordered.push_back(2);
115       ++count;
116       cv_.Signal();
117     });
118     engine->RunAfter(0ms, [&]() {
119       grpc_core::MutexLock lock(&mu_);
120       ordered.push_back(1);
121       ++count;
122       cv_.Signal();
123     });
124     // Ensure both callbacks have run.
125     while (count != 2) {
126       cv_.WaitWithTimeout(&mu_, absl::Milliseconds(8));
127     }
128   }
129   // The engine is deleted, and all closures should have been flushed beforehand
130   ASSERT_THAT(ordered, ElementsAre(1, 2));
131 }
132 
TEST_F(EventEngineTimerTest,CancellingExecutedCallbackIsNoopAndReturnsFalse)133 TEST_F(EventEngineTimerTest, CancellingExecutedCallbackIsNoopAndReturnsFalse) {
134   auto engine = this->NewEventEngine();
135   grpc_core::MutexLock lock(&mu_);
136   auto handle = engine->RunAfter(0ms, [this]() {
137     grpc_core::MutexLock lock(&mu_);
138     signaled_ = true;
139     cv_.Signal();
140   });
141   WaitForSignalled(absl::Seconds(10));
142   // The callback has run, and now we'll try to cancel it.
143   ASSERT_FALSE(engine->Cancel(handle));
144 }
145 
ScheduleCheckCB(std::chrono::steady_clock::time_point when,std::atomic<int> * call_count,std::atomic<int> * fail_count,int total_expected)146 void EventEngineTimerTest::ScheduleCheckCB(
147     std::chrono::steady_clock::time_point when, std::atomic<int>* call_count,
148     std::atomic<int>* fail_count, int total_expected) {
149   auto now = std::chrono::steady_clock::now();
150   EXPECT_LE(when, now) << "Callback was run "
151                        << grpc_event_engine::experimental::Milliseconds(when -
152                                                                         now)
153                        << " ms too early: ";
154   if (when > now) ++(*fail_count);
155   if (++(*call_count) == total_expected) {
156     grpc_core::MutexLock lock(&mu_);
157     signaled_ = true;
158     cv_.Signal();
159   }
160 }
161 
TEST_F(EventEngineTimerTest,StressTestTimersNotCalledBeforeScheduled)162 TEST_F(EventEngineTimerTest, StressTestTimersNotCalledBeforeScheduled) {
163   auto engine = this->NewEventEngine();
164   constexpr int thread_count = 10;
165   constexpr int call_count_per_thread = 100;
166   constexpr float timeout_min_seconds = 1;
167   constexpr float timeout_max_seconds = 10;
168   std::atomic<int> call_count{0};
169   std::atomic<int> failed_call_count{0};
170   std::vector<std::thread> threads;
171   threads.reserve(thread_count);
172   for (int thread_n = 0; thread_n < thread_count; ++thread_n) {
173     threads.emplace_back([&]() {
174       std::random_device rd;
175       std::mt19937 gen(rd());
176       std::uniform_real_distribution<> dis(timeout_min_seconds,
177                                            timeout_max_seconds);
178       for (int call_n = 0; call_n < call_count_per_thread; ++call_n) {
179         const auto dur = static_cast<int64_t>(1e9 * dis(gen));
180         auto deadline =
181             std::chrono::steady_clock::now() + std::chrono::nanoseconds(dur);
182         engine->RunAfter(
183             std::chrono::nanoseconds(dur),
184             absl::bind_front(&EventEngineTimerTest::ScheduleCheckCB, this,
185                              deadline, &call_count, &failed_call_count,
186                              thread_count * call_count_per_thread));
187       }
188     });
189   }
190   for (auto& t : threads) {
191     t.join();
192   }
193   grpc_core::MutexLock lock(&mu_);
194   // to protect against spurious wakeups.
195   while (!signaled_) {
196     cv_.Wait(&mu_);
197   }
198   if (failed_call_count.load() != 0) {
199     VLOG(2) << "failed timer count: " << failed_call_count.load() << " of "
200             << (thread_count * call_count);
201   }
202   ASSERT_EQ(0, failed_call_count.load());
203 }
204 
205 // Common implementation for the Run and RunAfter test variants below
206 // Calls run_fn multiple times, and will get stuck if the implementation does a
207 // blocking inline execution of the closure. This test will timeout on failure.
ImmediateRunTestInternal(absl::FunctionRef<void (absl::AnyInvocable<void ()>)> run_fn,grpc_core::Mutex & mu,grpc_core::CondVar & cv)208 void ImmediateRunTestInternal(
209     absl::FunctionRef<void(absl::AnyInvocable<void()>)> run_fn,
210     grpc_core::Mutex& mu, grpc_core::CondVar& cv) {
211   constexpr int num_concurrent_runs = 32;
212   constexpr int num_iterations = 100;
213   constexpr absl::Duration run_timeout = absl::Seconds(60);
214   std::atomic<int> waiters{0};
215   std::atomic<int> execution_count{0};
216   auto cb = [&mu, &cv, &run_timeout, &waiters, &execution_count]() {
217     waiters.fetch_add(1);
218     grpc_core::MutexLock lock(&mu);
219     EXPECT_FALSE(cv.WaitWithTimeout(&mu, run_timeout))
220         << "callback timed out waiting.";
221     execution_count.fetch_add(1);
222   };
223   for (int i = 0; i < num_iterations; i++) {
224     waiters.store(0);
225     execution_count.store(0);
226     for (int run = 0; run < num_concurrent_runs; run++) {
227       run_fn(cb);
228     }
229     while (waiters.load() != num_concurrent_runs) {
230       absl::SleepFor(absl::Milliseconds(33));
231     }
232     cv.SignalAll();
233     while (execution_count.load() != num_concurrent_runs) {
234       absl::SleepFor(absl::Milliseconds(33));
235     }
236   }
237 }
238 
239 // TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
240 // ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread)241 TEST_F(EventEngineTimerTest,
242        DISABLED_RunDoesNotImmediatelyExecuteInTheSameThread) {
243   auto engine = this->NewEventEngine();
244   ImmediateRunTestInternal(
245       [&engine](absl::AnyInvocable<void()> cb) { engine->Run(std::move(cb)); },
246       mu_, cv_);
247 }
248 
249 // TODO(hork): re-enabled after either I've implemented XFAIL, or fixed the
250 // ThreadPool's behavior under backlog.
TEST_F(EventEngineTimerTest,DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread)251 TEST_F(EventEngineTimerTest,
252        DISABLED_RunAfterDoesNotImmediatelyExecuteInTheSameThread) {
253   auto engine = this->NewEventEngine();
254   ImmediateRunTestInternal(
255       [&engine](absl::AnyInvocable<void()> cb) {
256         engine->RunAfter(std::chrono::seconds(0), std::move(cb));
257       },
258       mu_, cv_);
259 }
260