/**
 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
#include <atomic>
#include <chrono>
#include <thread>

#include <gtest/gtest.h>

#include "runtime/include/runtime.h"
#include "runtime/thread_pool.h"
20
21 namespace panda::test {
22
23 class MockThreadPoolTest : public testing::Test {
24 public:
25 static const size_t TASK_NUMBER = 32;
MockThreadPoolTest()26 MockThreadPoolTest()
27 {
28 RuntimeOptions options;
29 options.SetShouldLoadBootPandaFiles(false);
30 options.SetShouldInitializeIntrinsics(false);
31 Runtime::Create(options);
32 thread_ = panda::MTManagedThread::GetCurrent();
33 thread_->ManagedCodeBegin();
34 }
35
~MockThreadPoolTest()36 ~MockThreadPoolTest()
37 {
38 thread_->ManagedCodeEnd();
39 Runtime::Destroy();
40 }
41
42 protected:
43 panda::MTManagedThread *thread_;
44 };
45
46 class MockTask : public TaskInterface {
47 public:
MockTask(size_t identifier=0)48 explicit MockTask(size_t identifier = 0) : identifier_(identifier) {}
49
50 enum TaskStatus {
51 NOT_STARTED,
52 IN_QUEUE,
53 PROCESSING,
54 COMPLETED,
55 };
56
IsEmpty() const57 bool IsEmpty() const
58 {
59 return identifier_ == 0;
60 }
61
GetId() const62 size_t GetId() const
63 {
64 return identifier_;
65 }
66
GetStatus() const67 TaskStatus GetStatus() const
68 {
69 return status_;
70 }
71
SetStatus(TaskStatus status)72 void SetStatus(TaskStatus status)
73 {
74 status_ = status;
75 }
76
77 private:
78 size_t identifier_;
79 TaskStatus status_ = NOT_STARTED;
80 };
81
82 class MockQueue : public TaskQueueInterface<MockTask> {
83 public:
MockQueue(mem::InternalAllocatorPtr allocator)84 explicit MockQueue(mem::InternalAllocatorPtr allocator) : queue_(allocator->Adapter()) {}
MockQueue(mem::InternalAllocatorPtr allocator,size_t queue_size)85 MockQueue(mem::InternalAllocatorPtr allocator, size_t queue_size)
86 : TaskQueueInterface<MockTask>(queue_size), queue_(allocator->Adapter())
87 {
88 }
89
GetTask()90 MockTask GetTask() override
91 {
92 if (queue_.empty()) {
93 LOG(DEBUG, RUNTIME) << "Cannot get an element, queue is empty";
94 return MockTask();
95 }
96 auto task = queue_.front();
97 queue_.pop_front();
98 LOG(DEBUG, RUNTIME) << "Extract task " << task.GetId();
99 return task;
100 }
101
102 // NOLINTNEXTLINE(google-default-arguments)
AddTask(MockTask task,size_t priority=0)103 void AddTask(MockTask task, [[maybe_unused]] size_t priority = 0) override
104 {
105 task.SetStatus(MockTask::IN_QUEUE);
106 queue_.push_front(task);
107 }
108
Finalize()109 void Finalize() override
110 {
111 queue_.clear();
112 }
113
114 protected:
GetQueueSize()115 size_t GetQueueSize() override
116 {
117 return queue_.size();
118 }
119
120 private:
121 PandaList<MockTask> queue_;
122 };
123
124 class MockTaskController {
125 public:
MockTaskController()126 explicit MockTaskController() {}
127
SolveTask(MockTask task)128 void SolveTask(MockTask task)
129 {
130 task.SetStatus(MockTask::PROCESSING);
131 // This is required to distribute tasks between different workers rather than solve it instantly
132 // on only one worker.
133 std::this_thread::sleep_for(std::chrono::milliseconds(10));
134 task.SetStatus(MockTask::COMPLETED);
135 LOG(DEBUG, RUNTIME) << "Task " << task.GetId() << " has been solved";
136 solved_tasks_++;
137 }
138
GetSolvedTasks()139 size_t GetSolvedTasks()
140 {
141 return solved_tasks_;
142 }
143
144 private:
145 std::atomic_size_t solved_tasks_ = 0;
146 };
147
148 class MockProcessor : public ProcessorInterface<MockTask, MockTaskController *> {
149 public:
MockProcessor(MockTaskController * controller)150 explicit MockProcessor(MockTaskController *controller) : controller_(controller) {}
151
Process(MockTask task)152 bool Process(MockTask task) override
153 {
154 if (task.GetStatus() == MockTask::IN_QUEUE) {
155 controller_->SolveTask(task);
156 return true;
157 }
158 return false;
159 }
160
161 private:
162 MockTaskController *controller_;
163 };
164
CreateTasks(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,size_t number_of_elements)165 void CreateTasks(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool, size_t number_of_elements)
166 {
167 for (size_t i = 0; i < number_of_elements; i++) {
168 MockTask task(i + 1);
169 thread_pool->PutTask(task);
170 LOG(DEBUG, RUNTIME) << "Queue task " << task.GetId();
171 }
172 }
173
TestThreadPool(size_t initial_number_of_threads,size_t scaled_number_of_threads,float scale_threshold)174 void TestThreadPool(size_t initial_number_of_threads, size_t scaled_number_of_threads, float scale_threshold)
175 {
176 auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
177 auto queue = allocator->New<MockQueue>(allocator);
178 auto controller = allocator->New<MockTaskController>();
179 auto thread_pool = allocator->New<ThreadPool<MockTask, MockProcessor, MockTaskController *>>(
180 allocator, queue, controller, initial_number_of_threads, "Test thread");
181
182 CreateTasks(thread_pool, MockThreadPoolTest::TASK_NUMBER);
183
184 if (scale_threshold < 1.0) {
185 while (controller->GetSolvedTasks() < scale_threshold * MockThreadPoolTest::TASK_NUMBER) {
186 }
187 thread_pool->Scale(scaled_number_of_threads);
188 }
189
190 for (;;) {
191 auto solved_tasks = controller->GetSolvedTasks();
192 size_t rate = static_cast<size_t>((static_cast<float>(solved_tasks) / MockThreadPoolTest::TASK_NUMBER) * 100);
193 LOG(DEBUG, RUNTIME) << "Number of solved tasks is " << solved_tasks << " (" << rate << "%)";
194 if (scale_threshold == 1.0) {
195 size_t dynamic_scaling = rate / 10 + 1;
196 thread_pool->Scale(dynamic_scaling);
197 }
198
199 if (solved_tasks == MockThreadPoolTest::TASK_NUMBER) {
200 break;
201 }
202 }
203
204 allocator->Delete(thread_pool);
205 allocator->Delete(controller);
206 allocator->Delete(queue);
207 }
208
// Constant pool size: with a threshold of 0 the single Scale() request is issued right after
// the tasks are queued, and it asks for the same 8 workers the pool already has.
TEST_F(MockThreadPoolTest, SeveralThreads)
{
    constexpr float SCALE_THRESHOLD = 0.0;
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 8;
    TestThreadPool(INITIAL_THREADS, SCALED_THREADS, SCALE_THRESHOLD);
}
216
// Shrinks the pool from 8 to 4 workers once a quarter of the tasks has been solved.
TEST_F(MockThreadPoolTest, ReduceThreads)
{
    constexpr float SCALE_THRESHOLD = 0.25;
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 4;
    TestThreadPool(INITIAL_THREADS, SCALED_THREADS, SCALE_THRESHOLD);
}
224
// Grows the pool from 4 to 8 workers once a quarter of the tasks has been solved.
TEST_F(MockThreadPoolTest, IncreaseThreads)
{
    constexpr float SCALE_THRESHOLD = 0.25;
    constexpr size_t INITIAL_THREADS = 4;
    constexpr size_t SCALED_THREADS = 8;
    TestThreadPool(INITIAL_THREADS, SCALED_THREADS, SCALE_THRESHOLD);
}
232
// A threshold of exactly 1.0 selects the dynamic scenario of TestThreadPool(): the pool is
// rescaled on every poll according to the solved percentage. The scaled thread count is
// still passed, but that code path does not read it.
TEST_F(MockThreadPoolTest, DifferentNumberOfThreads)
{
    constexpr float SCALE_THRESHOLD = 1.0;
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 8;
    TestThreadPool(INITIAL_THREADS, SCALED_THREADS, SCALE_THRESHOLD);
}
240
ControllerThreadPutTask(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,size_t number_of_tasks)241 void ControllerThreadPutTask(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool,
242 size_t number_of_tasks)
243 {
244 CreateTasks(thread_pool, number_of_tasks);
245 }
246
ControllerThreadTryPutTask(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,size_t number_of_tasks)247 void ControllerThreadTryPutTask(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool,
248 size_t number_of_tasks)
249 {
250 for (size_t i = 0; i < number_of_tasks; i++) {
251 MockTask task(i + 1);
252 for (;;) {
253 if (thread_pool->TryPutTask(task) || !thread_pool->IsActive()) {
254 break;
255 }
256 }
257 }
258 }
259
ControllerThreadScale(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,size_t number_of_threads)260 void ControllerThreadScale(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool,
261 size_t number_of_threads)
262 {
263 thread_pool->Scale(number_of_threads);
264 }
265
ControllerThreadShutdown(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,bool is_shutdown,bool is_force_shutdown)266 void ControllerThreadShutdown(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool, bool is_shutdown,
267 bool is_force_shutdown)
268 {
269 if (is_shutdown) {
270 thread_pool->Shutdown(is_force_shutdown);
271 }
272 }
273
TestThreadPoolWithControllers(size_t number_of_threads_initial,size_t number_of_threads_scaled,bool is_shutdown,bool is_force_shutdown)274 void TestThreadPoolWithControllers(size_t number_of_threads_initial, size_t number_of_threads_scaled, bool is_shutdown,
275 bool is_force_shutdown)
276 {
277 constexpr size_t NUMBER_OF_TASKS = MockThreadPoolTest::TASK_NUMBER / 4;
278 constexpr size_t QUEUE_SIZE = 16;
279
280 auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
281 auto queue = allocator->New<MockQueue>(allocator, QUEUE_SIZE);
282 auto controller = allocator->New<MockTaskController>();
283 auto thread_pool = allocator->New<ThreadPool<MockTask, MockProcessor, MockTaskController *>>(
284 allocator, queue, controller, number_of_threads_initial, "Test thread");
285
286 std::thread controller_thread_put_task_1(ControllerThreadPutTask, thread_pool, NUMBER_OF_TASKS);
287 std::thread controller_thread_put_task_2(ControllerThreadPutTask, thread_pool, NUMBER_OF_TASKS);
288 std::thread controller_thread_try_put_task_1(ControllerThreadTryPutTask, thread_pool, NUMBER_OF_TASKS);
289 std::thread controller_thread_try_put_task_2(ControllerThreadTryPutTask, thread_pool, NUMBER_OF_TASKS);
290 std::thread controller_thread_scale_1(ControllerThreadScale, thread_pool, number_of_threads_scaled);
291 std::thread controller_thread_scale_2(ControllerThreadScale, thread_pool,
292 number_of_threads_scaled + number_of_threads_initial);
293 std::thread controller_thread_shutdown_1(ControllerThreadShutdown, thread_pool, is_shutdown, is_force_shutdown);
294 std::thread controller_thread_shutdown_2(ControllerThreadShutdown, thread_pool, is_shutdown, is_force_shutdown);
295
296 // Wait for tasks completion.
297 for (;;) {
298 auto solved_tasks = controller->GetSolvedTasks();
299 size_t rate = static_cast<size_t>((static_cast<float>(solved_tasks) / MockThreadPoolTest::TASK_NUMBER) * 100);
300 LOG(DEBUG, RUNTIME) << "Number of solved tasks is " << solved_tasks << " (" << rate << "%)";
301 if (solved_tasks == MockThreadPoolTest::TASK_NUMBER || !thread_pool->IsActive()) {
302 break;
303 }
304 std::this_thread::sleep_for(std::chrono::milliseconds(10));
305 }
306 controller_thread_put_task_1.join();
307 controller_thread_put_task_2.join();
308 controller_thread_try_put_task_1.join();
309 controller_thread_try_put_task_2.join();
310 controller_thread_scale_1.join();
311 controller_thread_scale_2.join();
312 controller_thread_shutdown_1.join();
313 controller_thread_shutdown_2.join();
314
315 allocator->Delete(thread_pool);
316 allocator->Delete(controller);
317 allocator->Delete(queue);
318 }
319
// Concurrent producers and scale requests without any shutdown: the pool has to solve all
// queued tasks.
TEST_F(MockThreadPoolTest, Controllers)
{
    constexpr bool IS_SHUTDOWN = false;
    constexpr bool IS_FORCE_SHUTDOWN = false;
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 4;
    TestThreadPoolWithControllers(INITIAL_THREADS, SCALED_THREADS, IS_SHUTDOWN, IS_FORCE_SHUTDOWN);
}
328
// Same controller mix, but the shutdown controllers call Shutdown(false) while producers and
// scale requests are running.
TEST_F(MockThreadPoolTest, ControllersShutdown)
{
    constexpr bool IS_SHUTDOWN = true;
    constexpr bool IS_FORCE_SHUTDOWN = false;
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 4;
    TestThreadPoolWithControllers(INITIAL_THREADS, SCALED_THREADS, IS_SHUTDOWN, IS_FORCE_SHUTDOWN);
}
337
// Same controller mix, but the shutdown controllers call Shutdown(true) while producers and
// scale requests are running.
TEST_F(MockThreadPoolTest, ControllersForceShutdown)
{
    constexpr bool IS_SHUTDOWN = true;
    constexpr bool IS_FORCE_SHUTDOWN = true;
    constexpr size_t INITIAL_THREADS = 8;
    constexpr size_t SCALED_THREADS = 4;
    TestThreadPoolWithControllers(INITIAL_THREADS, SCALED_THREADS, IS_SHUTDOWN, IS_FORCE_SHUTDOWN);
}
346
347 } // namespace panda::test
348