1 /* 2 * Copyright (c) 2021-2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "thread_pool.h" 17 18 #include "restool_errors.h" 19 #include <iostream> 20 #include <string> 21 22 namespace OHOS { 23 namespace Global { 24 namespace Restool { 25 using namespace std; 26 ThreadPool()27ThreadPool::ThreadPool() 28 {} 29 GetInstance()30ThreadPool &ThreadPool::GetInstance() 31 { 32 static ThreadPool pool; 33 return pool; 34 } 35 Start(const size_t & threadCount)36uint32_t ThreadPool::Start(const size_t &threadCount) 37 { 38 if (!workerThreads_.empty()) { 39 cout << "Warning: ThreadPool is already started." << endl; 40 return RESTOOL_SUCCESS; 41 } 42 size_t hardwareCount = std::thread::hardware_concurrency(); 43 cout << "Info: hardware concurrency count is : " << hardwareCount << endl; 44 size_t count = threadCount <= 0 ? (hardwareCount <= 0 ? DEFAULT_POOL_SIZE : hardwareCount) : threadCount; 45 if (count == 1) { 46 count++; 47 } 48 cout << "Info: thread count is : " << count << endl; 49 running_ = true; 50 workerThreads_.reserve(count); 51 for (size_t i = 0; i < count; ++i) { 52 workerThreads_.emplace_back([this] { this->WorkInThread(); }); 53 } 54 cout << "Info: thread pool is started" << endl; 55 return RESTOOL_SUCCESS; 56 } 57 Stop()58void ThreadPool::Stop() 59 { 60 { 61 std::unique_lock<std::mutex> lock(queueMutex_); 62 running_ = false; 63 } 64 condition_.notify_all(); 65 for (std::thread &worker : workerThreads_) { 66 if (worker.joinable()) { 67 worker.join(); 68 } 69 } 70 cout << "Info: thread pool is stopped" << endl; 71 } 72 73 ~ThreadPool()74ThreadPool::~ThreadPool() 75 { 76 if (running_) { 77 Stop(); 78 } 79 } 80 WorkInThread()81void ThreadPool::WorkInThread() 82 { 83 while (this->running_) { 84 std::function<void()> task; 85 { 86 std::unique_lock<std::mutex> lock(this->queueMutex_); 87 // wake up when there's a task or when the pool is stopped 88 this->condition_.wait(lock, [this] { return !this->running_ || !this->tasks_.empty(); }); 89 if (!this->running_) { 90 // exit thread when the pool is stopped 91 return; 92 } 93 if (!this->tasks_.empty()) { 94 task = std::move(this->tasks_.front()); 95 this->tasks_.pop(); 96 } 97 } 98 if (task) { 99 task(); 100 } 101 } 102 } 103 } // namespace Restool 104 } // namespace Global 105 } // namespace OHOS 106