1 /**
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include <gtest/gtest.h>

#include <thread>

#include "runtime/include/runtime.h"
#include "runtime/thread_pool.h"
20
21 namespace panda::test {
22
23 class MockThreadPoolTest : public testing::Test {
24 public:
25 static const size_t TASK_NUMBER = 32;
MockThreadPoolTest()26 MockThreadPoolTest()
27 {
28 RuntimeOptions options;
29 options.SetShouldLoadBootPandaFiles(false);
30 options.SetShouldInitializeIntrinsics(false);
31 Runtime::Create(options);
32 thread_ = panda::MTManagedThread::GetCurrent();
33 thread_->ManagedCodeBegin();
34 }
35
~MockThreadPoolTest()36 ~MockThreadPoolTest() override
37 {
38 thread_->ManagedCodeEnd();
39 Runtime::Destroy();
40 }
41
42 NO_COPY_SEMANTIC(MockThreadPoolTest);
43 NO_MOVE_SEMANTIC(MockThreadPoolTest);
44
45 private:
46 panda::MTManagedThread *thread_;
47 };
48
49 class MockTask : public TaskInterface {
50 public:
MockTask(size_t identifier=0)51 explicit MockTask(size_t identifier = 0) : identifier_(identifier) {}
52
53 enum TaskStatus {
54 NOT_STARTED,
55 IN_QUEUE,
56 PROCESSING,
57 COMPLETED,
58 };
59
IsEmpty() const60 bool IsEmpty() const
61 {
62 return identifier_ == 0;
63 }
64
GetId() const65 size_t GetId() const
66 {
67 return identifier_;
68 }
69
GetStatus() const70 TaskStatus GetStatus() const
71 {
72 return status_;
73 }
74
SetStatus(TaskStatus status)75 void SetStatus(TaskStatus status)
76 {
77 status_ = status;
78 }
79
80 private:
81 size_t identifier_;
82 TaskStatus status_ = NOT_STARTED;
83 };
84
85 class MockQueue : public TaskQueueInterface<MockTask> {
86 public:
MockQueue(mem::InternalAllocatorPtr allocator)87 explicit MockQueue(mem::InternalAllocatorPtr allocator) : queue_(allocator->Adapter()) {}
MockQueue(mem::InternalAllocatorPtr allocator,size_t queueSize)88 MockQueue(mem::InternalAllocatorPtr allocator, size_t queueSize)
89 : TaskQueueInterface<MockTask>(queueSize), queue_(allocator->Adapter())
90 {
91 }
92
GetTask()93 MockTask GetTask() override
94 {
95 if (queue_.empty()) {
96 LOG(DEBUG, RUNTIME) << "Cannot get an element, queue is empty";
97 return MockTask();
98 }
99 auto task = queue_.front();
100 queue_.pop_front();
101 LOG(DEBUG, RUNTIME) << "Extract task " << task.GetId();
102 return task;
103 }
104
105 // NOLINTNEXTLINE(google-default-arguments)
AddTask(MockTask && task,size_t priority=0)106 void AddTask(MockTask &&task, [[maybe_unused]] size_t priority = 0) override
107 {
108 task.SetStatus(MockTask::IN_QUEUE);
109 queue_.push_front(task);
110 }
111
Finalize()112 void Finalize() override
113 {
114 queue_.clear();
115 }
116
117 protected:
GetQueueSize()118 size_t GetQueueSize() override
119 {
120 return queue_.size();
121 }
122
123 private:
124 PandaList<MockTask> queue_;
125 };
126
127 class MockTaskController {
128 public:
129 explicit MockTaskController() = default;
130
SolveTask(MockTask task)131 void SolveTask(MockTask task)
132 {
133 task.SetStatus(MockTask::PROCESSING);
134 // This is required to distribute tasks between different workers rather than solve it instantly
135 // on only one worker.
136 // NOLINTNEXTLINE(readability-magic-numbers)
137 std::this_thread::sleep_for(std::chrono::milliseconds(10U));
138 task.SetStatus(MockTask::COMPLETED);
139 LOG(DEBUG, RUNTIME) << "Task " << task.GetId() << " has been solved";
140 solvedTasks_++;
141 }
142
GetSolvedTasks()143 size_t GetSolvedTasks()
144 {
145 return solvedTasks_;
146 }
147
148 private:
149 std::atomic_size_t solvedTasks_ = 0;
150 };
151
152 class MockProcessor : public ProcessorInterface<MockTask, MockTaskController *> {
153 public:
MockProcessor(MockTaskController * controller)154 explicit MockProcessor(MockTaskController *controller) : controller_(controller) {}
155
Process(MockTask && task)156 bool Process(MockTask &&task) override
157 {
158 if (task.GetStatus() == MockTask::IN_QUEUE) {
159 controller_->SolveTask(task);
160 return true;
161 }
162 return false;
163 }
164
165 private:
166 MockTaskController *controller_;
167 };
168
CreateTasks(ThreadPool<MockTask,MockProcessor,MockTaskController * > * threadPool,size_t numberOfElements)169 void CreateTasks(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool, size_t numberOfElements)
170 {
171 for (size_t i = 0; i < numberOfElements; i++) {
172 MockTask task(i + 1);
173 LOG(DEBUG, RUNTIME) << "Queue task " << task.GetId();
174 // NOLINTNEXTLINE(performance-move-const-arg)
175 threadPool->PutTask(std::move(task));
176 }
177 }
178
TestThreadPool(size_t initialNumberOfThreads,size_t scaledNumberOfThreads,float scaleThreshold)179 void TestThreadPool(size_t initialNumberOfThreads, size_t scaledNumberOfThreads, float scaleThreshold)
180 {
181 auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
182 auto queue = allocator->New<MockQueue>(allocator);
183 auto controller = allocator->New<MockTaskController>();
184 auto threadPool = allocator->New<ThreadPool<MockTask, MockProcessor, MockTaskController *>>(
185 allocator, queue, controller, initialNumberOfThreads, "Test thread");
186
187 CreateTasks(threadPool, MockThreadPoolTest::TASK_NUMBER);
188
189 if (scaleThreshold < 1.0) {
190 while (controller->GetSolvedTasks() < scaleThreshold * MockThreadPoolTest::TASK_NUMBER) {
191 }
192 threadPool->Scale(scaledNumberOfThreads);
193 }
194
195 for (;;) {
196 auto solvedTasks = controller->GetSolvedTasks();
197 // NOLINTNEXTLINE(readability-magic-numbers)
198 auto rate = static_cast<size_t>((static_cast<float>(solvedTasks) / MockThreadPoolTest::TASK_NUMBER) * 100);
199 LOG(DEBUG, RUNTIME) << "Number of solved tasks is " << solvedTasks << " (" << rate << "%)";
200 if (scaleThreshold == 1.0) {
201 // NOLINTNEXTLINE(readability-magic-numbers)
202 size_t dynamicScaling = rate / 10 + 1;
203 threadPool->Scale(dynamicScaling);
204 }
205
206 if (solvedTasks == MockThreadPoolTest::TASK_NUMBER) {
207 break;
208 }
209 }
210
211 allocator->Delete(threadPool);
212 allocator->Delete(controller);
213 allocator->Delete(queue);
214 }
215
// Constant pool size: with a threshold of 0.0 the helper does not wait for any solved task before
// calling Scale(), and the requested size equals the initial one.
TEST_F(MockThreadPoolTest, SeveralThreads)
{
    constexpr size_t THREADS_AT_START = 8;
    constexpr size_t THREADS_AFTER_SCALE = 8;
    constexpr float SOLVED_SHARE_BEFORE_SCALE = 0.0;
    TestThreadPool(THREADS_AT_START, THREADS_AFTER_SCALE, SOLVED_SHARE_BEFORE_SCALE);
}
223
// Shrinks the pool from eight to four workers once a quarter of the tasks has been solved.
TEST_F(MockThreadPoolTest, ReduceThreads)
{
    constexpr size_t THREADS_AT_START = 8;
    constexpr size_t THREADS_AFTER_SCALE = 4;
    constexpr float SOLVED_SHARE_BEFORE_SCALE = 0.25;
    TestThreadPool(THREADS_AT_START, THREADS_AFTER_SCALE, SOLVED_SHARE_BEFORE_SCALE);
}
231
// Grows the pool from four to eight workers once a quarter of the tasks has been solved.
TEST_F(MockThreadPoolTest, IncreaseThreads)
{
    constexpr size_t THREADS_AT_START = 4;
    constexpr size_t THREADS_AFTER_SCALE = 8;
    constexpr float SOLVED_SHARE_BEFORE_SCALE = 0.25;
    TestThreadPool(THREADS_AT_START, THREADS_AFTER_SCALE, SOLVED_SHARE_BEFORE_SCALE);
}
239
// Dynamic scaling: a threshold of exactly 1.0 makes the helper rescale the pool on every poll
// according to the share of solved tasks. The "after scale" value is not used on that path.
TEST_F(MockThreadPoolTest, DifferentNumberOfThreads)
{
    constexpr size_t THREADS_AT_START = 8;
    constexpr size_t THREADS_AFTER_SCALE = 8;
    constexpr float SOLVED_SHARE_BEFORE_SCALE = 1.0;
    TestThreadPool(THREADS_AT_START, THREADS_AFTER_SCALE, SOLVED_SHARE_BEFORE_SCALE);
}
247
// Body of a producer thread: submits numberOfTasks tasks to the pool via CreateTasks(), which uses
// PutTask() for every task.
void ControllerThreadPutTask(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool,
                             size_t numberOfTasks)
{
    CreateTasks(threadPool, numberOfTasks);
}
253
ControllerThreadTryPutTask(ThreadPool<MockTask,MockProcessor,MockTaskController * > * threadPool,size_t numberOfTasks)254 void ControllerThreadTryPutTask(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool,
255 size_t numberOfTasks)
256 {
257 for (size_t i = 0; i < numberOfTasks; i++) {
258 for (;;) {
259 if (threadPool->TryPutTask(MockTask {i + 1}) || !threadPool->IsActive()) {
260 break;
261 }
262 }
263 }
264 }
265
// Body of a controller thread: asks the pool to scale to numberOfThreads workers.
void ControllerThreadScale(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool,
                           size_t numberOfThreads)
{
    threadPool->Scale(numberOfThreads);
}
271
ControllerThreadShutdown(ThreadPool<MockTask,MockProcessor,MockTaskController * > * threadPool,bool isShutdown,bool isForceShutdown)272 void ControllerThreadShutdown(ThreadPool<MockTask, MockProcessor, MockTaskController *> *threadPool, bool isShutdown,
273 bool isForceShutdown)
274 {
275 if (isShutdown) {
276 threadPool->Shutdown(isForceShutdown);
277 }
278 }
279
TestThreadPoolWithControllers(size_t numberOfThreadsInitial,size_t numberOfThreadsScaled,bool isShutdown,bool isForceShutdown)280 void TestThreadPoolWithControllers(size_t numberOfThreadsInitial, size_t numberOfThreadsScaled, bool isShutdown,
281 bool isForceShutdown)
282 {
283 constexpr size_t NUMBER_OF_TASKS = MockThreadPoolTest::TASK_NUMBER / 4;
284 constexpr size_t QUEUE_SIZE = 16;
285
286 auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
287 auto queue = allocator->New<MockQueue>(allocator, QUEUE_SIZE);
288 auto controller = allocator->New<MockTaskController>();
289 auto threadPool = allocator->New<ThreadPool<MockTask, MockProcessor, MockTaskController *>>(
290 allocator, queue, controller, numberOfThreadsInitial, "Test thread");
291
292 std::thread controllerThreadPutTask1(ControllerThreadPutTask, threadPool, NUMBER_OF_TASKS);
293 std::thread controllerThreadPutTask2(ControllerThreadPutTask, threadPool, NUMBER_OF_TASKS);
294 std::thread controllerThreadTryPutTask1(ControllerThreadTryPutTask, threadPool, NUMBER_OF_TASKS);
295 std::thread controllerThreadTryPutTask2(ControllerThreadTryPutTask, threadPool, NUMBER_OF_TASKS);
296 std::thread controllerThreadScale1(ControllerThreadScale, threadPool, numberOfThreadsScaled);
297 std::thread controllerThreadScale2(ControllerThreadScale, threadPool,
298 numberOfThreadsScaled + numberOfThreadsInitial);
299 std::thread controllerThreadShutdown1(ControllerThreadShutdown, threadPool, isShutdown, isForceShutdown);
300 std::thread controllerThreadShutdown2(ControllerThreadShutdown, threadPool, isShutdown, isForceShutdown);
301
302 // Wait for tasks completion.
303 for (;;) {
304 auto solvedTasks = controller->GetSolvedTasks();
305 // NOLINTNEXTLINE(readability-magic-numbers)
306 auto rate = static_cast<size_t>((static_cast<float>(solvedTasks) / MockThreadPoolTest::TASK_NUMBER) * 100);
307 (void)rate;
308 LOG(DEBUG, RUNTIME) << "Number of solved tasks is " << solvedTasks << " (" << rate << "%)";
309 if (solvedTasks == MockThreadPoolTest::TASK_NUMBER || !threadPool->IsActive()) {
310 break;
311 }
312 // NOLINTNEXTLINE(readability-magic-numbers)
313 std::this_thread::sleep_for(std::chrono::milliseconds(10U));
314 }
315 controllerThreadPutTask1.join();
316 controllerThreadPutTask2.join();
317 controllerThreadTryPutTask1.join();
318 controllerThreadTryPutTask2.join();
319 controllerThreadScale1.join();
320 controllerThreadScale2.join();
321 controllerThreadShutdown1.join();
322 controllerThreadShutdown2.join();
323
324 allocator->Delete(threadPool);
325 allocator->Delete(controller);
326 allocator->Delete(queue);
327 }
328
// Concurrent producers and scalers without any shutdown request: the shutdown threads do nothing.
TEST_F(MockThreadPoolTest, Controllers)
{
    constexpr size_t THREADS_AT_START = 8;
    constexpr size_t THREADS_AFTER_SCALE = 4;
    constexpr bool REQUEST_SHUTDOWN = false;
    constexpr bool FORCE_SHUTDOWN = false;
    TestThreadPoolWithControllers(THREADS_AT_START, THREADS_AFTER_SCALE, REQUEST_SHUTDOWN, FORCE_SHUTDOWN);
}
337
// Concurrent producers and scalers while two threads call Shutdown(false) on the pool.
TEST_F(MockThreadPoolTest, ControllersShutdown)
{
    constexpr size_t THREADS_AT_START = 8;
    constexpr size_t THREADS_AFTER_SCALE = 4;
    constexpr bool REQUEST_SHUTDOWN = true;
    constexpr bool FORCE_SHUTDOWN = false;
    TestThreadPoolWithControllers(THREADS_AT_START, THREADS_AFTER_SCALE, REQUEST_SHUTDOWN, FORCE_SHUTDOWN);
}
346
// Concurrent producers and scalers while two threads call Shutdown(true) on the pool.
TEST_F(MockThreadPoolTest, ControllersForceShutdown)
{
    constexpr size_t THREADS_AT_START = 8;
    constexpr size_t THREADS_AFTER_SCALE = 4;
    constexpr bool REQUEST_SHUTDOWN = true;
    constexpr bool FORCE_SHUTDOWN = true;
    TestThreadPoolWithControllers(THREADS_AT_START, THREADS_AFTER_SCALE, REQUEST_SHUTDOWN, FORCE_SHUTDOWN);
}
355
356 } // namespace panda::test
357