• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PANDA_RUNTIME_THREAD_POOL_H_
17 #define PANDA_RUNTIME_THREAD_POOL_H_
18 
19 #include "libpandabase/os/mutex.h"
20 #include "runtime/include/mem/allocator.h"
21 #include "runtime/include/mem/panda_containers.h"
22 #include "runtime/thread_pool_queue.h"
23 
24 static constexpr uint64_t TASK_WAIT_TIMEOUT = 500U;
25 
26 namespace panda {
27 
28 template <typename Task, typename ProcArg>
29 class ProcessorInterface {
30 public:
31     NO_COPY_SEMANTIC(ProcessorInterface);
32     NO_MOVE_SEMANTIC(ProcessorInterface);
33 
34     ProcessorInterface() = default;
35     virtual ~ProcessorInterface() = default;
36 
37     explicit ProcessorInterface(ProcArg args);
38     virtual bool Process(Task) = 0;
39     // before main loop
Init()40     virtual bool Init()
41     {
42         return true;
43     }
44     // before thread exit
Destroy()45     virtual bool Destroy()
46     {
47         return true;
48     }
49 };
50 
51 class WorkerCreationInterface {
52 public:
53     NO_COPY_SEMANTIC(WorkerCreationInterface);
54     NO_MOVE_SEMANTIC(WorkerCreationInterface);
55     WorkerCreationInterface() = default;
56     virtual ~WorkerCreationInterface() = default;
AttachWorker(bool helper_thread)57     virtual void AttachWorker([[maybe_unused]] bool helper_thread)
58     {
59         // do nothing here
60     }
DetachWorker(bool helper_thread)61     virtual void DetachWorker([[maybe_unused]] bool helper_thread)
62     {
63         // do nothing here
64     }
65 };
66 
67 template <typename Task, typename Proc, typename ProcArg>
68 class ThreadPool {
69 public:
70     NO_COPY_SEMANTIC(ThreadPool);
71     NO_MOVE_SEMANTIC(ThreadPool);
72 
73     explicit ThreadPool(mem::InternalAllocatorPtr allocator, TaskQueueInterface<Task> *queue, ProcArg args,
74                         size_t n_threads = 1, const char *thread_name = nullptr,
75                         WorkerCreationInterface *worker_creation_interface = nullptr)
allocator_(allocator)76         : allocator_(allocator),
77           queue_(queue),
78           workers_(allocator_->Adapter()),
79           procs_(allocator_->Adapter()),
80           args_(args),
81           is_thread_active_(allocator_->Adapter()),
82           worker_creation_interface_(worker_creation_interface)
83     {
84         is_active_ = true;
85         thread_name_ = thread_name;
86         Scale(n_threads);
87     }
88 
~ThreadPool()89     ~ThreadPool()
90     {
91         os::memory::LockHolder lock(scale_lock_);
92         DeactivateWorkers();
93         WaitForWorkers();
94     }
95 
Scale(size_t new_n_threads)96     void Scale(size_t new_n_threads)
97     {
98         os::memory::LockHolder scale_lock(scale_lock_);
99         if (!IsActive()) {
100             return;
101         }
102         LOG(DEBUG, RUNTIME) << "Scale thread pool for " << new_n_threads << " new threads";
103         if (new_n_threads <= 0) {
104             LOG(ERROR, RUNTIME) << "Incorrect number of threads " << new_n_threads << " for thread pool";
105             return;
106         }
107         if (new_n_threads > threads_counter_) {
108             // Need to add new threads.
109             {
110                 os::memory::LockHolder queue_lock(queue_lock_);
111                 is_thread_active_.resize(new_n_threads);
112             }
113             for (size_t i = threads_counter_; i < new_n_threads; i++) {
114                 CreateNewThread(i);
115             }
116         } else if (new_n_threads < threads_counter_) {
117             // Need to remove threads.
118             for (size_t i = threads_counter_ - 1; i >= new_n_threads; i--) {
119                 StopWorker(workers_.back(), i);
120                 workers_.pop_back();
121                 allocator_->Delete(procs_.back());
122                 procs_.pop_back();
123             }
124             {
125                 os::memory::LockHolder queue_lock(queue_lock_);
126                 is_thread_active_.resize(new_n_threads);
127             }
128         } else {
129             // Same number of threads - do nothing.
130         }
131         threads_counter_ = new_n_threads;
132         LOG(DEBUG, RUNTIME) << "Scale has been completed";
133     }
134 
Help()135     void Help()
136     {
137         // Disallow scaling while the main thread processes the queue
138         os::memory::LockHolder scale_lock(scale_lock_);
139         if (!IsActive()) {
140             return;
141         }
142         auto *proc = allocator_->New<Proc>(args_);
143         ASSERT(proc != nullptr);
144         WorkerCreationInterface *iface = GetWorkerCreationInterface();
145         if (iface != nullptr) {
146             iface->AttachWorker(true);
147         }
148         if (!proc->Init()) {
149             LOG(FATAL, RUNTIME) << "Cannot initialize worker thread";
150         }
151         while (true) {
152             Task task;
153             {
154                 os::memory::LockHolder lock(queue_lock_);
155                 task = queue_->GetTask();
156             }
157             if (task.IsEmpty()) {
158                 break;
159             }
160             SignalTask();
161             proc->Process(task);
162         }
163         if (!proc->Destroy()) {
164             LOG(FATAL, RUNTIME) << "Cannot destroy worker thread";
165         }
166         if (iface != nullptr) {
167             iface->DetachWorker(true);
168         }
169         allocator_->Delete(proc);
170     }
171 
TryPutTask(Task task)172     bool TryPutTask(Task task)
173     {
174         bool res = false;
175         {
176             os::memory::LockHolder lock(queue_lock_);
177             if (!is_active_) {
178                 return false;
179             }
180             res = queue_->TryAddTask(std::move(task));
181         }
182         if (res) {
183             // Task was added.
184             SignalTask();
185         }
186         return res;
187     }
188 
PutTask(Task task)189     bool PutTask(Task task)
190     {
191         {
192             os::memory::LockHolder lock(queue_lock_);
193             if (!is_active_) {
194                 return false;
195             }
196             while (queue_->IsFull()) {
197                 WaitTask();
198             }
199             queue_->AddTask(std::move(task));
200         }
201         SignalTask();
202         return true;
203     }
204 
IsActive()205     bool IsActive()
206     {
207         os::memory::LockHolder lock(queue_lock_);
208         return is_active_;
209     }
210 
211     void Shutdown(bool force = false)
212     {
213         os::memory::LockHolder lock(scale_lock_);
214         DeactivateWorkers();
215         if (force) {
216             // Sync.
217             WaitForWorkers();
218         }
219     }
220 
WaitTask()221     void WaitTask()
222     {
223         cond_var_.TimedWait(&queue_lock_, TASK_WAIT_TIMEOUT);
224     }
225 
WorkerEntry(ThreadPool<Task,Proc,ProcArg> * thread_pool,Proc * proc,int i)226     static void WorkerEntry(ThreadPool<Task, Proc, ProcArg> *thread_pool, Proc *proc, int i)
227     {
228         WorkerCreationInterface *iface = thread_pool->GetWorkerCreationInterface();
229         if (iface != nullptr) {
230             iface->AttachWorker(false);
231         }
232         if (!proc->Init()) {
233             LOG(FATAL, RUNTIME) << "Cannot initialize worker thread";
234         }
235         while (true) {
236             Task task;
237             {
238                 os::memory::LockHolder lock(thread_pool->queue_lock_);
239                 if (!thread_pool->IsActive(i)) {
240                     break;
241                 }
242                 task = std::move(thread_pool->queue_->GetTask());
243                 if (task.IsEmpty()) {
244                     thread_pool->WaitTask();
245                     continue;
246                 }
247             }
248             thread_pool->SignalTask();
249             LOG(DEBUG, RUNTIME) << "Worker " << i << " started to process task";
250             proc->Process(task);
251         }
252         if (!proc->Destroy()) {
253             LOG(FATAL, RUNTIME) << "Cannot destroy worker thread";
254         }
255         if (iface != nullptr) {
256             iface->DetachWorker(false);
257         }
258         LOG(DEBUG, RUNTIME) << "Worker " << i << " is finished";
259     }
260 
261 private:
SignalTask()262     void SignalTask()
263     {
264         cond_var_.Signal();
265     }
266 
SignalAllTasks()267     void SignalAllTasks()
268     {
269         cond_var_.SignalAll();
270     }
271 
DeactivateWorkers()272     void DeactivateWorkers()
273     {
274         os::memory::LockHolder lock(queue_lock_);
275         is_active_ = false;
276         queue_->Finalize();
277         SignalAllTasks();
278         for (size_t i = 0; i < is_thread_active_.size(); i++) {
279             is_thread_active_.at(i) = false;
280         }
281     }
282 
IsActive(int i)283     bool IsActive(int i) REQUIRES(queue_lock_)
284     {
285         return is_thread_active_.at(i);
286     }
287 
WaitForWorkers()288     void WaitForWorkers() REQUIRES(scale_lock_)
289     {
290         for (auto worker : workers_) {
291             StopWorker(worker);
292         }
293         {
294             os::memory::LockHolder lock(queue_lock_);
295             is_thread_active_.clear();
296         }
297         workers_.clear();
298         for (auto proc : procs_) {
299             allocator_->Delete(proc);
300         }
301         procs_.clear();
302     }
303 
REQUIRES(scale_lock_)304     void StopWorker(std::thread *worker, size_t thread_id = 0) REQUIRES(scale_lock_)
305     {
306         if (worker != nullptr) {
307             if (thread_id != 0) {
308                 os::memory::LockHolder lock(queue_lock_);
309                 is_thread_active_.at(thread_id) = false;
310             }
311             SignalAllTasks();
312             worker->join();
313             allocator_->Delete(worker);
314             worker = nullptr;
315         }
316     }
317 
CreateNewThread(int i)318     void CreateNewThread(int i) REQUIRES(scale_lock_)
319     {
320         {
321             os::memory::LockHolder lock(queue_lock_);
322             is_thread_active_.at(i) = true;
323         }
324         auto proc = allocator_->New<Proc>(args_);
325         auto worker = allocator_->New<std::thread>(WorkerEntry, this, proc, i);
326         if (worker == nullptr) {
327             LOG(FATAL, RUNTIME) << "Cannot create a worker thread";
328         }
329         if (thread_name_ != nullptr) {
330             int res = os::thread::SetThreadName(worker->native_handle(), thread_name_);
331             if (res != 0) {
332                 LOG(ERROR, RUNTIME) << "Failed to set a name for the worker thread";
333             }
334         }
335         workers_.emplace_back(worker);
336         procs_.emplace_back(proc);
337     }
338 
GetWorkerCreationInterface()339     WorkerCreationInterface *GetWorkerCreationInterface()
340     {
341         return worker_creation_interface_;
342     }
343 
344     mem::InternalAllocatorPtr allocator_;
345     os::memory::ConditionVariable cond_var_;
346     TaskQueueInterface<Task> *queue_ GUARDED_BY(queue_lock_);
347     PandaList<std::thread *> workers_ GUARDED_BY(scale_lock_);
348     size_t threads_counter_ GUARDED_BY(scale_lock_) = 0;
349     PandaList<Proc *> procs_ GUARDED_BY(scale_lock_);
350     ProcArg args_;
351     bool is_active_ GUARDED_BY(queue_lock_) = false;
352     os::memory::Mutex queue_lock_;
353     os::memory::Mutex scale_lock_;
354     PandaVector<bool> is_thread_active_ GUARDED_BY(queue_lock_);
355     WorkerCreationInterface *worker_creation_interface_;
356     const char *thread_name_;
357 };
358 
359 }  // namespace panda
360 
361 #endif  // PANDA_RUNTIME_THREAD_POOL_H_
362