• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
15 
16 #include <thread>
17 #include <vector>
18 
19 #include "absl/functional/any_invocable.h"
20 #include "gtest/gtest.h"
21 
22 #include <grpc/event_engine/event_engine.h>
23 #include <grpc/support/port_platform.h>
24 
25 #include "src/core/lib/event_engine/common_closures.h"
26 #include "test/core/util/test_config.h"
27 
28 // TODO(hork): parameterize these tests for other WorkQueue implementations.
29 
30 namespace {
31 using ::grpc_event_engine::experimental::AnyInvocableClosure;
32 using ::grpc_event_engine::experimental::BasicWorkQueue;
33 using ::grpc_event_engine::experimental::EventEngine;
34 
// A default-constructed queue must report itself as empty.
TEST(BasicWorkQueueTest, StartsEmpty) {
  BasicWorkQueue work_queue;
  ASSERT_TRUE(work_queue.Empty());
}
39 
// Adds a caller-owned closure by pointer, pops it back out, runs it, and
// checks that the queue is empty again afterwards.
TEST(BasicWorkQueueTest, TakesClosures) {
  BasicWorkQueue work_queue;
  bool did_run = false;
  AnyInvocableClosure marker([&did_run] { did_run = true; });
  work_queue.Add(&marker);
  ASSERT_FALSE(work_queue.Empty());
  // Only one element was added, so this pop must hand back a closure.
  EventEngine::Closure* const taken = work_queue.PopMostRecent();
  ASSERT_NE(taken, nullptr);
  taken->Run();
  ASSERT_TRUE(did_run);
  ASSERT_TRUE(work_queue.Empty());
}
52 
// Adds a bare callable (rather than a Closure pointer), pops it, runs it, and
// checks that the queue is empty again afterwards.
TEST(BasicWorkQueueTest, TakesAnyInvocables) {
  BasicWorkQueue work_queue;
  bool did_run = false;
  work_queue.Add([&did_run] { did_run = true; });
  ASSERT_FALSE(work_queue.Empty());
  // The callable comes back out as an EventEngine::Closure.
  EventEngine::Closure* const taken = work_queue.PopMostRecent();
  ASSERT_NE(taken, nullptr);
  taken->Run();
  ASSERT_TRUE(did_run);
  ASSERT_TRUE(work_queue.Empty());
}
64 
// Removing the single queued element through PopOldest() must leave the queue
// empty, and the element that comes back must be runnable.
TEST(BasicWorkQueueTest, BecomesEmptyOnPopOldest) {
  BasicWorkQueue work_queue;
  bool did_run = false;
  work_queue.Add([&did_run] { did_run = true; });
  ASSERT_FALSE(work_queue.Empty());
  EventEngine::Closure* const oldest = work_queue.PopOldest();
  ASSERT_NE(oldest, nullptr);
  oldest->Run();
  ASSERT_TRUE(did_run);
  ASSERT_TRUE(work_queue.Empty());
}
76 
// PopMostRecent() must return elements in the reverse of insertion order.
//
// Two callables are queued; the first sets bit 1 of `flag`, the second sets
// bit 2. Running the popped closures one at a time reveals which was returned.
TEST(BasicWorkQueueTest, PopMostRecentIsLIFO) {
  BasicWorkQueue queue;
  int flag = 0;
  queue.Add([&flag] { flag |= 1; });
  queue.Add([&flag] { flag |= 2; });
  // Guard each pop: dereferencing a null result would crash the whole test
  // binary instead of reporting a failure for this test.
  EventEngine::Closure* closure = queue.PopMostRecent();
  ASSERT_NE(closure, nullptr);
  closure->Run();
  // Only the second (most recently added) callable has run so far.
  EXPECT_EQ(flag, 2);
  closure = queue.PopMostRecent();
  ASSERT_NE(closure, nullptr);
  closure->Run();
  // Now both callables have run.
  EXPECT_EQ(flag, 3);
  ASSERT_TRUE(queue.Empty());
}
90 
// PopOldest() must return elements in insertion order.
//
// Two callables are queued; the first sets bit 1 of `flag`, the second sets
// bit 2. Running the popped closures one at a time reveals which was returned.
TEST(BasicWorkQueueTest, PopOldestIsFIFO) {
  BasicWorkQueue queue;
  int flag = 0;
  queue.Add([&flag] { flag |= 1; });
  queue.Add([&flag] { flag |= 2; });
  // Guard each pop: dereferencing a null result would crash the whole test
  // binary instead of reporting a failure for this test.
  EventEngine::Closure* closure = queue.PopOldest();
  ASSERT_NE(closure, nullptr);
  closure->Run();
  // Only the first (oldest) callable has run so far.
  EXPECT_EQ(flag, 1);
  closure = queue.PopOldest();
  ASSERT_NE(closure, nullptr);
  closure->Run();
  // Now both callables have run.
  EXPECT_EQ(flag, 3);
  ASSERT_TRUE(queue.Empty());
}
104 
// Hammers a single queue from many threads at once.
//
// Every thread first adds `element_count_per_thd` closures, then keeps popping
// until it has run that same number of closures. A pop may return a closure
// another thread added, or nothing at all, in which case the thread retries.
// After all threads are joined the queue is expected to be empty.
TEST(BasicWorkQueueTest, ThreadedStress) {
  // Closure that frees itself when run; it is allocated with `new` below.
  class TestClosure : public EventEngine::Closure {
   public:
    void Run() override { delete this; }
  };
  BasicWorkQueue queue;
  constexpr int thd_count = 33;
  constexpr int element_count_per_thd = 3333;
  std::vector<std::thread> workers;
  workers.reserve(thd_count);
  for (int t = 0; t < thd_count; ++t) {
    workers.emplace_back([&] {
      // Phase 1: enqueue this thread's share of closures.
      for (int added = 0; added < element_count_per_thd; ++added) {
        queue.Add(new TestClosure());
      }
      // Phase 2: spin until this thread has run its share of closures.
      int remaining = element_count_per_thd;
      while (remaining > 0) {
        EventEngine::Closure* next = queue.PopMostRecent();
        if (next == nullptr) continue;
        next->Run();
        --remaining;
      }
    });
  }
  for (std::thread& worker : workers) worker.join();
  EXPECT_TRUE(queue.Empty());
}
132 
133 }  // namespace
134 
main(int argc,char ** argv)135 int main(int argc, char** argv) {
136   testing::InitGoogleTest(&argc, argv);
137   grpc::testing::TestEnvironment env(&argc, argv);
138   auto result = RUN_ALL_TESTS();
139   return result;
140 }
141