• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PANDA_RUNTIME_THREAD_POOL_H_
17 #define PANDA_RUNTIME_THREAD_POOL_H_
18 
19 #include <atomic>
20 #include "libpandabase/os/mutex.h"
21 #include "libpandabase/os/thread.h"
22 #include "runtime/include/mem/allocator.h"
23 #include "runtime/include/mem/panda_containers.h"
24 #include "runtime/thread_pool_queue.h"
25 
// Timeout handed to ConditionVariable::TimedWait() by ThreadPool::WaitTask().
// NOTE(review): presumably milliseconds - confirm against os::memory::ConditionVariable::TimedWait.
// Declared `inline` so that every translation unit including this header shares one definition.
inline constexpr uint64_t TASK_WAIT_TIMEOUT = 500U;
27 
28 namespace ark {
29 
30 template <typename Task, typename ProcArg>
31 class ProcessorInterface {
32 public:
33     NO_COPY_SEMANTIC(ProcessorInterface);
34     NO_MOVE_SEMANTIC(ProcessorInterface);
35 
36     ProcessorInterface() = default;
37     virtual ~ProcessorInterface() = default;
38 
39     explicit ProcessorInterface(ProcArg args);
40     virtual bool Process(Task &&) = 0;
41     // before main loop
Init()42     virtual bool Init()
43     {
44         return true;
45     }
46     // before thread exit
Destroy()47     virtual bool Destroy()
48     {
49         return true;
50     }
51 
SetTid(int tid)52     void SetTid(int tid)
53     {
54         // Atomic with release order reason: threads should see correct initialization
55         tid_.store(tid, std::memory_order_release);
56     }
57 
IsStarted()58     bool IsStarted() const
59     {
60         return tid_ != -1;
61     }
62 
GetTid()63     int GetTid() const
64     {
65         // Atomic with acquire order reason: load should get the correct value
66         return tid_.load(std::memory_order_acquire);
67     }
68 
69 private:
70     std::atomic_int tid_ {-1};
71 };
72 
73 class WorkerCreationInterface {
74 public:
75     NO_COPY_SEMANTIC(WorkerCreationInterface);
76     NO_MOVE_SEMANTIC(WorkerCreationInterface);
77     WorkerCreationInterface() = default;
78     virtual ~WorkerCreationInterface() = default;
AttachWorker(bool helperThread)79     virtual void AttachWorker([[maybe_unused]] bool helperThread)
80     {
81         // do nothing here
82     }
DetachWorker(bool helperThread)83     virtual void DetachWorker([[maybe_unused]] bool helperThread)
84     {
85         // do nothing here
86     }
87 };
88 
89 template <typename Task, typename Proc, typename ProcArg>
90 class ThreadPool {
91 public:
92     NO_COPY_SEMANTIC(ThreadPool);
93     NO_MOVE_SEMANTIC(ThreadPool);
94 
95     explicit ThreadPool(mem::InternalAllocatorPtr allocator, TaskQueueInterface<Task> *queue, ProcArg args,
96                         size_t nThreads = 1, const char *threadName = nullptr,
97                         WorkerCreationInterface *workerCreationInterface = nullptr)
allocator_(allocator)98         : allocator_(allocator),
99           queue_(queue),
100           workers_(allocator_->Adapter()),
101           procs_(allocator_->Adapter()),
102           args_(args),
103           isThreadActive_(allocator_->Adapter()),
104           workerCreationInterface_(workerCreationInterface)
105     {
106         isActive_ = true;
107         threadName_ = threadName;
108         Scale(nThreads);
109     }
110 
~ThreadPool()111     ~ThreadPool()
112     {
113         os::memory::LockHolder lock(scaleLock_);
114         DeactivateWorkers();
115         WaitForWorkers();
116     }
117 
Scale(size_t newNThreads)118     void Scale(size_t newNThreads)
119     {
120         os::memory::LockHolder scaleLock(scaleLock_);
121         if (!IsActive()) {
122             return;
123         }
124         LOG(DEBUG, RUNTIME) << "Scale thread pool for " << newNThreads << " new threads";
125         if (newNThreads <= 0) {
126             LOG(ERROR, RUNTIME) << "Incorrect number of threads " << newNThreads << " for thread pool";
127             return;
128         }
129         if (newNThreads > threadsCounter_) {
130             // Need to add new threads.
131             {
132                 os::memory::LockHolder queueLock(queueLock_);
133                 isThreadActive_.resize(newNThreads);
134             }
135             for (size_t i = threadsCounter_; i < newNThreads; i++) {
136                 CreateNewThread(i);
137             }
138         } else if (newNThreads < threadsCounter_) {
139             // Need to remove threads.
140             for (size_t i = threadsCounter_ - 1; i >= newNThreads; i--) {
141                 StopWorker(workers_.back(), i);
142                 workers_.pop_back();
143                 allocator_->Delete(procs_.back());
144                 procs_.pop_back();
145             }
146             {
147                 os::memory::LockHolder queueLock(queueLock_);
148                 isThreadActive_.resize(newNThreads);
149             }
150         } else {
151             // Same number of threads - do nothing.
152         }
153         threadsCounter_ = newNThreads;
154         LOG(DEBUG, RUNTIME) << "Scale has been completed";
155     }
156 
Help()157     void Help()
158     {
159         // Disallow scaling while the main thread processes the queue
160         os::memory::LockHolder scaleLock(scaleLock_);
161         if (!IsActive()) {
162             return;
163         }
164         auto *proc = allocator_->New<Proc>(args_);
165         ASSERT(proc != nullptr);
166         WorkerCreationInterface *iface = GetWorkerCreationInterface();
167         if (iface != nullptr) {
168             iface->AttachWorker(true);
169         }
170         if (!proc->Init()) {
171             LOG(FATAL, RUNTIME) << "Cannot initialize worker thread";
172         }
173         proc->SetTid(os::thread::GetCurrentThreadId());
174         while (true) {
175             Task task;
176             {
177                 os::memory::LockHolder lock(queueLock_);
178                 task = std::move(queue_->GetTask());
179             }
180             if (task.IsEmpty()) {
181                 break;
182             }
183             SignalTask();
184             proc->Process(std::move(task));
185         }
186         if (!proc->Destroy()) {
187             LOG(FATAL, RUNTIME) << "Cannot destroy worker thread";
188         }
189         if (iface != nullptr) {
190             iface->DetachWorker(true);
191         }
192         allocator_->Delete(proc);
193     }
194 
TryPutTask(Task && task)195     bool TryPutTask(Task &&task)
196     {
197         bool res = false;
198         {
199             os::memory::LockHolder lock(queueLock_);
200             if (!isActive_) {
201                 return false;
202             }
203             res = queue_->TryAddTask(std::move(task));
204         }
205         if (res) {
206             // Task was added.
207             SignalTask();
208         }
209         return res;
210     }
211 
PutTask(Task && task)212     bool PutTask(Task &&task)
213     {
214         {
215             os::memory::LockHolder lock(queueLock_);
216             if (!isActive_) {
217                 return false;
218             }
219             while (queue_->IsFull()) {
220                 WaitTask();
221             }
222             queue_->AddTask(std::move(task));
223         }
224         SignalTask();
225         return true;
226     }
227 
IsActive()228     bool IsActive()
229     {
230         os::memory::LockHolder lock(queueLock_);
231         return isActive_;
232     }
233 
234     void Shutdown(bool force = false)
235     {
236         os::memory::LockHolder lock(scaleLock_);
237         DeactivateWorkers();
238         if (force) {
239             // Sync.
240             WaitForWorkers();
241         }
242     }
243 
WaitTask()244     void WaitTask()
245     {
246         condVar_.TimedWait(&queueLock_, TASK_WAIT_TIMEOUT);
247     }
248 
WorkerEntry(ThreadPool<Task,Proc,ProcArg> * threadPool,Proc * proc,int i)249     static void WorkerEntry(ThreadPool<Task, Proc, ProcArg> *threadPool, Proc *proc, int i)
250     {
251         WorkerCreationInterface *iface = threadPool->GetWorkerCreationInterface();
252         if (iface != nullptr) {
253             iface->AttachWorker(false);
254         }
255         if (!proc->Init()) {
256             LOG(FATAL, RUNTIME) << "Cannot initialize worker thread";
257         }
258         proc->SetTid(os::thread::GetCurrentThreadId());
259         while (true) {
260             Task task;
261             {
262                 os::memory::LockHolder lock(threadPool->queueLock_);
263                 if (!threadPool->IsActive(i)) {
264                     break;
265                 }
266                 task = std::move(threadPool->queue_->GetTask());
267                 if (task.IsEmpty()) {
268                     threadPool->WaitTask();
269                     continue;
270                 }
271             }
272             threadPool->SignalTask();
273             LOG(DEBUG, RUNTIME) << "Worker " << i << " started to process task";
274             proc->Process(std::move(task));
275         }
276         if (!proc->Destroy()) {
277             LOG(FATAL, RUNTIME) << "Cannot destroy worker thread";
278         }
279         if (iface != nullptr) {
280             iface->DetachWorker(false);
281         }
282         LOG(DEBUG, RUNTIME) << "Worker " << i << " is finished";
283     }
284 
285     using ProcVisitor = void (*)(Proc *p, size_t data);
286 
EnumerateProcs(ProcVisitor visitor,size_t data)287     void EnumerateProcs(ProcVisitor visitor, size_t data)
288     {
289         os::memory::LockHolder lock(scaleLock_);
290         for (auto it : procs_) {
291             visitor(it, data);
292         }
293     }
294 
295 private:
SignalTask()296     void SignalTask()
297     {
298         condVar_.Signal();
299     }
300 
SignalAllTasks()301     void SignalAllTasks()
302     {
303         condVar_.SignalAll();
304     }
305 
DeactivateWorkers()306     void DeactivateWorkers()
307     {
308         os::memory::LockHolder lock(queueLock_);
309         isActive_ = false;
310         queue_->Finalize();
311         SignalAllTasks();
312         // NOLINTNEXTLINE(modernize-loop-convert)
313         for (size_t i = 0; i < isThreadActive_.size(); i++) {
314             isThreadActive_.at(i) = false;
315         }
316     }
317 
IsActive(int i)318     bool IsActive(int i) REQUIRES(queueLock_)
319     {
320         return isThreadActive_.at(i);
321     }
322 
WaitForWorkers()323     void WaitForWorkers() REQUIRES(scaleLock_)
324     {
325         for (auto worker : workers_) {
326             StopWorker(worker);
327         }
328         {
329             os::memory::LockHolder lock(queueLock_);
330             isThreadActive_.clear();
331         }
332         workers_.clear();
333         for (auto proc : procs_) {
334             allocator_->Delete(proc);
335         }
336         procs_.clear();
337     }
338 
REQUIRES(scaleLock_)339     void StopWorker(std::thread *worker, size_t threadId = 0) REQUIRES(scaleLock_)
340     {
341         if (worker != nullptr) {
342             if (threadId != 0) {
343                 os::memory::LockHolder lock(queueLock_);
344                 isThreadActive_.at(threadId) = false;
345             }
346             SignalAllTasks();
347             worker->join();
348             allocator_->Delete(worker);
349             worker = nullptr;
350         }
351     }
352 
CreateNewThread(int i)353     void CreateNewThread(int i) REQUIRES(scaleLock_)
354     {
355         {
356             os::memory::LockHolder lock(queueLock_);
357             isThreadActive_.at(i) = true;
358         }
359         auto proc = allocator_->New<Proc>(args_);
360         auto worker = allocator_->New<std::thread>(WorkerEntry, this, proc, i);
361         if (worker == nullptr) {
362             LOG(FATAL, RUNTIME) << "Cannot create a worker thread";
363         }
364         if (threadName_ != nullptr) {
365             int res = os::thread::SetThreadName(worker->native_handle(), threadName_);
366             if (res != 0) {
367                 LOG(ERROR, RUNTIME) << "Failed to set a name for the worker thread";
368             }
369         }
370         workers_.emplace_back(worker);
371         procs_.emplace_back(proc);
372     }
373 
GetWorkerCreationInterface()374     WorkerCreationInterface *GetWorkerCreationInterface()
375     {
376         return workerCreationInterface_;
377     }
378 
379     mem::InternalAllocatorPtr allocator_;
380     os::memory::ConditionVariable condVar_;
381     TaskQueueInterface<Task> *queue_ GUARDED_BY(queueLock_);
382     PandaList<std::thread *> workers_ GUARDED_BY(scaleLock_);
383     size_t threadsCounter_ GUARDED_BY(scaleLock_) = 0;
384     PandaList<Proc *> procs_ GUARDED_BY(scaleLock_);
385     ProcArg args_;
386     bool isActive_ GUARDED_BY(queueLock_) = false;
387     os::memory::Mutex queueLock_;
388     os::memory::Mutex scaleLock_;
389     PandaVector<bool> isThreadActive_ GUARDED_BY(queueLock_);
390     WorkerCreationInterface *workerCreationInterface_;
391     const char *threadName_;
392 };
393 
394 }  // namespace ark
395 
396 #endif  // PANDA_RUNTIME_THREAD_POOL_H_
397