1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
15
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/support/port_platform.h>
18
19 #include <thread>
20 #include <vector>
21
22 #include "absl/functional/any_invocable.h"
23 #include "gtest/gtest.h"
24 #include "src/core/lib/event_engine/common_closures.h"
25 #include "test/core/test_util/test_config.h"
26
27 // TODO(hork): parameterize these tests for other WorkQueue implementations.
28
29 namespace {
30 using ::grpc_event_engine::experimental::AnyInvocableClosure;
31 using ::grpc_event_engine::experimental::BasicWorkQueue;
32 using ::grpc_event_engine::experimental::EventEngine;
33
TEST(BasicWorkQueueTest,StartsEmpty)34 TEST(BasicWorkQueueTest, StartsEmpty) {
35 BasicWorkQueue queue;
36 ASSERT_TRUE(queue.Empty());
37 }
38
TEST(BasicWorkQueueTest,TakesClosures)39 TEST(BasicWorkQueueTest, TakesClosures) {
40 BasicWorkQueue queue;
41 bool ran = false;
42 AnyInvocableClosure closure([&ran] { ran = true; });
43 queue.Add(&closure);
44 ASSERT_FALSE(queue.Empty());
45 EventEngine::Closure* popped = queue.PopMostRecent();
46 ASSERT_NE(popped, nullptr);
47 popped->Run();
48 ASSERT_TRUE(ran);
49 ASSERT_TRUE(queue.Empty());
50 }
51
TEST(BasicWorkQueueTest,TakesAnyInvocables)52 TEST(BasicWorkQueueTest, TakesAnyInvocables) {
53 BasicWorkQueue queue;
54 bool ran = false;
55 queue.Add([&ran] { ran = true; });
56 ASSERT_FALSE(queue.Empty());
57 EventEngine::Closure* popped = queue.PopMostRecent();
58 ASSERT_NE(popped, nullptr);
59 popped->Run();
60 ASSERT_TRUE(ran);
61 ASSERT_TRUE(queue.Empty());
62 }
63
TEST(BasicWorkQueueTest,BecomesEmptyOnPopOldest)64 TEST(BasicWorkQueueTest, BecomesEmptyOnPopOldest) {
65 BasicWorkQueue queue;
66 bool ran = false;
67 queue.Add([&ran] { ran = true; });
68 ASSERT_FALSE(queue.Empty());
69 EventEngine::Closure* closure = queue.PopOldest();
70 ASSERT_NE(closure, nullptr);
71 closure->Run();
72 ASSERT_TRUE(ran);
73 ASSERT_TRUE(queue.Empty());
74 }
75
TEST(BasicWorkQueueTest,PopMostRecentIsLIFO)76 TEST(BasicWorkQueueTest, PopMostRecentIsLIFO) {
77 BasicWorkQueue queue;
78 int flag = 0;
79 queue.Add([&flag] { flag |= 1; });
80 queue.Add([&flag] { flag |= 2; });
81 queue.PopMostRecent()->Run();
82 EXPECT_FALSE(flag & 1);
83 EXPECT_TRUE(flag & 2);
84 queue.PopMostRecent()->Run();
85 EXPECT_TRUE(flag & 1);
86 EXPECT_TRUE(flag & 2);
87 ASSERT_TRUE(queue.Empty());
88 }
89
TEST(BasicWorkQueueTest,PopOldestIsFIFO)90 TEST(BasicWorkQueueTest, PopOldestIsFIFO) {
91 BasicWorkQueue queue;
92 int flag = 0;
93 queue.Add([&flag] { flag |= 1; });
94 queue.Add([&flag] { flag |= 2; });
95 queue.PopOldest()->Run();
96 EXPECT_TRUE(flag & 1);
97 EXPECT_FALSE(flag & 2);
98 queue.PopOldest()->Run();
99 EXPECT_TRUE(flag & 1);
100 EXPECT_TRUE(flag & 2);
101 ASSERT_TRUE(queue.Empty());
102 }
103
TEST(BasicWorkQueueTest,ThreadedStress)104 TEST(BasicWorkQueueTest, ThreadedStress) {
105 BasicWorkQueue queue;
106 constexpr int thd_count = 33;
107 constexpr int element_count_per_thd = 3333;
108 std::vector<std::thread> threads;
109 threads.reserve(thd_count);
110 class TestClosure : public EventEngine::Closure {
111 public:
112 void Run() override { delete this; }
113 };
114 for (int i = 0; i < thd_count; i++) {
115 threads.emplace_back([&] {
116 for (int j = 0; j < element_count_per_thd; j++) {
117 queue.Add(new TestClosure());
118 }
119 int run_count = 0;
120 while (run_count < element_count_per_thd) {
121 if (auto* c = queue.PopMostRecent()) {
122 c->Run();
123 ++run_count;
124 }
125 }
126 });
127 }
128 for (auto& thd : threads) thd.join();
129 EXPECT_TRUE(queue.Empty());
130 }
131
132 } // namespace
133
main(int argc,char ** argv)134 int main(int argc, char** argv) {
135 testing::InitGoogleTest(&argc, argv);
136 grpc::testing::TestEnvironment env(&argc, argv);
137 auto result = RUN_ALL_TESTS();
138 return result;
139 }
140