• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Shenzhen Kaihong Digital Industry Development Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "taskpool.h"
17 namespace OHOS {
18 namespace Sharing {
19 constexpr uint32_t MAX_THREAD_NUM = 50;
20 
TaskPool()21 TaskPool::TaskPool()
22 {
23     SHARING_LOGD("trace.");
24 }
25 
~TaskPool()26 TaskPool::~TaskPool()
27 {
28     SHARING_LOGD("trace.");
29     if (isRunning_) {
30         Stop();
31     }
32 }
33 
Start(int32_t threadsNum)34 int32_t TaskPool::Start(int32_t threadsNum)
35 {
36     SHARING_LOGD("trace.");
37     if (!threads_.empty()) {
38         SHARING_LOGE("Before start, theads is not empty.");
39         return -1;
40     }
41 
42     if (threadsNum <= 0 || threadsNum >= MAX_THREAD_NUM) {
43         SHARING_LOGE("threadNum is illegal, %{public}d.", threadsNum);
44         return -1;
45     }
46 
47     isRunning_ = true;
48     threads_.reserve(threadsNum);
49     for (int32_t i = 0; i < threadsNum; ++i) {
50         threads_.push_back(std::thread(&TaskPool::TaskMainWorker, this));
51         std::string name = "taskpool" + std::to_string(i);
52         pthread_setname_np(threads_.back().native_handle(), name.c_str());
53     }
54 
55     return 0;
56 }
57 
Stop()58 void TaskPool::Stop()
59 {
60     SHARING_LOGD("trace.");
61     {
62         std::unique_lock<std::mutex> lock(taskMutex_);
63         isRunning_ = false;
64         hasTask_.notify_all();
65     }
66 
67     for (auto &e : threads_) {
68         e.join();
69     }
70 }
71 
PushTask(std::packaged_task<BindedTask> & task)72 void TaskPool::PushTask(std::packaged_task<BindedTask> &task)
73 {
74     SHARING_LOGD("trace.");
75     if (threads_.empty()) {
76     } else {
77         std::unique_lock<std::mutex> lock(taskMutex_);
78         while (IsOverload()) {
79             SHARING_LOGE("task pool is over load.");
80             acceptNewTask_.wait(lock);
81         }
82         tasks_.emplace_back(std::move(task));
83         hasTask_.notify_one();
84     }
85 }
86 
IsOverload() const87 bool TaskPool::IsOverload() const
88 {
89     return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_);
90 }
91 
TaskMainWorker()92 void TaskPool::TaskMainWorker()
93 {
94     SHARING_LOGD("trace.");
95     std::unique_lock<std::mutex> lock(taskMutex_);
96     while (isRunning_) {
97         if (tasks_.empty() && isRunning_) {
98             hasTask_.wait(lock);
99         } else {
100             std::packaged_task<BindedTask> task = std::move(tasks_.front());
101             tasks_.pop_front();
102             acceptNewTask_.notify_one();
103             lock.unlock();
104             SHARING_LOGD("task main worker unlocked.");
105             task();
106             lock.lock();
107             SHARING_LOGD("task main worker locked.");
108         }
109     }
110 }
111 
SetTimeoutInterval(uint32_t ms)112 void TaskPool::SetTimeoutInterval(uint32_t ms)
113 {
114     SHARING_LOGD("trace.");
115     timeoutInterval_ = std::chrono::milliseconds(ms);
116 }
117 
118 } // namespace Sharing
119 } // namespace OHOS
120