1 /*
2 * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "taskpool.h"
17 namespace OHOS {
18 namespace Sharing {
19
TaskPool()20 TaskPool::TaskPool()
21 {
22 SHARING_LOGD("trace.");
23 }
24
~TaskPool()25 TaskPool::~TaskPool()
26 {
27 SHARING_LOGD("trace.");
28 if (isRunning_) {
29 Stop();
30 }
31 }
32
Start(int32_t threadsNum)33 int32_t TaskPool::Start(int32_t threadsNum)
34 {
35 SHARING_LOGD("trace.");
36 if (!threads_.empty()) {
37 SHARING_LOGE("Before start, theads is not empty.");
38 return -1;
39 }
40
41 if (threadsNum <= 0) {
42 SHARING_LOGE("threadNum is illegal, %{public}d.", threadsNum);
43 return -1;
44 }
45
46 isRunning_ = true;
47 threads_.reserve(threadsNum);
48 for (int32_t i = 0; i < threadsNum; ++i) {
49 threads_.push_back(std::thread(&TaskPool::TaskMainWorker, this));
50 std::string name = "taskpool" + std::to_string(i);
51 pthread_setname_np(threads_.back().native_handle(), name.c_str());
52 }
53
54 return 0;
55 }
56
Stop()57 void TaskPool::Stop()
58 {
59 SHARING_LOGD("trace.");
60 {
61 std::unique_lock<std::mutex> lock(taskMutex_);
62 isRunning_ = false;
63 hasTask_.notify_all();
64 }
65
66 for (auto &e : threads_) {
67 e.join();
68 }
69 }
70
PushTask(std::packaged_task<BindedTask> & task)71 void TaskPool::PushTask(std::packaged_task<BindedTask> &task)
72 {
73 SHARING_LOGD("trace.");
74 if (threads_.empty()) {
75 } else {
76 std::unique_lock<std::mutex> lock(taskMutex_);
77 while (IsOverload()) {
78 SHARING_LOGE("task pool is over load.");
79 acceptNewTask_.wait(lock);
80 }
81 tasks_.emplace_back(std::move(task));
82 hasTask_.notify_one();
83 }
84 }
85
IsOverload() const86 bool TaskPool::IsOverload() const
87 {
88 return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_);
89 }
90
TaskMainWorker()91 void TaskPool::TaskMainWorker()
92 {
93 SHARING_LOGD("trace.");
94 std::unique_lock<std::mutex> lock(taskMutex_);
95 while (isRunning_) {
96 if (tasks_.empty() && isRunning_) {
97 hasTask_.wait(lock);
98 } else {
99 std::packaged_task<BindedTask> task = std::move(tasks_.front());
100 tasks_.pop_front();
101 acceptNewTask_.notify_one();
102 lock.unlock();
103 SHARING_LOGD("task main worker unlocked.");
104 task();
105 lock.lock();
106 SHARING_LOGD("task main worker locked.");
107 }
108 }
109 }
110
SetTimeoutInterval(uint32_t ms)111 void TaskPool::SetTimeoutInterval(uint32_t ms)
112 {
113 SHARING_LOGD("trace.");
114 timeoutInterval_ = std::chrono::milliseconds(ms);
115 }
116
117 } // namespace Sharing
118 } // namespace OHOS
119