1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
15
16 #include <thread>
17 #include <vector>
18
19 #include "absl/functional/any_invocable.h"
20 #include "gtest/gtest.h"
21
22 #include <grpc/event_engine/event_engine.h>
23 #include <grpc/support/port_platform.h>
24
25 #include "src/core/lib/event_engine/common_closures.h"
26 #include "test/core/util/test_config.h"
27
28 // TODO(hork): parameterize these tests for other WorkQueue implementations.
29
30 namespace {
31 using ::grpc_event_engine::experimental::AnyInvocableClosure;
32 using ::grpc_event_engine::experimental::BasicWorkQueue;
33 using ::grpc_event_engine::experimental::EventEngine;
34
// A default-constructed queue must report that it holds no work.
TEST(BasicWorkQueueTest, StartsEmpty) {
  BasicWorkQueue fresh_queue;
  ASSERT_TRUE(fresh_queue.Empty());
}
39
// Adds a caller-owned closure by pointer, pops it back with PopMostRecent(),
// and checks that running the popped closure invokes the wrapped callable and
// that the queue is empty afterwards.
TEST(BasicWorkQueueTest, TakesClosures) {
  bool was_invoked = false;
  AnyInvocableClosure stack_closure([&was_invoked] { was_invoked = true; });
  BasicWorkQueue work_queue;
  work_queue.Add(&stack_closure);
  ASSERT_FALSE(work_queue.Empty());
  EventEngine::Closure* const task = work_queue.PopMostRecent();
  // Stop here if nothing came back; running a null closure would crash.
  ASSERT_NE(task, nullptr);
  task->Run();
  ASSERT_TRUE(was_invoked);
  ASSERT_TRUE(work_queue.Empty());
}
52
// Adds a bare callable (rather than a Closure pointer), pops it back with
// PopMostRecent(), and checks that running the result invokes the callable
// and that the queue is empty afterwards.
TEST(BasicWorkQueueTest, TakesAnyInvocables) {
  bool was_invoked = false;
  BasicWorkQueue work_queue;
  work_queue.Add([&was_invoked] { was_invoked = true; });
  ASSERT_FALSE(work_queue.Empty());
  EventEngine::Closure* const task = work_queue.PopMostRecent();
  // Stop here if nothing came back; running a null closure would crash.
  ASSERT_NE(task, nullptr);
  task->Run();
  ASSERT_TRUE(was_invoked);
  ASSERT_TRUE(work_queue.Empty());
}
64
// Removing the only queued element through PopOldest() must yield a runnable
// closure and leave the queue empty.
TEST(BasicWorkQueueTest, BecomesEmptyOnPopOldest) {
  bool was_invoked = false;
  BasicWorkQueue work_queue;
  work_queue.Add([&was_invoked] { was_invoked = true; });
  ASSERT_FALSE(work_queue.Empty());
  EventEngine::Closure* const oldest = work_queue.PopOldest();
  // Stop here if nothing came back; running a null closure would crash.
  ASSERT_NE(oldest, nullptr);
  oldest->Run();
  ASSERT_TRUE(was_invoked);
  ASSERT_TRUE(work_queue.Empty());
}
76
// Verifies that PopMostRecent() returns closures newest-first: with two
// closures queued, the second one added must run before the first.
TEST(BasicWorkQueueTest, PopMostRecentIsLIFO) {
  BasicWorkQueue queue;
  int flag = 0;
  // Bit 0 is set by the first closure added, bit 1 by the second.
  queue.Add([&flag] { flag |= 1; });
  queue.Add([&flag] { flag |= 2; });
  // Check each popped pointer before dereferencing it, so that a queue
  // regression is reported as a test failure rather than crashing the test
  // binary with a null-pointer dereference.
  EventEngine::Closure* newest = queue.PopMostRecent();
  ASSERT_NE(newest, nullptr);
  newest->Run();
  EXPECT_FALSE(flag & 1);
  EXPECT_TRUE(flag & 2);
  EventEngine::Closure* remaining = queue.PopMostRecent();
  ASSERT_NE(remaining, nullptr);
  remaining->Run();
  EXPECT_TRUE(flag & 1);
  EXPECT_TRUE(flag & 2);
  ASSERT_TRUE(queue.Empty());
}
90
// Verifies that PopOldest() returns closures oldest-first: with two closures
// queued, the first one added must run before the second.
TEST(BasicWorkQueueTest, PopOldestIsFIFO) {
  BasicWorkQueue queue;
  int flag = 0;
  // Bit 0 is set by the first closure added, bit 1 by the second.
  queue.Add([&flag] { flag |= 1; });
  queue.Add([&flag] { flag |= 2; });
  // Check each popped pointer before dereferencing it, so that a queue
  // regression is reported as a test failure rather than crashing the test
  // binary with a null-pointer dereference.
  EventEngine::Closure* oldest = queue.PopOldest();
  ASSERT_NE(oldest, nullptr);
  oldest->Run();
  EXPECT_TRUE(flag & 1);
  EXPECT_FALSE(flag & 2);
  EventEngine::Closure* remaining = queue.PopOldest();
  ASSERT_NE(remaining, nullptr);
  remaining->Run();
  EXPECT_TRUE(flag & 1);
  EXPECT_TRUE(flag & 2);
  ASSERT_TRUE(queue.Empty());
}
104
// Exercises one shared queue from many threads at once. Every thread enqueues
// a fixed number of heap-allocated closures and then keeps popping until it
// has run that same number, so adds and pops balance across the whole test and
// the queue is expected to be empty once all threads have been joined.
TEST(BasicWorkQueueTest, ThreadedStress) {
  // Closure that frees itself when it is run.
  class SelfDeletingClosure : public EventEngine::Closure {
   public:
    void Run() override { delete this; }
  };
  constexpr int kThreadCount = 33;
  constexpr int kClosuresPerThread = 3333;
  BasicWorkQueue queue;
  std::vector<std::thread> workers;
  workers.reserve(kThreadCount);
  for (int t = 0; t < kThreadCount; t++) {
    workers.emplace_back([&] {
      for (int added = 0; added < kClosuresPerThread; added++) {
        queue.Add(new SelfDeletingClosure());
      }
      // The closures popped here are not necessarily the ones this thread
      // added; only the count matters. A null pop is simply retried.
      int still_to_run = kClosuresPerThread;
      while (still_to_run > 0) {
        EventEngine::Closure* closure = queue.PopMostRecent();
        if (closure == nullptr) continue;
        closure->Run();
        --still_to_run;
      }
    });
  }
  for (std::thread& worker : workers) worker.join();
  EXPECT_TRUE(queue.Empty());
}
132
133 } // namespace
134
main(int argc,char ** argv)135 int main(int argc, char** argv) {
136 testing::InitGoogleTest(&argc, argv);
137 grpc::testing::TestEnvironment env(&argc, argv);
138 auto result = RUN_ALL_TESTS();
139 return result;
140 }
141