1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <algorithm>
16 #include <memory>
17 #include <thread>
18 #include <utility>
19 #include <vector>
20
21 #include <benchmark/benchmark.h>
22
23 #include "absl/functional/any_invocable.h"
24 #include "absl/status/status.h"
25 #include "absl/time/time.h"
26 #include "gtest/gtest.h"
27
28 #include <grpc/event_engine/event_engine.h>
29 #include <grpc/grpc.h>
30
31 #include "src/core/lib/event_engine/default_event_engine.h"
32 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
33 #include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
34 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
35 #include "src/core/lib/gprpp/sync.h"
36
37 using ::grpc_event_engine::experimental::EventEngine;
38 using ::grpc_event_engine::experimental::Scheduler;
39
40 namespace {
41 class TestScheduler : public Scheduler {
42 public:
TestScheduler(std::shared_ptr<EventEngine> engine)43 explicit TestScheduler(std::shared_ptr<EventEngine> engine)
44 : engine_(std::move(engine)) {}
Run(grpc_event_engine::experimental::EventEngine::Closure * closure)45 void Run(
46 grpc_event_engine::experimental::EventEngine::Closure* closure) override {
47 engine_->Run(closure);
48 }
49
Run(absl::AnyInvocable<void ()> cb)50 void Run(absl::AnyInvocable<void()> cb) override {
51 engine_->Run(std::move(cb));
52 }
53
54 private:
55 std::shared_ptr<EventEngine> engine_;
56 };
57
58 TestScheduler* g_scheduler;
59
60 } // namespace
61
62 namespace grpc_event_engine {
63 namespace experimental {
64
// Exercises three orderings of LockfreeEvent operations from a single thread:
// NotifyOn followed by SetReady, SetReady followed by NotifyOn, and NotifyOn
// followed by SetShutdown. Each callback is run through g_scheduler and
// reports completion through `notified`, which is only touched with `mu` held.
TEST(LockFreeEventTest, BasicTest) {
  LockfreeEvent event(g_scheduler);
  grpc_core::Mutex mu;
  grpc_core::CondVar cv;
  // Set to true (under mu) by each callback once it has run. Waiting on this
  // flag instead of on the bare condition variable keeps a spurious wakeup
  // from being mistaken for a delivered notification, which would let the test
  // run ahead of its callbacks and destroy mu/cv while one is still pending.
  bool notified = false;
  event.InitEvent();
  grpc_core::MutexLock lock(&mu);
  // Must be called with mu held. Blocks until a callback has set `notified`
  // or a single wait reports a timeout (a true return from WaitWithTimeout is
  // treated as a timeout, as in the EXPECT_FALSE checks this replaces).
  // Returns whether a callback ran and re-arms the flag for the next phase.
  auto wait_for_callback = [&mu, &cv, &notified]() {
    while (!notified) {
      if (cv.WaitWithTimeout(&mu, absl::Seconds(10))) break;
    }
    const bool callback_ran = notified;
    notified = false;
    return callback_ran;
  };

  // Set NotifyOn first and then SetReady
  event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
      [&mu, &cv, &notified](absl::Status status) {
        grpc_core::MutexLock lock(&mu);
        EXPECT_TRUE(status.ok());
        notified = true;
        cv.Signal();
      }));
  event.SetReady();
  EXPECT_TRUE(wait_for_callback());

  // SetReady first and then call NotifyOn
  event.SetReady();
  event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
      [&mu, &cv, &notified](absl::Status status) {
        grpc_core::MutexLock lock(&mu);
        EXPECT_TRUE(status.ok());
        notified = true;
        cv.Signal();
      }));
  EXPECT_TRUE(wait_for_callback());

  // Set NotifyOn and then call SetShutdown
  event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
      [&mu, &cv, &notified](absl::Status status) {
        grpc_core::MutexLock lock(&mu);
        EXPECT_FALSE(status.ok());
        EXPECT_EQ(status, absl::CancelledError("Shutdown"));
        notified = true;
        cv.Signal();
      }));
  event.SetShutdown(absl::CancelledError("Shutdown"));
  EXPECT_TRUE(wait_for_callback());
  event.DestroyEvent();
}
103
// Runs two threads in lockstep for kNumOperations rounds. In every round
// thread 0 registers a callback with NotifyOn and thread 1 calls SetReady;
// neither thread starts the next round until both have seen the callback run.
TEST(LockFreeEventTest, MultiThreadedTest) {
  std::vector<std::thread> workers;
  LockfreeEvent event(g_scheduler);
  grpc_core::Mutex mu;
  grpc_core::CondVar cv;
  // True from the moment the callback of the current round runs until the
  // last thread leaves that round. Accessed with mu held.
  bool callback_ran = false;
  // Number of threads currently inside a round. Accessed with mu held.
  int in_flight = 0;
  static constexpr int kNumOperations = 100;
  workers.reserve(2);
  event.InitEvent();
  // Spin up two threads alternating between NotifyOn and SetReady
  for (int id = 0; id < 2; id++) {
    workers.emplace_back([&, thread_id = id]() {
      for (int round = 0; round < kNumOperations; round++) {
        grpc_core::MutexLock lock(&mu);
        // The previous round is still being wound down by the other thread;
        // hold off until it has cleared the flag.
        while (callback_ran) {
          cv.Wait(&mu);
        }
        in_flight++;
        if (thread_id != 0) {
          event.SetReady();
        } else {
          event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
              [&mu, &cv, &callback_ran](absl::Status status) {
                grpc_core::MutexLock lock(&mu);
                EXPECT_TRUE(status.ok());
                callback_ran = true;
                cv.SignalAll();
              }));
        }
        // Block until the callback for this round has been executed.
        while (!callback_ran) {
          cv.Wait(&mu);
        }
        // Whichever thread leaves the round last re-arms the flag and wakes
        // the other thread in case it is already waiting to begin the next
        // round.
        --in_flight;
        if (in_flight == 0) {
          callback_ran = false;
          cv.Signal();
        }
      }
    });
  }
  for (std::thread& worker : workers) {
    worker.join();
  }
  event.SetShutdown(absl::OkStatus());
  event.DestroyEvent();
}
155
156 namespace {
157
158 // A trivial callback sceduler which inherits from the Scheduler interface but
159 // immediatey runs the callback/closure.
160 class BechmarkCallbackScheduler : public Scheduler {
161 public:
162 BechmarkCallbackScheduler() = default;
Run(grpc_event_engine::experimental::EventEngine::Closure * closure)163 void Run(
164 grpc_event_engine::experimental::EventEngine::Closure* closure) override {
165 closure->Run();
166 }
167
Run(absl::AnyInvocable<void ()> cb)168 void Run(absl::AnyInvocable<void()> cb) override { cb(); }
169 };
170
171 // A benchmark which repeatedly registers a NotifyOn callback and invokes the
172 // callback with SetReady. This benchmark is intended to measure the cost of
173 // NotifyOn and SetReady implementations of the lock free event.
BM_LockFreeEvent(benchmark::State & state)174 void BM_LockFreeEvent(benchmark::State& state) {
175 BechmarkCallbackScheduler cb_scheduler;
176 LockfreeEvent event(&cb_scheduler);
177 event.InitEvent();
178 PosixEngineClosure* notify_on_closure =
179 PosixEngineClosure::ToPermanentClosure([](absl::Status /*status*/) {});
180 for (auto s : state) {
181 event.NotifyOn(notify_on_closure);
182 event.SetReady();
183 }
184 event.SetShutdown(absl::CancelledError("Shutting down"));
185 delete notify_on_closure;
186 event.DestroyEvent();
187 }
188 BENCHMARK(BM_LockFreeEvent)->ThreadRange(1, 64);
189
190 } // namespace
191
192 } // namespace experimental
193 } // namespace grpc_event_engine
194
195 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
196 // and others do not. This allows us to support both modes.
197 namespace benchmark {
RunTheBenchmarksNamespaced()198 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
199 } // namespace benchmark
200
main(int argc,char ** argv)201 int main(int argc, char** argv) {
202 ::testing::InitGoogleTest(&argc, argv);
203 benchmark::Initialize(&argc, argv);
204 // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
205 // until we clear out the iomgr shutdown code.
206 grpc_init();
207 g_scheduler = new TestScheduler(
208 grpc_event_engine::experimental::GetDefaultEventEngine());
209 int r = RUN_ALL_TESTS();
210 benchmark::RunTheBenchmarksNamespaced();
211 grpc_shutdown();
212 return r;
213 }
214