• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "thread_pool.h"
17 #include <sys/time.h>
18 #include <algorithm>
19 #include "thread_utils.h"
20 #include "dp_log.h"
21 
22 namespace OHOS {
23 namespace CameraStandard {
24 namespace DeferredProcessing {
Create(const std::string & name,uint32_t numThreads)25 std::unique_ptr<ThreadPool> ThreadPool::Create(const std::string& name, uint32_t numThreads)
26 {
27     auto pool = std::make_unique<ThreadPool>(name, numThreads);
28     if (pool) {
29         pool->Initialize();
30     }
31     return pool;
32 }
33 
ThreadPool(const std::string & name,uint32_t numThreads)34 ThreadPool::ThreadPool(const std::string& name, uint32_t numThreads)
35     : name_(name), numThreads_(numThreads), workers_(), isStopped_(false), mutex_(), taskCv_(), tasks_()
36 {
37     if (numThreads_ == 0) {
38         numThreads_ = 1;
39     }
40     numThreads_ = std::min(numThreads_, static_cast<uint32_t>(std::thread::hardware_concurrency()));
41     DP_DEBUG_LOG("name: %s, numThreads, orig: %u, new: %u.", name.c_str(), numThreads, numThreads_);
42 }
43 
~ThreadPool()44 ThreadPool::~ThreadPool()
45 {
46     CAMERA_DP_SYNC_TRACE;
47     DP_DEBUG_LOG("name: %s.", name_.c_str());
48     isStopped_ = true;
49     taskCv_.notify_all();
50     for (auto& threadInfo : workers_) {
51         if (threadInfo.thread.joinable()) {
52             DP_DEBUG_LOG("joining thread (%s).", threadInfo.name.c_str());
53             threadInfo.thread.join();
54         }
55     }
56 }
57 
Initialize()58 void ThreadPool::Initialize()
59 {
60     DP_DEBUG_LOG("entered.");
61     workers_.reserve(numThreads_);
62     std::string threadNamePrefix = "DPS_Worker_";
63     for (uint32_t i = 0; i < numThreads_; ++i) {
64         auto threadName = threadNamePrefix + std::to_string(i);
65         workers_.emplace_back(threadName, [this, threadName]() { WorkerLoop(threadName); });
66         SetThreadName(workers_.back().thread.native_handle(), workers_.back().name);
67     }
68     PrintThreadInfo();
69 }
70 
WorkerLoop(const std::string & threadName)71 void ThreadPool::WorkerLoop(const std::string& threadName)
72 {
73     DP_DEBUG_LOG("(%s) entered.", threadName.c_str());
74     while (!isStopped_.load()) {
75         DP_DEBUG_LOG("(%s) task excute start entered.", threadName.c_str());
76         auto task = GetTask();
77         if (task) {
78             DP_DEBUG_LOG("(%s) task excuting entered.", threadName.c_str());
79             task();
80         } else {
81             DP_DEBUG_LOG("empty task.");
82         }
83     }
84     DP_DEBUG_LOG("(%s) exited.", threadName.c_str());
85 }
86 
BeginBackgroundTasks() const87 void ThreadPool::BeginBackgroundTasks() const
88 {
89     DP_DEBUG_LOG("entered.");
90     for (auto& workerInfo : workers_) {
91         SetThreadPriority(workerInfo.thread.native_handle(), PRIORITY_BACKGROUND);
92     }
93 }
94 
EndBackgroundTasks() const95 void ThreadPool::EndBackgroundTasks() const
96 {
97     DP_DEBUG_LOG("entered.");
98     for (auto& workerInfo : workers_) {
99         SetThreadPriority(workerInfo.thread.native_handle(), PRIORITY_NORMAL);
100     }
101 }
102 
SetThreadPoolPriority(int priority)103 void ThreadPool::SetThreadPoolPriority(int priority)
104 {
105     DP_DEBUG_LOG("entered.");
106     for (auto& workerInfo : workers_) {
107         SetThreadPriority(workerInfo.thread.native_handle(), priority);
108     }
109 }
110 
GetThreadPoolPriority() const111 int ThreadPool::GetThreadPoolPriority() const
112 {
113     return GetThreadPriority(workers_[0].thread.native_handle());
114 }
115 
PrintThreadInfo()116 void ThreadPool::PrintThreadInfo()
117 {
118     struct sched_param sch;
119     int policy = -1;
120     for (auto& workerInfo : workers_) {
121         int ret = pthread_getschedparam(workerInfo.thread.native_handle(), &policy, &sch);
122         if (ret == 0) {
123             DP_DEBUG_LOG("thread (%s) priority: %{public}d, policy = %{public}d(0:OTHER, 1:FIFO, 2:RR)",
124                 workerInfo.name.c_str(), sch.sched_priority, policy);
125         } else {
126             DP_DEBUG_LOG("thread (%s) pthread_getschedparam failed, ret = %{public}d.", workerInfo.name.c_str(), ret);
127         }
128     }
129 }
130 
GetTask() const131 ThreadPool::Task ThreadPool::GetTask() const
132 {
133     std::unique_lock<std::mutex> lock(mutex_);
134     taskCv_.wait(lock, [this] { return isStopped_.load() || !tasks_.empty(); });
135     if (isStopped_.load()) {
136         return {};
137     }
138     auto task = std::move(tasks_.front());
139     tasks_.pop_front();
140     return task;
141 }
142 
HasPendingTasks() const143 bool ThreadPool::HasPendingTasks() const
144 {
145     std::unique_lock<std::mutex> lock(mutex_);
146     return !tasks_.empty();
147 }
148 
Submit(Task func,bool isUrgent) const149 bool ThreadPool::Submit(Task func, bool isUrgent) const
150 {
151     DP_DEBUG_LOG("entered.");
152     if (!isStopped_.load()) {
153         {
154             std::unique_lock<std::mutex> lock(mutex_);
155             if (isUrgent) {
156                 tasks_.emplace_front([task = std::move(func)] { task(); });
157             } else {
158                 tasks_.emplace_back([task = std::move(func)] { task(); });
159             }
160         }
161         taskCv_.notify_one();
162     } else {
163         DP_ERR_LOG("failed due to thread pool has been stopped.");
164         return false;
165     }
166     return true;
167 }
168 } // namespace DeferredProcessing
169 } // namespace CameraStandard
170 } // namespace OHOS
171