• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <algorithm>
16 #include <memory>
17 #include <thread>
18 #include <utility>
19 #include <vector>
20 
21 #include <benchmark/benchmark.h>
22 
23 #include "absl/functional/any_invocable.h"
24 #include "absl/status/status.h"
25 #include "absl/time/time.h"
26 #include "gtest/gtest.h"
27 
28 #include <grpc/event_engine/event_engine.h>
29 #include <grpc/grpc.h>
30 
31 #include "src/core/lib/event_engine/default_event_engine.h"
32 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
33 #include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
34 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
35 #include "src/core/lib/gprpp/sync.h"
36 
37 using ::grpc_event_engine::experimental::EventEngine;
38 using ::grpc_event_engine::experimental::Scheduler;
39 
40 namespace {
41 class TestScheduler : public Scheduler {
42  public:
TestScheduler(std::shared_ptr<EventEngine> engine)43   explicit TestScheduler(std::shared_ptr<EventEngine> engine)
44       : engine_(std::move(engine)) {}
Run(grpc_event_engine::experimental::EventEngine::Closure * closure)45   void Run(
46       grpc_event_engine::experimental::EventEngine::Closure* closure) override {
47     engine_->Run(closure);
48   }
49 
Run(absl::AnyInvocable<void ()> cb)50   void Run(absl::AnyInvocable<void()> cb) override {
51     engine_->Run(std::move(cb));
52   }
53 
54  private:
55   std::shared_ptr<EventEngine> engine_;
56 };
57 
58 TestScheduler* g_scheduler;
59 
60 }  // namespace
61 
62 namespace grpc_event_engine {
63 namespace experimental {
64 
TEST(LockFreeEventTest,BasicTest)65 TEST(LockFreeEventTest, BasicTest) {
66   LockfreeEvent event(g_scheduler);
67   grpc_core::Mutex mu;
68   grpc_core::CondVar cv;
69   event.InitEvent();
70   grpc_core::MutexLock lock(&mu);
71   // Set NotifyOn first and then SetReady
72   event.NotifyOn(
73       PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
74         grpc_core::MutexLock lock(&mu);
75         EXPECT_TRUE(status.ok());
76         cv.Signal();
77       }));
78   event.SetReady();
79   EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10)));
80 
81   // SetReady first first and then call NotifyOn
82   event.SetReady();
83   event.NotifyOn(
84       PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
85         grpc_core::MutexLock lock(&mu);
86         EXPECT_TRUE(status.ok());
87         cv.Signal();
88       }));
89   EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10)));
90 
91   // Set NotifyOn and then call SetShutdown
92   event.NotifyOn(
93       PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
94         grpc_core::MutexLock lock(&mu);
95         EXPECT_FALSE(status.ok());
96         EXPECT_EQ(status, absl::CancelledError("Shutdown"));
97         cv.Signal();
98       }));
99   event.SetShutdown(absl::CancelledError("Shutdown"));
100   EXPECT_FALSE(cv.WaitWithTimeout(&mu, absl::Seconds(10)));
101   event.DestroyEvent();
102 }
103 
TEST(LockFreeEventTest,MultiThreadedTest)104 TEST(LockFreeEventTest, MultiThreadedTest) {
105   std::vector<std::thread> threads;
106   LockfreeEvent event(g_scheduler);
107   grpc_core::Mutex mu;
108   grpc_core::CondVar cv;
109   bool signalled = false;
110   int active = 0;
111   static constexpr int kNumOperations = 100;
112   threads.reserve(2);
113   event.InitEvent();
114   // Spin up two threads alternating between NotifyOn and SetReady
115   for (int i = 0; i < 2; i++) {
116     threads.emplace_back([&, thread_id = i]() {
117       for (int j = 0; j < kNumOperations; j++) {
118         grpc_core::MutexLock lock(&mu);
119         // Wait for both threads to process the previous operation before
120         // starting the next one.
121         while (signalled) {
122           cv.Wait(&mu);
123         }
124         active++;
125         if (thread_id == 0) {
126           event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
127               [&mu, &cv, &signalled](absl::Status status) {
128                 grpc_core::MutexLock lock(&mu);
129                 EXPECT_TRUE(status.ok());
130                 signalled = true;
131                 cv.SignalAll();
132               }));
133         } else {
134           event.SetReady();
135         }
136         while (!signalled) {
137           cv.Wait(&mu);
138         }
139         // The last thread to finish the current operation sets signalled to
140         // false and wakes up the other thread if its blocked waiting to
141         // start the next operation.
142         if (--active == 0) {
143           signalled = false;
144           cv.Signal();
145         }
146       }
147     });
148   }
149   for (auto& t : threads) {
150     t.join();
151   }
152   event.SetShutdown(absl::OkStatus());
153   event.DestroyEvent();
154 }
155 
156 namespace {
157 
158 // A trivial callback sceduler which inherits from the Scheduler interface but
159 // immediatey runs the callback/closure.
160 class BechmarkCallbackScheduler : public Scheduler {
161  public:
162   BechmarkCallbackScheduler() = default;
Run(grpc_event_engine::experimental::EventEngine::Closure * closure)163   void Run(
164       grpc_event_engine::experimental::EventEngine::Closure* closure) override {
165     closure->Run();
166   }
167 
Run(absl::AnyInvocable<void ()> cb)168   void Run(absl::AnyInvocable<void()> cb) override { cb(); }
169 };
170 
171 // A benchmark which repeatedly registers a NotifyOn callback and invokes the
172 // callback with SetReady. This benchmark is intended to measure the cost of
173 // NotifyOn and SetReady implementations of the lock free event.
BM_LockFreeEvent(benchmark::State & state)174 void BM_LockFreeEvent(benchmark::State& state) {
175   BechmarkCallbackScheduler cb_scheduler;
176   LockfreeEvent event(&cb_scheduler);
177   event.InitEvent();
178   PosixEngineClosure* notify_on_closure =
179       PosixEngineClosure::ToPermanentClosure([](absl::Status /*status*/) {});
180   for (auto s : state) {
181     event.NotifyOn(notify_on_closure);
182     event.SetReady();
183   }
184   event.SetShutdown(absl::CancelledError("Shutting down"));
185   delete notify_on_closure;
186   event.DestroyEvent();
187 }
188 BENCHMARK(BM_LockFreeEvent)->ThreadRange(1, 64);
189 
190 }  // namespace
191 
192 }  // namespace experimental
193 }  // namespace grpc_event_engine
194 
195 // Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
196 // and others do not. This allows us to support both modes.
197 namespace benchmark {
RunTheBenchmarksNamespaced()198 void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
199 }  // namespace benchmark
200 
main(int argc,char ** argv)201 int main(int argc, char** argv) {
202   ::testing::InitGoogleTest(&argc, argv);
203   benchmark::Initialize(&argc, argv);
204   // TODO(ctiller): EventEngine temporarily needs grpc to be initialized first
205   // until we clear out the iomgr shutdown code.
206   grpc_init();
207   g_scheduler = new TestScheduler(
208       grpc_event_engine::experimental::GetDefaultEventEngine());
209   int r = RUN_ALL_TESTS();
210   benchmark::RunTheBenchmarksNamespaced();
211   grpc_shutdown();
212   return r;
213 }
214