1 /** 2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_RUNTIME_THREAD_POOL_H_ 17 #define PANDA_RUNTIME_THREAD_POOL_H_ 18 19 #include <atomic> 20 #include "libpandabase/os/mutex.h" 21 #include "libpandabase/os/thread.h" 22 #include "runtime/include/mem/allocator.h" 23 #include "runtime/include/mem/panda_containers.h" 24 #include "runtime/thread_pool_queue.h" 25 26 static constexpr uint64_t TASK_WAIT_TIMEOUT = 500U; 27 28 namespace ark { 29 30 template <typename Task, typename ProcArg> 31 class ProcessorInterface { 32 public: 33 NO_COPY_SEMANTIC(ProcessorInterface); 34 NO_MOVE_SEMANTIC(ProcessorInterface); 35 36 ProcessorInterface() = default; 37 virtual ~ProcessorInterface() = default; 38 39 explicit ProcessorInterface(ProcArg args); 40 virtual bool Process(Task &&) = 0; 41 // before main loop Init()42 virtual bool Init() 43 { 44 return true; 45 } 46 // before thread exit Destroy()47 virtual bool Destroy() 48 { 49 return true; 50 } 51 SetTid(int tid)52 void SetTid(int tid) 53 { 54 // Atomic with release order reason: threads should see correct initialization 55 tid_.store(tid, std::memory_order_release); 56 } 57 IsStarted()58 bool IsStarted() const 59 { 60 return tid_ != -1; 61 } 62 GetTid()63 int GetTid() const 64 { 65 // Atomic with acquire order reason: load should get the correct value 66 return tid_.load(std::memory_order_acquire); 67 } 68 69 private: 70 std::atomic_int tid_ {-1}; 71 }; 72 73 class WorkerCreationInterface { 74 public: 75 NO_COPY_SEMANTIC(WorkerCreationInterface); 76 NO_MOVE_SEMANTIC(WorkerCreationInterface); 77 WorkerCreationInterface() = default; 78 virtual ~WorkerCreationInterface() = default; AttachWorker(bool helperThread)79 virtual void AttachWorker([[maybe_unused]] bool helperThread) 80 { 81 // do nothing here 82 } DetachWorker(bool helperThread)83 virtual void DetachWorker([[maybe_unused]] bool helperThread) 84 { 85 // do nothing here 86 } 87 }; 88 89 template <typename Task, typename Proc, typename ProcArg> 90 class ThreadPool { 91 public: 92 NO_COPY_SEMANTIC(ThreadPool); 93 NO_MOVE_SEMANTIC(ThreadPool); 94 95 explicit ThreadPool(mem::InternalAllocatorPtr allocator, TaskQueueInterface<Task> *queue, ProcArg args, 96 size_t nThreads = 1, const char *threadName = nullptr, 97 WorkerCreationInterface *workerCreationInterface = nullptr) allocator_(allocator)98 : allocator_(allocator), 99 queue_(queue), 100 workers_(allocator_->Adapter()), 101 procs_(allocator_->Adapter()), 102 args_(args), 103 isThreadActive_(allocator_->Adapter()), 104 workerCreationInterface_(workerCreationInterface) 105 { 106 isActive_ = true; 107 threadName_ = threadName; 108 Scale(nThreads); 109 } 110 ~ThreadPool()111 ~ThreadPool() 112 { 113 os::memory::LockHolder lock(scaleLock_); 114 DeactivateWorkers(); 115 WaitForWorkers(); 116 } 117 Scale(size_t newNThreads)118 void Scale(size_t newNThreads) 119 { 120 os::memory::LockHolder scaleLock(scaleLock_); 121 if (!IsActive()) { 122 return; 123 } 124 LOG(DEBUG, RUNTIME) << "Scale thread pool for " << newNThreads << " new threads"; 125 if (newNThreads <= 0) { 126 LOG(ERROR, RUNTIME) << "Incorrect number of threads " << newNThreads << " for thread pool"; 127 return; 128 } 129 if (newNThreads > threadsCounter_) { 130 // Need to add new threads. 131 { 132 os::memory::LockHolder queueLock(queueLock_); 133 isThreadActive_.resize(newNThreads); 134 } 135 for (size_t i = threadsCounter_; i < newNThreads; i++) { 136 CreateNewThread(i); 137 } 138 } else if (newNThreads < threadsCounter_) { 139 // Need to remove threads. 140 for (size_t i = threadsCounter_ - 1; i >= newNThreads; i--) { 141 StopWorker(workers_.back(), i); 142 workers_.pop_back(); 143 allocator_->Delete(procs_.back()); 144 procs_.pop_back(); 145 } 146 { 147 os::memory::LockHolder queueLock(queueLock_); 148 isThreadActive_.resize(newNThreads); 149 } 150 } else { 151 // Same number of threads - do nothing. 152 } 153 threadsCounter_ = newNThreads; 154 LOG(DEBUG, RUNTIME) << "Scale has been completed"; 155 } 156 Help()157 void Help() 158 { 159 // Disallow scaling while the main thread processes the queue 160 os::memory::LockHolder scaleLock(scaleLock_); 161 if (!IsActive()) { 162 return; 163 } 164 auto *proc = allocator_->New<Proc>(args_); 165 ASSERT(proc != nullptr); 166 WorkerCreationInterface *iface = GetWorkerCreationInterface(); 167 if (iface != nullptr) { 168 iface->AttachWorker(true); 169 } 170 if (!proc->Init()) { 171 LOG(FATAL, RUNTIME) << "Cannot initialize worker thread"; 172 } 173 proc->SetTid(os::thread::GetCurrentThreadId()); 174 while (true) { 175 Task task; 176 { 177 os::memory::LockHolder lock(queueLock_); 178 task = std::move(queue_->GetTask()); 179 } 180 if (task.IsEmpty()) { 181 break; 182 } 183 SignalTask(); 184 proc->Process(std::move(task)); 185 } 186 if (!proc->Destroy()) { 187 LOG(FATAL, RUNTIME) << "Cannot destroy worker thread"; 188 } 189 if (iface != nullptr) { 190 iface->DetachWorker(true); 191 } 192 allocator_->Delete(proc); 193 } 194 TryPutTask(Task && task)195 bool TryPutTask(Task &&task) 196 { 197 bool res = false; 198 { 199 os::memory::LockHolder lock(queueLock_); 200 if (!isActive_) { 201 return false; 202 } 203 res = queue_->TryAddTask(std::move(task)); 204 } 205 if (res) { 206 // Task was added. 207 SignalTask(); 208 } 209 return res; 210 } 211 PutTask(Task && task)212 bool PutTask(Task &&task) 213 { 214 { 215 os::memory::LockHolder lock(queueLock_); 216 if (!isActive_) { 217 return false; 218 } 219 while (queue_->IsFull()) { 220 WaitTask(); 221 } 222 queue_->AddTask(std::move(task)); 223 } 224 SignalTask(); 225 return true; 226 } 227 IsActive()228 bool IsActive() 229 { 230 os::memory::LockHolder lock(queueLock_); 231 return isActive_; 232 } 233 234 void Shutdown(bool force = false) 235 { 236 os::memory::LockHolder lock(scaleLock_); 237 DeactivateWorkers(); 238 if (force) { 239 // Sync. 240 WaitForWorkers(); 241 } 242 } 243 WaitTask()244 void WaitTask() 245 { 246 condVar_.TimedWait(&queueLock_, TASK_WAIT_TIMEOUT); 247 } 248 WorkerEntry(ThreadPool<Task,Proc,ProcArg> * threadPool,Proc * proc,int i)249 static void WorkerEntry(ThreadPool<Task, Proc, ProcArg> *threadPool, Proc *proc, int i) 250 { 251 WorkerCreationInterface *iface = threadPool->GetWorkerCreationInterface(); 252 if (iface != nullptr) { 253 iface->AttachWorker(false); 254 } 255 if (!proc->Init()) { 256 LOG(FATAL, RUNTIME) << "Cannot initialize worker thread"; 257 } 258 proc->SetTid(os::thread::GetCurrentThreadId()); 259 while (true) { 260 Task task; 261 { 262 os::memory::LockHolder lock(threadPool->queueLock_); 263 if (!threadPool->IsActive(i)) { 264 break; 265 } 266 task = std::move(threadPool->queue_->GetTask()); 267 if (task.IsEmpty()) { 268 threadPool->WaitTask(); 269 continue; 270 } 271 } 272 threadPool->SignalTask(); 273 LOG(DEBUG, RUNTIME) << "Worker " << i << " started to process task"; 274 proc->Process(std::move(task)); 275 } 276 if (!proc->Destroy()) { 277 LOG(FATAL, RUNTIME) << "Cannot destroy worker thread"; 278 } 279 if (iface != nullptr) { 280 iface->DetachWorker(false); 281 } 282 LOG(DEBUG, RUNTIME) << "Worker " << i << " is finished"; 283 } 284 285 using ProcVisitor = void (*)(Proc *p, size_t data); 286 EnumerateProcs(ProcVisitor visitor,size_t data)287 void EnumerateProcs(ProcVisitor visitor, size_t data) 288 { 289 os::memory::LockHolder lock(scaleLock_); 290 for (auto it : procs_) { 291 visitor(it, data); 292 } 293 } 294 295 private: SignalTask()296 void SignalTask() 297 { 298 condVar_.Signal(); 299 } 300 SignalAllTasks()301 void SignalAllTasks() 302 { 303 condVar_.SignalAll(); 304 } 305 DeactivateWorkers()306 void DeactivateWorkers() 307 { 308 os::memory::LockHolder lock(queueLock_); 309 isActive_ = false; 310 queue_->Finalize(); 311 SignalAllTasks(); 312 // NOLINTNEXTLINE(modernize-loop-convert) 313 for (size_t i = 0; i < isThreadActive_.size(); i++) { 314 isThreadActive_.at(i) = false; 315 } 316 } 317 IsActive(int i)318 bool IsActive(int i) REQUIRES(queueLock_) 319 { 320 return isThreadActive_.at(i); 321 } 322 WaitForWorkers()323 void WaitForWorkers() REQUIRES(scaleLock_) 324 { 325 for (auto worker : workers_) { 326 StopWorker(worker); 327 } 328 { 329 os::memory::LockHolder lock(queueLock_); 330 isThreadActive_.clear(); 331 } 332 workers_.clear(); 333 for (auto proc : procs_) { 334 allocator_->Delete(proc); 335 } 336 procs_.clear(); 337 } 338 REQUIRES(scaleLock_)339 void StopWorker(std::thread *worker, size_t threadId = 0) REQUIRES(scaleLock_) 340 { 341 if (worker != nullptr) { 342 if (threadId != 0) { 343 os::memory::LockHolder lock(queueLock_); 344 isThreadActive_.at(threadId) = false; 345 } 346 SignalAllTasks(); 347 worker->join(); 348 allocator_->Delete(worker); 349 worker = nullptr; 350 } 351 } 352 CreateNewThread(int i)353 void CreateNewThread(int i) REQUIRES(scaleLock_) 354 { 355 { 356 os::memory::LockHolder lock(queueLock_); 357 isThreadActive_.at(i) = true; 358 } 359 auto proc = allocator_->New<Proc>(args_); 360 auto worker = allocator_->New<std::thread>(WorkerEntry, this, proc, i); 361 if (worker == nullptr) { 362 LOG(FATAL, RUNTIME) << "Cannot create a worker thread"; 363 } 364 if (threadName_ != nullptr) { 365 int res = os::thread::SetThreadName(worker->native_handle(), threadName_); 366 if (res != 0) { 367 LOG(ERROR, RUNTIME) << "Failed to set a name for the worker thread"; 368 } 369 } 370 workers_.emplace_back(worker); 371 procs_.emplace_back(proc); 372 } 373 GetWorkerCreationInterface()374 WorkerCreationInterface *GetWorkerCreationInterface() 375 { 376 return workerCreationInterface_; 377 } 378 379 mem::InternalAllocatorPtr allocator_; 380 os::memory::ConditionVariable condVar_; 381 TaskQueueInterface<Task> *queue_ GUARDED_BY(queueLock_); 382 PandaList<std::thread *> workers_ GUARDED_BY(scaleLock_); 383 size_t threadsCounter_ GUARDED_BY(scaleLock_) = 0; 384 PandaList<Proc *> procs_ GUARDED_BY(scaleLock_); 385 ProcArg args_; 386 bool isActive_ GUARDED_BY(queueLock_) = false; 387 os::memory::Mutex queueLock_; 388 os::memory::Mutex scaleLock_; 389 PandaVector<bool> isThreadActive_ GUARDED_BY(queueLock_); 390 WorkerCreationInterface *workerCreationInterface_; 391 const char *threadName_; 392 }; 393 394 } // namespace ark 395 396 #endif // PANDA_RUNTIME_THREAD_POOL_H_ 397