1 /** 2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef PANDA_RUNTIME_THREAD_POOL_H_ 17 #define PANDA_RUNTIME_THREAD_POOL_H_ 18 19 #include "libpandabase/os/mutex.h" 20 #include "runtime/include/mem/allocator.h" 21 #include "runtime/include/mem/panda_containers.h" 22 #include "runtime/thread_pool_queue.h" 23 24 static constexpr uint64_t TASK_WAIT_TIMEOUT = 500U; 25 26 namespace panda { 27 28 template <typename Task, typename ProcArg> 29 class ProcessorInterface { 30 public: 31 NO_COPY_SEMANTIC(ProcessorInterface); 32 NO_MOVE_SEMANTIC(ProcessorInterface); 33 34 ProcessorInterface() = default; 35 virtual ~ProcessorInterface() = default; 36 37 explicit ProcessorInterface(ProcArg args); 38 virtual bool Process(Task) = 0; 39 // before main loop Init()40 virtual bool Init() 41 { 42 return true; 43 } 44 // before thread exit Destroy()45 virtual bool Destroy() 46 { 47 return true; 48 } 49 }; 50 51 class WorkerCreationInterface { 52 public: 53 NO_COPY_SEMANTIC(WorkerCreationInterface); 54 NO_MOVE_SEMANTIC(WorkerCreationInterface); 55 WorkerCreationInterface() = default; 56 virtual ~WorkerCreationInterface() = default; AttachWorker(bool helper_thread)57 virtual void AttachWorker([[maybe_unused]] bool helper_thread) 58 { 59 // do nothing here 60 } DetachWorker(bool helper_thread)61 virtual void DetachWorker([[maybe_unused]] bool helper_thread) 62 { 63 // do nothing here 64 } 65 }; 66 67 template <typename Task, typename Proc, typename ProcArg> 68 class ThreadPool { 69 public: 70 NO_COPY_SEMANTIC(ThreadPool); 71 NO_MOVE_SEMANTIC(ThreadPool); 72 73 explicit ThreadPool(mem::InternalAllocatorPtr allocator, TaskQueueInterface<Task> *queue, ProcArg args, 74 size_t n_threads = 1, const char *thread_name = nullptr, 75 WorkerCreationInterface *worker_creation_interface = nullptr) allocator_(allocator)76 : allocator_(allocator), 77 queue_(queue), 78 workers_(allocator_->Adapter()), 79 procs_(allocator_->Adapter()), 80 args_(args), 81 is_thread_active_(allocator_->Adapter()), 82 worker_creation_interface_(worker_creation_interface) 83 { 84 is_active_ = true; 85 thread_name_ = thread_name; 86 Scale(n_threads); 87 } 88 ~ThreadPool()89 ~ThreadPool() 90 { 91 os::memory::LockHolder lock(scale_lock_); 92 DeactivateWorkers(); 93 WaitForWorkers(); 94 } 95 Scale(size_t new_n_threads)96 void Scale(size_t new_n_threads) 97 { 98 os::memory::LockHolder scale_lock(scale_lock_); 99 if (!IsActive()) { 100 return; 101 } 102 LOG(DEBUG, RUNTIME) << "Scale thread pool for " << new_n_threads << " new threads"; 103 if (new_n_threads <= 0) { 104 LOG(ERROR, RUNTIME) << "Incorrect number of threads " << new_n_threads << " for thread pool"; 105 return; 106 } 107 if (new_n_threads > threads_counter_) { 108 // Need to add new threads. 109 { 110 os::memory::LockHolder queue_lock(queue_lock_); 111 is_thread_active_.resize(new_n_threads); 112 } 113 for (size_t i = threads_counter_; i < new_n_threads; i++) { 114 CreateNewThread(i); 115 } 116 } else if (new_n_threads < threads_counter_) { 117 // Need to remove threads. 118 for (size_t i = threads_counter_ - 1; i >= new_n_threads; i--) { 119 StopWorker(workers_.back(), i); 120 workers_.pop_back(); 121 allocator_->Delete(procs_.back()); 122 procs_.pop_back(); 123 } 124 { 125 os::memory::LockHolder queue_lock(queue_lock_); 126 is_thread_active_.resize(new_n_threads); 127 } 128 } else { 129 // Same number of threads - do nothing. 130 } 131 threads_counter_ = new_n_threads; 132 LOG(DEBUG, RUNTIME) << "Scale has been completed"; 133 } 134 Help()135 void Help() 136 { 137 // Disallow scaling while the main thread processes the queue 138 os::memory::LockHolder scale_lock(scale_lock_); 139 if (!IsActive()) { 140 return; 141 } 142 auto *proc = allocator_->New<Proc>(args_); 143 ASSERT(proc != nullptr); 144 WorkerCreationInterface *iface = GetWorkerCreationInterface(); 145 if (iface != nullptr) { 146 iface->AttachWorker(true); 147 } 148 if (!proc->Init()) { 149 LOG(FATAL, RUNTIME) << "Cannot initialize worker thread"; 150 } 151 while (true) { 152 Task task; 153 { 154 os::memory::LockHolder lock(queue_lock_); 155 task = queue_->GetTask(); 156 } 157 if (task.IsEmpty()) { 158 break; 159 } 160 SignalTask(); 161 proc->Process(task); 162 } 163 if (!proc->Destroy()) { 164 LOG(FATAL, RUNTIME) << "Cannot destroy worker thread"; 165 } 166 if (iface != nullptr) { 167 iface->DetachWorker(true); 168 } 169 allocator_->Delete(proc); 170 } 171 TryPutTask(Task task)172 bool TryPutTask(Task task) 173 { 174 bool res = false; 175 { 176 os::memory::LockHolder lock(queue_lock_); 177 if (!is_active_) { 178 return false; 179 } 180 res = queue_->TryAddTask(std::move(task)); 181 } 182 if (res) { 183 // Task was added. 184 SignalTask(); 185 } 186 return res; 187 } 188 PutTask(Task task)189 bool PutTask(Task task) 190 { 191 { 192 os::memory::LockHolder lock(queue_lock_); 193 if (!is_active_) { 194 return false; 195 } 196 while (queue_->IsFull()) { 197 WaitTask(); 198 } 199 queue_->AddTask(std::move(task)); 200 } 201 SignalTask(); 202 return true; 203 } 204 IsActive()205 bool IsActive() 206 { 207 os::memory::LockHolder lock(queue_lock_); 208 return is_active_; 209 } 210 211 void Shutdown(bool force = false) 212 { 213 os::memory::LockHolder lock(scale_lock_); 214 DeactivateWorkers(); 215 if (force) { 216 // Sync. 217 WaitForWorkers(); 218 } 219 } 220 WaitTask()221 void WaitTask() 222 { 223 cond_var_.TimedWait(&queue_lock_, TASK_WAIT_TIMEOUT); 224 } 225 WorkerEntry(ThreadPool<Task,Proc,ProcArg> * thread_pool,Proc * proc,int i)226 static void WorkerEntry(ThreadPool<Task, Proc, ProcArg> *thread_pool, Proc *proc, int i) 227 { 228 WorkerCreationInterface *iface = thread_pool->GetWorkerCreationInterface(); 229 if (iface != nullptr) { 230 iface->AttachWorker(false); 231 } 232 if (!proc->Init()) { 233 LOG(FATAL, RUNTIME) << "Cannot initialize worker thread"; 234 } 235 while (true) { 236 Task task; 237 { 238 os::memory::LockHolder lock(thread_pool->queue_lock_); 239 if (!thread_pool->IsActive(i)) { 240 break; 241 } 242 task = std::move(thread_pool->queue_->GetTask()); 243 if (task.IsEmpty()) { 244 thread_pool->WaitTask(); 245 continue; 246 } 247 } 248 thread_pool->SignalTask(); 249 LOG(DEBUG, RUNTIME) << "Worker " << i << " started to process task"; 250 proc->Process(task); 251 } 252 if (!proc->Destroy()) { 253 LOG(FATAL, RUNTIME) << "Cannot destroy worker thread"; 254 } 255 if (iface != nullptr) { 256 iface->DetachWorker(false); 257 } 258 LOG(DEBUG, RUNTIME) << "Worker " << i << " is finished"; 259 } 260 261 private: SignalTask()262 void SignalTask() 263 { 264 cond_var_.Signal(); 265 } 266 SignalAllTasks()267 void SignalAllTasks() 268 { 269 cond_var_.SignalAll(); 270 } 271 DeactivateWorkers()272 void DeactivateWorkers() 273 { 274 os::memory::LockHolder lock(queue_lock_); 275 is_active_ = false; 276 queue_->Finalize(); 277 SignalAllTasks(); 278 for (size_t i = 0; i < is_thread_active_.size(); i++) { 279 is_thread_active_.at(i) = false; 280 } 281 } 282 IsActive(int i)283 bool IsActive(int i) REQUIRES(queue_lock_) 284 { 285 return is_thread_active_.at(i); 286 } 287 WaitForWorkers()288 void WaitForWorkers() REQUIRES(scale_lock_) 289 { 290 for (auto worker : workers_) { 291 StopWorker(worker); 292 } 293 { 294 os::memory::LockHolder lock(queue_lock_); 295 is_thread_active_.clear(); 296 } 297 workers_.clear(); 298 for (auto proc : procs_) { 299 allocator_->Delete(proc); 300 } 301 procs_.clear(); 302 } 303 REQUIRES(scale_lock_)304 void StopWorker(std::thread *worker, size_t thread_id = 0) REQUIRES(scale_lock_) 305 { 306 if (worker != nullptr) { 307 if (thread_id != 0) { 308 os::memory::LockHolder lock(queue_lock_); 309 is_thread_active_.at(thread_id) = false; 310 } 311 SignalAllTasks(); 312 worker->join(); 313 allocator_->Delete(worker); 314 worker = nullptr; 315 } 316 } 317 CreateNewThread(int i)318 void CreateNewThread(int i) REQUIRES(scale_lock_) 319 { 320 { 321 os::memory::LockHolder lock(queue_lock_); 322 is_thread_active_.at(i) = true; 323 } 324 auto proc = allocator_->New<Proc>(args_); 325 auto worker = allocator_->New<std::thread>(WorkerEntry, this, proc, i); 326 if (worker == nullptr) { 327 LOG(FATAL, RUNTIME) << "Cannot create a worker thread"; 328 } 329 if (thread_name_ != nullptr) { 330 int res = os::thread::SetThreadName(worker->native_handle(), thread_name_); 331 if (res != 0) { 332 LOG(ERROR, RUNTIME) << "Failed to set a name for the worker thread"; 333 } 334 } 335 workers_.emplace_back(worker); 336 procs_.emplace_back(proc); 337 } 338 GetWorkerCreationInterface()339 WorkerCreationInterface *GetWorkerCreationInterface() 340 { 341 return worker_creation_interface_; 342 } 343 344 mem::InternalAllocatorPtr allocator_; 345 os::memory::ConditionVariable cond_var_; 346 TaskQueueInterface<Task> *queue_ GUARDED_BY(queue_lock_); 347 PandaList<std::thread *> workers_ GUARDED_BY(scale_lock_); 348 size_t threads_counter_ GUARDED_BY(scale_lock_) = 0; 349 PandaList<Proc *> procs_ GUARDED_BY(scale_lock_); 350 ProcArg args_; 351 bool is_active_ GUARDED_BY(queue_lock_) = false; 352 os::memory::Mutex queue_lock_; 353 os::memory::Mutex scale_lock_; 354 PandaVector<bool> is_thread_active_ GUARDED_BY(queue_lock_); 355 WorkerCreationInterface *worker_creation_interface_; 356 const char *thread_name_; 357 }; 358 359 } // namespace panda 360 361 #endif // PANDA_RUNTIME_THREAD_POOL_H_ 362