• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/**
 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include <gtest/gtest.h>
17 
18 #include "runtime/include/runtime.h"
19 #include "runtime/thread_pool.h"
20 
21 namespace panda::test {
22 
23 class MockThreadPoolTest : public testing::Test {
24 public:
25     static const size_t TASK_NUMBER = 32;
MockThreadPoolTest()26     MockThreadPoolTest()
27     {
28         RuntimeOptions options;
29         options.SetShouldLoadBootPandaFiles(false);
30         options.SetShouldInitializeIntrinsics(false);
31         Runtime::Create(options);
32         thread_ = panda::MTManagedThread::GetCurrent();
33         thread_->ManagedCodeBegin();
34     }
35 
~MockThreadPoolTest()36     ~MockThreadPoolTest() override
37     {
38         thread_->ManagedCodeEnd();
39         Runtime::Destroy();
40     }
41 
42     NO_COPY_SEMANTIC(MockThreadPoolTest);
43     NO_MOVE_SEMANTIC(MockThreadPoolTest);
44 
45 private:
46     panda::MTManagedThread *thread_;
47 };
48 
49 class MockTask : public TaskInterface {
50 public:
MockTask(size_t identifier=0)51     explicit MockTask(size_t identifier = 0) : identifier_(identifier) {}
52 
53     enum TaskStatus {
54         NOT_STARTED,
55         IN_QUEUE,
56         PROCESSING,
57         COMPLETED,
58     };
59 
IsEmpty() const60     bool IsEmpty() const
61     {
62         return identifier_ == 0;
63     }
64 
GetId() const65     size_t GetId() const
66     {
67         return identifier_;
68     }
69 
GetStatus() const70     TaskStatus GetStatus() const
71     {
72         return status_;
73     }
74 
SetStatus(TaskStatus status)75     void SetStatus(TaskStatus status)
76     {
77         status_ = status;
78     }
79 
80 private:
81     size_t identifier_;
82     TaskStatus status_ = NOT_STARTED;
83 };
84 
85 class MockQueue : public TaskQueueInterface<MockTask> {
86 public:
MockQueue(mem::InternalAllocatorPtr allocator)87     explicit MockQueue(mem::InternalAllocatorPtr allocator) : queue_(allocator->Adapter()) {}
MockQueue(mem::InternalAllocatorPtr allocator,size_t queueSize)88     MockQueue(mem::InternalAllocatorPtr allocator, size_t queueSize)
89         : TaskQueueInterface<MockTask>(queueSize), queue_(allocator->Adapter())
90     {
91     }
92 
GetTask()93     MockTask GetTask() override
94     {
95         if (queue_.empty()) {
96             LOG(DEBUG, RUNTIME) << "Cannot get an element, queue is empty";
97             return MockTask();
98         }
99         auto task = queue_.front();
100         queue_.pop_front();
101         LOG(DEBUG, RUNTIME) << "Extract task " << task.GetId();
102         return task;
103     }
104 
105     // NOLINTNEXTLINE(google-default-arguments)
AddTask(MockTask && task,size_t priority=0)106     void AddTask(MockTask &&task, [[maybe_unused]] size_t priority = 0) override
107     {
108         task.SetStatus(MockTask::IN_QUEUE);
109         queue_.push_front(task);
110     }
111 
Finalize()112     void Finalize() override
113     {
114         queue_.clear();
115     }
116 
117 protected:
GetQueueSize()118     size_t GetQueueSize() override
119     {
120         return queue_.size();
121     }
122 
123 private:
124     PandaList<MockTask> queue_;
125 };
126 
127 class MockTaskController {
128 public:
129     explicit MockTaskController() = default;
130 
SolveTask(MockTask task)131     void SolveTask(MockTask task)
132     {
133         task.SetStatus(MockTask::PROCESSING);
134         // This is required to distribute tasks between different workers rather than solve it instantly
135         // on only one worker.
136         // NOLINTNEXTLINE(readability-magic-numbers)
137         std::this_thread::sleep_for(std::chrono::milliseconds(10U));
138         task.SetStatus(MockTask::COMPLETED);
139         LOG(DEBUG, RUNTIME) << "Task " << task.GetId() << " has been solved";
140         solvedTasks_++;
141     }
142 
GetSolvedTasks()143     size_t GetSolvedTasks()
144     {
145         return solvedTasks_;
146     }
147 
148 private:
149     std::atomic_size_t solvedTasks_ = 0;
150 };
151 
152 class MockProcessor : public ProcessorInterface<MockTask, MockTaskController *> {
153 public:
MockProcessor(MockTaskController * controller)154     explicit MockProcessor(MockTaskController *controller) : controller_(controller) {}
155 
Process(MockTask && task)156     bool Process(MockTask &&task) override
157     {
158         if (task.GetStatus() == MockTask::IN_QUEUE) {
159             controller_->SolveTask(task);
160             return true;
161         }
162         return false;
163     }
164 
165 private:
166     MockTaskController *controller_;
167 };
168 
CreateTasks(ThreadPool<MockTask,MockProcessor,MockTaskController * > * threadPool,size_t numberOfElements)169 void CreateTasks(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool, size_t numberOfElements)
170 {
171     for (size_t i = 0; i < numberOfElements; i++) {
172         MockTask task(i + 1);
173         LOG(DEBUG, RUNTIME) << "Queue task " << task.GetId();
174         // NOLINTNEXTLINE(performance-move-const-arg)
175         threadPool->PutTask(std::move(task));
176     }
177 }
178 
TestThreadPool(size_t initialNumberOfThreads,size_t scaledNumberOfThreads,float scaleThreshold)179 void TestThreadPool(size_t initialNumberOfThreads, size_t scaledNumberOfThreads, float scaleThreshold)
180 {
181     auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
182     auto queue = allocator->New<MockQueue>(allocator);
183     auto controller = allocator->New<MockTaskController>();
184     auto threadPool = allocator->New<ThreadPool<MockTask, MockProcessor, MockTaskController *>>(
185         allocator, queue, controller, initialNumberOfThreads, "Test thread");
186 
187     CreateTasks(threadPool, MockThreadPoolTest::TASK_NUMBER);
188 
189     if (scaleThreshold < 1.0) {
190         while (controller->GetSolvedTasks() < scaleThreshold * MockThreadPoolTest::TASK_NUMBER) {
191         }
192         threadPool->Scale(scaledNumberOfThreads);
193     }
194 
195     for (;;) {
196         auto solvedTasks = controller->GetSolvedTasks();
197         // NOLINTNEXTLINE(readability-magic-numbers)
198         auto rate = static_cast<size_t>((static_cast<float>(solvedTasks) / MockThreadPoolTest::TASK_NUMBER) * 100);
199         LOG(DEBUG, RUNTIME) << "Number of solved tasks is " << solvedTasks << " (" << rate << "%)";
200         if (scaleThreshold == 1.0) {
201             // NOLINTNEXTLINE(readability-magic-numbers)
202             size_t dynamicScaling = rate / 10 + 1;
203             threadPool->Scale(dynamicScaling);
204         }
205 
206         if (solvedTasks == MockThreadPoolTest::TASK_NUMBER) {
207             break;
208         }
209     }
210 
211     allocator->Delete(threadPool);
212     allocator->Delete(controller);
213     allocator->Delete(queue);
214 }
215 
// Fixed-size pool: with a threshold of 0.0 the pool is rescaled right away to the thread count it already has.
TEST_F(MockThreadPoolTest, SeveralThreads)
{
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 8;
    constexpr float THRESHOLD = 0.0;
    TestThreadPool(INITIAL_THREADS, SCALED_THREADS, THRESHOLD);
}
223 
// Starts with 8 threads and shrinks the pool to 4 once a quarter of the tasks is solved.
TEST_F(MockThreadPoolTest, ReduceThreads)
{
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 4;
    constexpr float THRESHOLD = 0.25;
    TestThreadPool(INITIAL_THREADS, SCALED_THREADS, THRESHOLD);
}
231 
// Starts with 4 threads and grows the pool to 8 once a quarter of the tasks is solved.
TEST_F(MockThreadPoolTest, IncreaseThreads)
{
    constexpr size_t INITIAL_THREADS = 4;
    constexpr size_t SCALED_THREADS = 8;
    constexpr float THRESHOLD = 0.25;
    TestThreadPool(INITIAL_THREADS, SCALED_THREADS, THRESHOLD);
}
239 
// A threshold of exactly 1.0 selects the dynamic mode of TestThreadPool, where the pool is rescaled
// on every progress poll; the scaled thread count is passed along but not used in that mode.
TEST_F(MockThreadPoolTest, DifferentNumberOfThreads)
{
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 8;
    constexpr float THRESHOLD = 1.0;
    TestThreadPool(INITIAL_THREADS, SCALED_THREADS, THRESHOLD);
}
247 
// Thread entry point: queues numberOfTasks tasks into the pool by delegating to CreateTasks(),
// which submits them through PutTask().
void ControllerThreadPutTask(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool,
                             size_t numberOfTasks)
{
    CreateTasks(threadPool, numberOfTasks);
}
253 
ControllerThreadTryPutTask(ThreadPool<MockTask,MockProcessor,MockTaskController * > * threadPool,size_t numberOfTasks)254 void ControllerThreadTryPutTask(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool,
255                                 size_t numberOfTasks)
256 {
257     for (size_t i = 0; i < numberOfTasks; i++) {
258         for (;;) {
259             if (threadPool->TryPutTask(MockTask {i + 1}) || !threadPool->IsActive()) {
260                 break;
261             }
262         }
263     }
264 }
265 
// Thread entry point: asks the pool to scale to numberOfThreads worker threads.
void ControllerThreadScale(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool,
                           size_t numberOfThreads)
{
    threadPool->Scale(numberOfThreads);
}
271 
ControllerThreadShutdown(ThreadPool<MockTask,MockProcessor,MockTaskController * > * threadPool,bool isShutdown,bool isForceShutdown)272 void ControllerThreadShutdown(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool, bool isShutdown,
273                               bool isForceShutdown)
274 {
275     if (isShutdown) {
276         threadPool->Shutdown(isForceShutdown);
277     }
278 }
279 
TestThreadPoolWithControllers(size_t numberOfThreadsInitial,size_t numberOfThreadsScaled,bool isShutdown,bool isForceShutdown)280 void TestThreadPoolWithControllers(size_t numberOfThreadsInitial, size_t numberOfThreadsScaled, bool isShutdown,
281                                    bool isForceShutdown)
282 {
283     constexpr size_t NUMBER_OF_TASKS = MockThreadPoolTest::TASK_NUMBER / 4;
284     constexpr size_t QUEUE_SIZE = 16;
285 
286     auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
287     auto queue = allocator->New<MockQueue>(allocator, QUEUE_SIZE);
288     auto controller = allocator->New<MockTaskController>();
289     auto threadPool = allocator->New<ThreadPool<MockTask, MockProcessor, MockTaskController *>>(
290         allocator, queue, controller, numberOfThreadsInitial, "Test thread");
291 
292     std::thread controllerThreadPutTask1(ControllerThreadPutTask, threadPool, NUMBER_OF_TASKS);
293     std::thread controllerThreadPutTask2(ControllerThreadPutTask, threadPool, NUMBER_OF_TASKS);
294     std::thread controllerThreadTryPutTask1(ControllerThreadTryPutTask, threadPool, NUMBER_OF_TASKS);
295     std::thread controllerThreadTryPutTask2(ControllerThreadTryPutTask, threadPool, NUMBER_OF_TASKS);
296     std::thread controllerThreadScale1(ControllerThreadScale, threadPool, numberOfThreadsScaled);
297     std::thread controllerThreadScale2(ControllerThreadScale, threadPool,
298                                        numberOfThreadsScaled + numberOfThreadsInitial);
299     std::thread controllerThreadShutdown1(ControllerThreadShutdown, threadPool, isShutdown, isForceShutdown);
300     std::thread controllerThreadShutdown2(ControllerThreadShutdown, threadPool, isShutdown, isForceShutdown);
301 
302     // Wait for tasks completion.
303     for (;;) {
304         auto solvedTasks = controller->GetSolvedTasks();
305         // NOLINTNEXTLINE(readability-magic-numbers)
306         auto rate = static_cast<size_t>((static_cast<float>(solvedTasks) / MockThreadPoolTest::TASK_NUMBER) * 100);
307         (void)rate;
308         LOG(DEBUG, RUNTIME) << "Number of solved tasks is " << solvedTasks << " (" << rate << "%)";
309         if (solvedTasks == MockThreadPoolTest::TASK_NUMBER || !threadPool->IsActive()) {
310             break;
311         }
312         // NOLINTNEXTLINE(readability-magic-numbers)
313         std::this_thread::sleep_for(std::chrono::milliseconds(10U));
314     }
315     controllerThreadPutTask1.join();
316     controllerThreadPutTask2.join();
317     controllerThreadTryPutTask1.join();
318     controllerThreadTryPutTask2.join();
319     controllerThreadScale1.join();
320     controllerThreadScale2.join();
321     controllerThreadShutdown1.join();
322     controllerThreadShutdown2.join();
323 
324     allocator->Delete(threadPool);
325     allocator->Delete(controller);
326     allocator->Delete(queue);
327 }
328 
// Concurrent producers and scalers without any shutdown request.
TEST_F(MockThreadPoolTest, Controllers)
{
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 4;
    constexpr bool REQUEST_SHUTDOWN = false;
    constexpr bool FORCE_SHUTDOWN = false;
    TestThreadPoolWithControllers(INITIAL_THREADS, SCALED_THREADS, REQUEST_SHUTDOWN, FORCE_SHUTDOWN);
}
337 
// Concurrent producers and scalers while two threads request a non-forced shutdown.
TEST_F(MockThreadPoolTest, ControllersShutdown)
{
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 4;
    constexpr bool REQUEST_SHUTDOWN = true;
    constexpr bool FORCE_SHUTDOWN = false;
    TestThreadPoolWithControllers(INITIAL_THREADS, SCALED_THREADS, REQUEST_SHUTDOWN, FORCE_SHUTDOWN);
}
346 
// Concurrent producers and scalers while two threads request a forced shutdown.
TEST_F(MockThreadPoolTest, ControllersForceShutdown)
{
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 4;
    constexpr bool REQUEST_SHUTDOWN = true;
    constexpr bool FORCE_SHUTDOWN = true;
    TestThreadPoolWithControllers(INITIAL_THREADS, SCALED_THREADS, REQUEST_SHUTDOWN, FORCE_SHUTDOWN);
}
355 
356 }  // namespace panda::test
357