1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "thread_pool.h" 17 #include "errors.h" 18 #include "utils_log.h" 19 20 namespace OHOS { 21 ThreadPool(const std::string & name)22ThreadPool::ThreadPool(const std::string& name) 23 : myName_(name), maxTaskNum_(0), running_(false) 24 { 25 } 26 ~ThreadPool()27ThreadPool::~ThreadPool() 28 { 29 if (running_) { 30 Stop(); 31 } 32 } 33 Start(int numThreads)34uint32_t ThreadPool::Start(int numThreads) 35 { 36 if (!threads_.empty()) { 37 return ERR_INVALID_OPERATION; 38 } 39 40 if (numThreads <= 0) { 41 return ERR_INVALID_VALUE; 42 } 43 running_ = true; 44 threads_.reserve(numThreads); 45 46 for (int i = 0; i < numThreads; ++i) { 47 std::thread t([this] { this->WorkInThread(); }); 48 // Give the name of ThreadPool to threads created by the ThreadPool. 49 int err = pthread_setname_np(t.native_handle(), (myName_ + std::to_string(i)).c_str()); 50 if (err != 0) { 51 UTILS_LOGD("Failed to set name to thread. %{public}s", strerror(err)); 52 } 53 threads_.push_back(std::move(t)); 54 } 55 return ERR_OK; 56 } 57 Stop()58void ThreadPool::Stop() 59 { 60 { 61 std::unique_lock<std::mutex> lock(mutex_); 62 running_ = false; 63 hasTaskToDo_.notify_all(); 64 } 65 66 for (auto& e : threads_) { 67 e.join(); 68 } 69 } 70 AddTask(const Task & f)71void ThreadPool::AddTask(const Task &f) 72 { 73 if (threads_.empty()) { 74 f(); 75 } else { 76 std::unique_lock<std::mutex> lock(mutex_); 77 while (Overloaded()) { 78 acceptNewTask_.wait(lock); 79 } 80 81 tasks_.push_back(f); 82 hasTaskToDo_.notify_one(); 83 } 84 } 85 GetCurTaskNum()86size_t ThreadPool::GetCurTaskNum() 87 { 88 std::unique_lock<std::mutex> lock(mutex_); 89 return tasks_.size(); 90 } 91 92 ScheduleTask()93ThreadPool::Task ThreadPool::ScheduleTask() 94 { 95 std::unique_lock<std::mutex> lock(mutex_); 96 while (tasks_.empty() && running_) { 97 hasTaskToDo_.wait(lock); 98 } 99 100 Task task; 101 if (!tasks_.empty()) { 102 task = tasks_.front(); 103 tasks_.pop_front(); 104 105 if (maxTaskNum_ > 0) { 106 acceptNewTask_.notify_one(); 107 } 108 } 109 return task; 110 } 111 Overloaded() const112bool ThreadPool::Overloaded() const 113 { 114 return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_); 115 } 116 WorkInThread()117void ThreadPool::WorkInThread() 118 { 119 while (running_) { 120 Task task = ScheduleTask(); 121 if (task) { 122 task(); 123 } 124 } 125 } 126 127 } // namespace OHOS 128