• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "event_thread_pool.h"
16 
17 #include <memory>
18 
19 #include "logger.h"
20 #include "memory_util.h"
21 #include "thread_util.h"
22 
23 namespace OHOS {
24 namespace HiviewDFX {
25 DEFINE_LOG_TAG("HiView-EventThreadPool");
EventThreadPool(int maxCout,const std::string & name)26 EventThreadPool::EventThreadPool(int maxCout, const std::string& name): maxCout_(maxCout), name_(name), runing_(false)
27 {
28     pool_ = std::vector<std::thread>(maxCout);
29 }
30 
~EventThreadPool()31 EventThreadPool::~EventThreadPool()
32 {
33     Stop();
34 }
35 
Start()36 void EventThreadPool::Start()
37 {
38     if (runing_) {
39         return;
40     }
41     runing_ = true;
42     for (int i = 0; i < maxCout_; ++i) {
43         pool_[i] = std::thread(std::bind(&EventThreadPool::TaskCallback, this));
44     }
45 }
46 
Stop()47 void EventThreadPool::Stop()
48 {
49     if (!runing_) {
50         return;
51     }
52     runing_ = false;
53     cvSync_.notify_all();
54     for (int i = 0; i < maxCout_; ++i) {
55         pool_[i].join();
56     }
57 }
58 
AddTask(Task task,const std::string & name,uint64_t delay,uint8_t priority)59 void EventThreadPool::AddTask(Task task, const std::string &name, uint64_t delay, uint8_t priority)
60 {
61     std::unique_lock<std::mutex> lock(mutex_);
62     uint64_t targetTime = TimeUtil::GetMilliseconds() + delay;
63     HIVIEW_LOGD("AddEvent: targetTime is %{public}s\n", std::to_string(targetTime).c_str());
64     taskQueue_.push(TaskEvent(priority, targetTime, task, name));
65     taskQueue_.ShrinkIfNeedLocked();
66     cvSync_.notify_all();
67     if (taskQueue_.size() > 1000) { // 1000: 积压超过1000条预警
68         HIVIEW_LOGW("%{public}s AddTask. runTask size is %{public}d", name_.c_str(), taskQueue_.size());
69     }
70 }
71 
ObtainTask(uint64_t & targetTime)72 TaskEvent EventThreadPool::ObtainTask(uint64_t &targetTime)
73 {
74     if (taskQueue_.empty()) {
75         targetTime = UINT64_MAX;
76         return TaskEvent(Priority::IDLE_PRIORITY, targetTime, nullptr, "nullptr");
77     }
78     auto tmp = taskQueue_.top();
79     targetTime = tmp.targetTime_;
80     return tmp;
81 }
82 
TaskCallback()83 void EventThreadPool::TaskCallback()
84 {
85     if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
86         HIVIEW_LOGW("Failed to optimize memory for current thread");
87     }
88 
89     std::string tid = std::to_string(Thread::GetTid());
90     const int maxLength = 10;
91     if (name_.length() > maxLength) {
92         HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
93         name_ = name_.substr(0, maxLength - 1);
94     }
95     std::string name = name_ + "@" + tid;
96     Thread::SetThreadDescription(name);
97     while (runing_) {
98         Task task = nullptr;
99         {
100             std::unique_lock<std::mutex> lock(mutex_);
101             uint64_t targetTime = UINT64_MAX;
102             TaskEvent taskEvent = ObtainTask(targetTime);
103             uint64_t now = TimeUtil::GetMilliseconds();
104             HIVIEW_LOGD("name is %{public}s, targetTime is %{public}s, now is %{public}s\n",
105                 taskEvent.name_.c_str(), std::to_string(targetTime).c_str(), std::to_string(now).c_str());
106             if (!runing_) {
107                 break;
108             }
109             if (targetTime == UINT64_MAX) {
110                 cvSync_.wait(lock);
111                 continue;
112             } else if (targetTime > now) {
113                 cvSync_.wait_for(lock, std::chrono::milliseconds(targetTime - now));
114                 continue;
115             }
116             task = taskEvent.task_;
117             taskQueue_.pop();
118             if (task == nullptr) {
119                 HIVIEW_LOGW("task == nullptr. %{public}s runTask size is %{public}d", name.c_str(), taskQueue_.size());
120                 continue;
121             }
122             Thread::SetThreadDescription(taskEvent.name_);
123         }
124         task();
125         Thread::SetThreadDescription(name);
126     }
127     HIVIEW_LOGI("%{public}s exit.", name.c_str());
128 }
129 }  // namespace HiviewDFX
130 }  // namespace OHOS