• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
15 
16 #include <grpc/event_engine/event_engine.h>
17 #include <grpc/support/port_platform.h>
18 
19 #include <thread>
20 #include <vector>
21 
22 #include "absl/functional/any_invocable.h"
23 #include "gtest/gtest.h"
24 #include "src/core/lib/event_engine/common_closures.h"
25 #include "test/core/test_util/test_config.h"
26 
27 // TODO(hork): parameterize these tests for other WorkQueue implementations.
28 
29 namespace {
30 using ::grpc_event_engine::experimental::AnyInvocableClosure;
31 using ::grpc_event_engine::experimental::BasicWorkQueue;
32 using ::grpc_event_engine::experimental::EventEngine;
33 
TEST(BasicWorkQueueTest,StartsEmpty)34 TEST(BasicWorkQueueTest, StartsEmpty) {
35   BasicWorkQueue queue;
36   ASSERT_TRUE(queue.Empty());
37 }
38 
TEST(BasicWorkQueueTest,TakesClosures)39 TEST(BasicWorkQueueTest, TakesClosures) {
40   BasicWorkQueue queue;
41   bool ran = false;
42   AnyInvocableClosure closure([&ran] { ran = true; });
43   queue.Add(&closure);
44   ASSERT_FALSE(queue.Empty());
45   EventEngine::Closure* popped = queue.PopMostRecent();
46   ASSERT_NE(popped, nullptr);
47   popped->Run();
48   ASSERT_TRUE(ran);
49   ASSERT_TRUE(queue.Empty());
50 }
51 
TEST(BasicWorkQueueTest,TakesAnyInvocables)52 TEST(BasicWorkQueueTest, TakesAnyInvocables) {
53   BasicWorkQueue queue;
54   bool ran = false;
55   queue.Add([&ran] { ran = true; });
56   ASSERT_FALSE(queue.Empty());
57   EventEngine::Closure* popped = queue.PopMostRecent();
58   ASSERT_NE(popped, nullptr);
59   popped->Run();
60   ASSERT_TRUE(ran);
61   ASSERT_TRUE(queue.Empty());
62 }
63 
TEST(BasicWorkQueueTest,BecomesEmptyOnPopOldest)64 TEST(BasicWorkQueueTest, BecomesEmptyOnPopOldest) {
65   BasicWorkQueue queue;
66   bool ran = false;
67   queue.Add([&ran] { ran = true; });
68   ASSERT_FALSE(queue.Empty());
69   EventEngine::Closure* closure = queue.PopOldest();
70   ASSERT_NE(closure, nullptr);
71   closure->Run();
72   ASSERT_TRUE(ran);
73   ASSERT_TRUE(queue.Empty());
74 }
75 
TEST(BasicWorkQueueTest,PopMostRecentIsLIFO)76 TEST(BasicWorkQueueTest, PopMostRecentIsLIFO) {
77   BasicWorkQueue queue;
78   int flag = 0;
79   queue.Add([&flag] { flag |= 1; });
80   queue.Add([&flag] { flag |= 2; });
81   queue.PopMostRecent()->Run();
82   EXPECT_FALSE(flag & 1);
83   EXPECT_TRUE(flag & 2);
84   queue.PopMostRecent()->Run();
85   EXPECT_TRUE(flag & 1);
86   EXPECT_TRUE(flag & 2);
87   ASSERT_TRUE(queue.Empty());
88 }
89 
TEST(BasicWorkQueueTest,PopOldestIsFIFO)90 TEST(BasicWorkQueueTest, PopOldestIsFIFO) {
91   BasicWorkQueue queue;
92   int flag = 0;
93   queue.Add([&flag] { flag |= 1; });
94   queue.Add([&flag] { flag |= 2; });
95   queue.PopOldest()->Run();
96   EXPECT_TRUE(flag & 1);
97   EXPECT_FALSE(flag & 2);
98   queue.PopOldest()->Run();
99   EXPECT_TRUE(flag & 1);
100   EXPECT_TRUE(flag & 2);
101   ASSERT_TRUE(queue.Empty());
102 }
103 
TEST(BasicWorkQueueTest,ThreadedStress)104 TEST(BasicWorkQueueTest, ThreadedStress) {
105   BasicWorkQueue queue;
106   constexpr int thd_count = 33;
107   constexpr int element_count_per_thd = 3333;
108   std::vector<std::thread> threads;
109   threads.reserve(thd_count);
110   class TestClosure : public EventEngine::Closure {
111    public:
112     void Run() override { delete this; }
113   };
114   for (int i = 0; i < thd_count; i++) {
115     threads.emplace_back([&] {
116       for (int j = 0; j < element_count_per_thd; j++) {
117         queue.Add(new TestClosure());
118       }
119       int run_count = 0;
120       while (run_count < element_count_per_thd) {
121         if (auto* c = queue.PopMostRecent()) {
122           c->Run();
123           ++run_count;
124         }
125       }
126     });
127   }
128   for (auto& thd : threads) thd.join();
129   EXPECT_TRUE(queue.Empty());
130 }
131 
132 }  // namespace
133 
main(int argc,char ** argv)134 int main(int argc, char** argv) {
135   testing::InitGoogleTest(&argc, argv);
136   grpc::testing::TestEnvironment env(&argc, argv);
137   auto result = RUN_ALL_TESTS();
138   return result;
139 }
140