1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "taskpool.h"
17 namespace OHOS {
18 namespace Sharing {
// Exclusive upper bound for the worker count accepted by TaskPool::Start():
// a request is rejected when threadsNum >= MAX_THREAD_NUM.
constexpr uint32_t MAX_THREAD_NUM = 50U;
20
TaskPool()21 TaskPool::TaskPool()
22 {
23 SHARING_LOGD("trace.");
24 }
25
~TaskPool()26 TaskPool::~TaskPool()
27 {
28 SHARING_LOGD("trace.");
29 if (isRunning_) {
30 Stop();
31 }
32 }
33
Start(int32_t threadsNum)34 int32_t TaskPool::Start(int32_t threadsNum)
35 {
36 SHARING_LOGD("trace.");
37 if (!threads_.empty()) {
38 SHARING_LOGE("Before start, theads is not empty.");
39 return -1;
40 }
41
42 if (threadsNum <= 0 || threadsNum >= MAX_THREAD_NUM) {
43 SHARING_LOGE("threadNum is illegal, %{public}d.", threadsNum);
44 return -1;
45 }
46
47 isRunning_ = true;
48 threads_.reserve(threadsNum);
49 for (int32_t i = 0; i < threadsNum; ++i) {
50 threads_.push_back(std::thread(&TaskPool::TaskMainWorker, this));
51 std::string name = "taskpool" + std::to_string(i);
52 pthread_setname_np(threads_.back().native_handle(), name.c_str());
53 }
54
55 return 0;
56 }
57
Stop()58 void TaskPool::Stop()
59 {
60 SHARING_LOGD("trace.");
61 {
62 std::unique_lock<std::mutex> lock(taskMutex_);
63 isRunning_ = false;
64 hasTask_.notify_all();
65 }
66
67 for (auto &e : threads_) {
68 e.join();
69 }
70 }
71
PushTask(std::packaged_task<BindedTask> & task)72 void TaskPool::PushTask(std::packaged_task<BindedTask> &task)
73 {
74 SHARING_LOGD("trace.");
75 if (threads_.empty()) {
76 } else {
77 std::unique_lock<std::mutex> lock(taskMutex_);
78 while (IsOverload()) {
79 SHARING_LOGE("task pool is over load.");
80 acceptNewTask_.wait(lock);
81 }
82 tasks_.emplace_back(std::move(task));
83 hasTask_.notify_one();
84 }
85 }
86
IsOverload() const87 bool TaskPool::IsOverload() const
88 {
89 return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_);
90 }
91
TaskMainWorker()92 void TaskPool::TaskMainWorker()
93 {
94 SHARING_LOGD("trace.");
95 std::unique_lock<std::mutex> lock(taskMutex_);
96 while (isRunning_) {
97 if (tasks_.empty() && isRunning_) {
98 hasTask_.wait(lock);
99 } else {
100 std::packaged_task<BindedTask> task = std::move(tasks_.front());
101 tasks_.pop_front();
102 acceptNewTask_.notify_one();
103 lock.unlock();
104 SHARING_LOGD("task main worker unlocked.");
105 task();
106 lock.lock();
107 SHARING_LOGD("task main worker locked.");
108 }
109 }
110 }
111
SetTimeoutInterval(uint32_t ms)112 void TaskPool::SetTimeoutInterval(uint32_t ms)
113 {
114 SHARING_LOGD("trace.");
115 timeoutInterval_ = std::chrono::milliseconds(ms);
116 }
117
118 } // namespace Sharing
119 } // namespace OHOS
120