• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "runtime/include/runtime.h"
19 #include "runtime/thread_pool.h"
20 
21 namespace panda::test {
22 
23 class MockThreadPoolTest : public testing::Test {
24 public:
25     static const size_t TASK_NUMBER = 32;
MockThreadPoolTest()26     MockThreadPoolTest()
27     {
28         RuntimeOptions options;
29         options.SetShouldLoadBootPandaFiles(false);
30         options.SetShouldInitializeIntrinsics(false);
31         Runtime::Create(options);
32         thread_ = panda::MTManagedThread::GetCurrent();
33         thread_->ManagedCodeBegin();
34     }
35 
~MockThreadPoolTest()36     ~MockThreadPoolTest()
37     {
38         thread_->ManagedCodeEnd();
39         Runtime::Destroy();
40     }
41 
42 protected:
43     panda::MTManagedThread *thread_;
44 };
45 
46 class MockTask : public TaskInterface {
47 public:
MockTask(size_t identifier=0)48     explicit MockTask(size_t identifier = 0) : identifier_(identifier) {}
49 
50     enum TaskStatus {
51         NOT_STARTED,
52         IN_QUEUE,
53         PROCESSING,
54         COMPLETED,
55     };
56 
IsEmpty() const57     bool IsEmpty() const
58     {
59         return identifier_ == 0;
60     }
61 
GetId() const62     size_t GetId() const
63     {
64         return identifier_;
65     }
66 
GetStatus() const67     TaskStatus GetStatus() const
68     {
69         return status_;
70     }
71 
SetStatus(TaskStatus status)72     void SetStatus(TaskStatus status)
73     {
74         status_ = status;
75     }
76 
77 private:
78     size_t identifier_;
79     TaskStatus status_ = NOT_STARTED;
80 };
81 
82 class MockQueue : public TaskQueueInterface<MockTask> {
83 public:
MockQueue(mem::InternalAllocatorPtr allocator)84     explicit MockQueue(mem::InternalAllocatorPtr allocator) : queue_(allocator->Adapter()) {}
MockQueue(mem::InternalAllocatorPtr allocator,size_t queue_size)85     MockQueue(mem::InternalAllocatorPtr allocator, size_t queue_size)
86         : TaskQueueInterface<MockTask>(queue_size), queue_(allocator->Adapter())
87     {
88     }
89 
GetTask()90     MockTask GetTask() override
91     {
92         if (queue_.empty()) {
93             LOG(DEBUG, RUNTIME) << "Cannot get an element, queue is empty";
94             return MockTask();
95         }
96         auto task = queue_.front();
97         queue_.pop_front();
98         LOG(DEBUG, RUNTIME) << "Extract task " << task.GetId();
99         return task;
100     }
101 
102     // NOLINTNEXTLINE(google-default-arguments)
AddTask(MockTask task,size_t priority=0)103     void AddTask(MockTask task, [[maybe_unused]] size_t priority = 0) override
104     {
105         task.SetStatus(MockTask::IN_QUEUE);
106         queue_.push_front(task);
107     }
108 
Finalize()109     void Finalize() override
110     {
111         queue_.clear();
112     }
113 
114 protected:
GetQueueSize()115     size_t GetQueueSize() override
116     {
117         return queue_.size();
118     }
119 
120 private:
121     PandaList<MockTask> queue_;
122 };
123 
124 class MockTaskController {
125 public:
MockTaskController()126     explicit MockTaskController() {}
127 
SolveTask(MockTask task)128     void SolveTask(MockTask task)
129     {
130         task.SetStatus(MockTask::PROCESSING);
131         // This is required to distribute tasks between different workers rather than solve it instantly
132         // on only one worker.
133         std::this_thread::sleep_for(std::chrono::milliseconds(10));
134         task.SetStatus(MockTask::COMPLETED);
135         LOG(DEBUG, RUNTIME) << "Task " << task.GetId() << " has been solved";
136         solved_tasks_++;
137     }
138 
GetSolvedTasks()139     size_t GetSolvedTasks()
140     {
141         return solved_tasks_;
142     }
143 
144 private:
145     std::atomic_size_t solved_tasks_ = 0;
146 };
147 
148 class MockProcessor : public ProcessorInterface<MockTask, MockTaskController *> {
149 public:
MockProcessor(MockTaskController * controller)150     explicit MockProcessor(MockTaskController *controller) : controller_(controller) {}
151 
Process(MockTask task)152     bool Process(MockTask task) override
153     {
154         if (task.GetStatus() == MockTask::IN_QUEUE) {
155             controller_->SolveTask(task);
156             return true;
157         }
158         return false;
159     }
160 
161 private:
162     MockTaskController *controller_;
163 };
164 
CreateTasks(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,size_t number_of_elements)165 void CreateTasks(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool, size_t number_of_elements)
166 {
167     for (size_t i = 0; i < number_of_elements; i++) {
168         MockTask task(i + 1);
169         thread_pool->PutTask(task);
170         LOG(DEBUG, RUNTIME) << "Queue task " << task.GetId();
171     }
172 }
173 
TestThreadPool(size_t initial_number_of_threads,size_t scaled_number_of_threads,float scale_threshold)174 void TestThreadPool(size_t initial_number_of_threads, size_t scaled_number_of_threads, float scale_threshold)
175 {
176     auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
177     auto queue = allocator->New<MockQueue>(allocator);
178     auto controller = allocator->New<MockTaskController>();
179     auto thread_pool = allocator->New<ThreadPool<MockTask, MockProcessor, MockTaskController *>>(
180         allocator, queue, controller, initial_number_of_threads, "Test thread");
181 
182     CreateTasks(thread_pool, MockThreadPoolTest::TASK_NUMBER);
183 
184     if (scale_threshold < 1.0) {
185         while (controller->GetSolvedTasks() < scale_threshold * MockThreadPoolTest::TASK_NUMBER) {
186         }
187         thread_pool->Scale(scaled_number_of_threads);
188     }
189 
190     for (;;) {
191         auto solved_tasks = controller->GetSolvedTasks();
192         size_t rate = static_cast<size_t>((static_cast<float>(solved_tasks) / MockThreadPoolTest::TASK_NUMBER) * 100);
193         LOG(DEBUG, RUNTIME) << "Number of solved tasks is " << solved_tasks << " (" << rate << "%)";
194         if (scale_threshold == 1.0) {
195             size_t dynamic_scaling = rate / 10 + 1;
196             thread_pool->Scale(dynamic_scaling);
197         }
198 
199         if (solved_tasks == MockThreadPoolTest::TASK_NUMBER) {
200             break;
201         }
202     }
203 
204     allocator->Delete(thread_pool);
205     allocator->Delete(controller);
206     allocator->Delete(queue);
207 }
208 
TEST_F(MockThreadPoolTest,SeveralThreads)209 TEST_F(MockThreadPoolTest, SeveralThreads)
210 {
211     constexpr size_t NUMBER_OF_THREADS_INITIAL = 8;
212     constexpr size_t NUMBER_OF_THREADS_SCALED = 8;
213     constexpr float SCALE_THRESHOLD = 0.0;
214     TestThreadPool(NUMBER_OF_THREADS_INITIAL, NUMBER_OF_THREADS_SCALED, SCALE_THRESHOLD);
215 }
216 
TEST_F(MockThreadPoolTest,ReduceThreads)217 TEST_F(MockThreadPoolTest, ReduceThreads)
218 {
219     constexpr size_t NUMBER_OF_THREADS_INITIAL = 8;
220     constexpr size_t NUMBER_OF_THREADS_SCALED = 4;
221     constexpr float SCALE_THRESHOLD = 0.25;
222     TestThreadPool(NUMBER_OF_THREADS_INITIAL, NUMBER_OF_THREADS_SCALED, SCALE_THRESHOLD);
223 }
224 
TEST_F(MockThreadPoolTest,IncreaseThreads)225 TEST_F(MockThreadPoolTest, IncreaseThreads)
226 {
227     constexpr size_t NUMBER_OF_THREADS_INITIAL = 4;
228     constexpr size_t NUMBER_OF_THREADS_SCALED = 8;
229     constexpr float SCALE_THRESHOLD = 0.25;
230     TestThreadPool(NUMBER_OF_THREADS_INITIAL, NUMBER_OF_THREADS_SCALED, SCALE_THRESHOLD);
231 }
232 
TEST_F(MockThreadPoolTest,DifferentNumberOfThreads)233 TEST_F(MockThreadPoolTest, DifferentNumberOfThreads)
234 {
235     constexpr size_t NUMBER_OF_THREADS_INITIAL = 8;
236     constexpr size_t NUMBER_OF_THREADS_SCALED = 8;
237     constexpr float SCALE_THRESHOLD = 1.0;
238     TestThreadPool(NUMBER_OF_THREADS_INITIAL, NUMBER_OF_THREADS_SCALED, SCALE_THRESHOLD);
239 }
240 
ControllerThreadPutTask(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,size_t number_of_tasks)241 void ControllerThreadPutTask(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool,
242                              size_t number_of_tasks)
243 {
244     CreateTasks(thread_pool, number_of_tasks);
245 }
246 
ControllerThreadTryPutTask(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,size_t number_of_tasks)247 void ControllerThreadTryPutTask(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool,
248                                 size_t number_of_tasks)
249 {
250     for (size_t i = 0; i < number_of_tasks; i++) {
251         MockTask task(i + 1);
252         for (;;) {
253             if (thread_pool->TryPutTask(task) || !thread_pool->IsActive()) {
254                 break;
255             }
256         }
257     }
258 }
259 
ControllerThreadScale(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,size_t number_of_threads)260 void ControllerThreadScale(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool,
261                            size_t number_of_threads)
262 {
263     thread_pool->Scale(number_of_threads);
264 }
265 
ControllerThreadShutdown(ThreadPool<MockTask,MockProcessor,MockTaskController * > * thread_pool,bool is_shutdown,bool is_force_shutdown)266 void ControllerThreadShutdown(ThreadPool<MockTask, MockProcessor, MockTaskController *> *thread_pool, bool is_shutdown,
267                               bool is_force_shutdown)
268 {
269     if (is_shutdown) {
270         thread_pool->Shutdown(is_force_shutdown);
271     }
272 }
273 
TestThreadPoolWithControllers(size_t number_of_threads_initial,size_t number_of_threads_scaled,bool is_shutdown,bool is_force_shutdown)274 void TestThreadPoolWithControllers(size_t number_of_threads_initial, size_t number_of_threads_scaled, bool is_shutdown,
275                                    bool is_force_shutdown)
276 {
277     constexpr size_t NUMBER_OF_TASKS = MockThreadPoolTest::TASK_NUMBER / 4;
278     constexpr size_t QUEUE_SIZE = 16;
279 
280     auto allocator = Runtime::GetCurrent()->GetInternalAllocator();
281     auto queue = allocator->New<MockQueue>(allocator, QUEUE_SIZE);
282     auto controller = allocator->New<MockTaskController>();
283     auto thread_pool = allocator->New<ThreadPool<MockTask, MockProcessor, MockTaskController *>>(
284         allocator, queue, controller, number_of_threads_initial, "Test thread");
285 
286     std::thread controller_thread_put_task_1(ControllerThreadPutTask, thread_pool, NUMBER_OF_TASKS);
287     std::thread controller_thread_put_task_2(ControllerThreadPutTask, thread_pool, NUMBER_OF_TASKS);
288     std::thread controller_thread_try_put_task_1(ControllerThreadTryPutTask, thread_pool, NUMBER_OF_TASKS);
289     std::thread controller_thread_try_put_task_2(ControllerThreadTryPutTask, thread_pool, NUMBER_OF_TASKS);
290     std::thread controller_thread_scale_1(ControllerThreadScale, thread_pool, number_of_threads_scaled);
291     std::thread controller_thread_scale_2(ControllerThreadScale, thread_pool,
292                                           number_of_threads_scaled + number_of_threads_initial);
293     std::thread controller_thread_shutdown_1(ControllerThreadShutdown, thread_pool, is_shutdown, is_force_shutdown);
294     std::thread controller_thread_shutdown_2(ControllerThreadShutdown, thread_pool, is_shutdown, is_force_shutdown);
295 
296     // Wait for tasks completion.
297     for (;;) {
298         auto solved_tasks = controller->GetSolvedTasks();
299         size_t rate = static_cast<size_t>((static_cast<float>(solved_tasks) / MockThreadPoolTest::TASK_NUMBER) * 100);
300         LOG(DEBUG, RUNTIME) << "Number of solved tasks is " << solved_tasks << " (" << rate << "%)";
301         if (solved_tasks == MockThreadPoolTest::TASK_NUMBER || !thread_pool->IsActive()) {
302             break;
303         }
304         std::this_thread::sleep_for(std::chrono::milliseconds(10));
305     }
306     controller_thread_put_task_1.join();
307     controller_thread_put_task_2.join();
308     controller_thread_try_put_task_1.join();
309     controller_thread_try_put_task_2.join();
310     controller_thread_scale_1.join();
311     controller_thread_scale_2.join();
312     controller_thread_shutdown_1.join();
313     controller_thread_shutdown_2.join();
314 
315     allocator->Delete(thread_pool);
316     allocator->Delete(controller);
317     allocator->Delete(queue);
318 }
319 
TEST_F(MockThreadPoolTest,Controllers)320 TEST_F(MockThreadPoolTest, Controllers)
321 {
322     constexpr size_t NUMBER_OF_THREADS_INITIAL = 8;
323     constexpr size_t NUMBER_OF_THREADS_SCALED = 4;
324     constexpr bool IS_SHUTDOWN = false;
325     constexpr bool IS_FORCE_SHUTDOWN = false;
326     TestThreadPoolWithControllers(NUMBER_OF_THREADS_INITIAL, NUMBER_OF_THREADS_SCALED, IS_SHUTDOWN, IS_FORCE_SHUTDOWN);
327 }
328 
TEST_F(MockThreadPoolTest,ControllersShutdown)329 TEST_F(MockThreadPoolTest, ControllersShutdown)
330 {
331     constexpr size_t NUMBER_OF_THREADS_INITIAL = 8;
332     constexpr size_t NUMBER_OF_THREADS_SCALED = 4;
333     constexpr bool IS_SHUTDOWN = true;
334     constexpr bool IS_FORCE_SHUTDOWN = false;
335     TestThreadPoolWithControllers(NUMBER_OF_THREADS_INITIAL, NUMBER_OF_THREADS_SCALED, IS_SHUTDOWN, IS_FORCE_SHUTDOWN);
336 }
337 
TEST_F(MockThreadPoolTest,ControllersForceShutdown)338 TEST_F(MockThreadPoolTest, ControllersForceShutdown)
339 {
340     constexpr size_t NUMBER_OF_THREADS_INITIAL = 8;
341     constexpr size_t NUMBER_OF_THREADS_SCALED = 4;
342     constexpr bool IS_SHUTDOWN = true;
343     constexpr bool IS_FORCE_SHUTDOWN = true;
344     TestThreadPoolWithControllers(NUMBER_OF_THREADS_INITIAL, NUMBER_OF_THREADS_SCALED, IS_SHUTDOWN, IS_FORCE_SHUTDOWN);
345 }
346 
347 }  // namespace panda::test
348