• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "thread_pool.h"
17 
18 #include "restool_errors.h"
19 #include <iostream>
20 #include <string>
21 
22 namespace OHOS {
23 namespace Global {
24 namespace Restool {
25 using namespace std;
26 
ThreadPool()27 ThreadPool::ThreadPool()
28 {}
29 
GetInstance()30 ThreadPool &ThreadPool::GetInstance()
31 {
32     static ThreadPool pool;
33     return pool;
34 }
35 
Start(const size_t & threadCount)36 uint32_t ThreadPool::Start(const size_t &threadCount)
37 {
38     if (!workerThreads_.empty()) {
39         cout << "Warning: ThreadPool is already started." << endl;
40         return RESTOOL_SUCCESS;
41     }
42     size_t hardwareCount = std::thread::hardware_concurrency();
43     cout << "Info: hardware concurrency count is : " << hardwareCount << endl;
44     size_t count = threadCount <= 0 ? (hardwareCount <= 0 ? DEFAULT_POOL_SIZE : hardwareCount) : threadCount;
45     if (count == 1) {
46         count++;
47     }
48     cout << "Info: thread count is : " << count << endl;
49     running_ = true;
50     workerThreads_.reserve(count);
51     for (size_t i = 0; i < count; ++i) {
52         workerThreads_.emplace_back([this] { this->WorkInThread(); });
53     }
54     cout << "Info: thread pool is started" << endl;
55     return RESTOOL_SUCCESS;
56 }
57 
Stop()58 void ThreadPool::Stop()
59 {
60     {
61         std::unique_lock<std::mutex> lock(queueMutex_);
62         running_ = false;
63     }
64     condition_.notify_all();
65     for (std::thread &worker : workerThreads_) {
66         if (worker.joinable()) {
67             worker.join();
68         }
69     }
70     cout << "Info: thread pool is stopped" << endl;
71 }
72 
73 
~ThreadPool()74 ThreadPool::~ThreadPool()
75 {
76     if (running_) {
77         Stop();
78     }
79 }
80 
WorkInThread()81 void ThreadPool::WorkInThread()
82 {
83     while (this->running_) {
84         std::function<void()> task;
85         {
86             std::unique_lock<std::mutex> lock(this->queueMutex_);
87             // wake up when there's a task or when the pool is stopped
88             this->condition_.wait(lock, [this] { return !this->running_ || !this->tasks_.empty(); });
89             if (!this->running_) {
90                 // exit thread when the pool is stopped
91                 return;
92             }
93             if (!this->tasks_.empty()) {
94                 task = std::move(this->tasks_.front());
95                 this->tasks_.pop();
96             }
97         }
98         if (task) {
99             task();
100         }
101     }
102 }
103 } // namespace Restool
104 } // namespace Global
105 } // namespace OHOS
106