1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <sys/types.h>
17 #include <unistd.h>
18 #include <malloc.h>
19 #include "task_queue.h"
20 #include "media_log.h"
21 #include "media_errors.h"
22
23 using namespace OHOS::QOS;
24
25 namespace {
26 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "TaskQueue"};
27 }
28
29 namespace OHOS {
30 namespace Media {
~TaskQueue()31 TaskQueue::~TaskQueue()
32 {
33 (void)Stop();
34 }
35
Start()36 int32_t TaskQueue::Start()
37 {
38 std::unique_lock<std::mutex> lock(mutex_);
39 CHECK_AND_RETURN_RET_LOG(thread_ == nullptr,
40 MSERR_OK, "Started already, ignore ! [%{public}s]", name_.c_str());
41 isExit_ = false;
42 thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this);
43 MEDIA_LOGI("0x%{public}06" PRIXPTR " Instance thread started [%{public}s]", FAKE_POINTER(this), name_.c_str());
44 return MSERR_OK;
45 }
46
Stop()47 int32_t TaskQueue::Stop() noexcept
48 {
49 std::unique_lock<std::mutex> lock(mutex_);
50 if (isExit_) {
51 MEDIA_LOGD("Stopped already, ignore ! [%{public}s]", name_.c_str());
52 return MSERR_OK;
53 }
54
55 if (std::this_thread::get_id() == thread_->get_id()) {
56 MEDIA_LOGI("Stop at the task thread, reject");
57 return MSERR_INVALID_OPERATION;
58 }
59
60 std::unique_ptr<std::thread> t;
61 isExit_ = true;
62 cond_.notify_all();
63 std::swap(thread_, t);
64 lock.unlock();
65
66 if (t != nullptr && t->joinable()) {
67 t->join();
68 }
69
70 lock.lock();
71 CancelNotExecutedTaskLocked();
72 return MSERR_OK;
73 }
74
SetQos(const QosLevel level)75 void TaskQueue::SetQos(const QosLevel level)
76 {
77 if (tid_ == -1) {
78 MEDIA_LOGW("SetQos thread level failed, tid invalid");
79 return;
80 }
81 MEDIA_LOGI("SetQos thread [%{public}d] level [%{public}d]", static_cast<int>(tid_), static_cast<int>(level));
82 SetQosForOtherThread(level, tid_);
83 }
84
ResetQos()85 void TaskQueue::ResetQos()
86 {
87 if (tid_ == -1) {
88 MEDIA_LOGW("ResetQos thread level failed, tid invalid");
89 return;
90 }
91 ResetQosForOtherThread(tid_);
92 MEDIA_LOGI("ResetQos thread [%{public}d] ok", static_cast<int>(tid_));
93 }
94
95 // cancelNotExecuted = false, delayUs = 0ULL.
EnqueueTask(const std::shared_ptr<ITaskHandler> & task,bool cancelNotExecuted,uint64_t delayUs)96 __attribute__((no_sanitize("cfi"))) int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
97 bool cancelNotExecuted, uint64_t delayUs)
98 {
99 constexpr uint64_t MAX_DELAY_US = 10000000ULL; // max delay.
100
101 CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL,
102 "Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str());
103
104 task->Clear();
105
106 CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL,
107 "Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64
108 "], invalid! [%{public}s]",
109 delayUs, MAX_DELAY_US, name_.c_str());
110
111 std::unique_lock<std::mutex> lock(mutex_);
112 CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION,
113 "Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str());
114
115 if (cancelNotExecuted) {
116 CancelNotExecutedTaskLocked();
117 }
118
119 // 1000 is ns to us.
120 constexpr uint32_t US_TO_NS = 1000;
121 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
122 CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION,
123 "Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str());
124
125 uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs;
126 auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) {
127 return (item.executeTimeNs_ > executeTimeNs);
128 });
129 (void)taskList_.insert(iter, {task, executeTimeNs});
130 cond_.notify_all();
131
132 return 0;
133 }
134
CancelNotExecutedTaskLocked()135 __attribute__((no_sanitize("cfi"))) void TaskQueue::CancelNotExecutedTaskLocked()
136 {
137 MEDIA_LOGD("All task not executed are being cancelled..........[%{public}s]", name_.c_str());
138 while (!taskList_.empty()) {
139 std::shared_ptr<ITaskHandler> task = taskList_.front().task_;
140 taskList_.pop_front();
141 if (task != nullptr) {
142 task->Cancel();
143 }
144 }
145 }
146
TaskProcessor()147 __attribute__((no_sanitize("cfi"))) void TaskQueue::TaskProcessor()
148 {
149 constexpr uint32_t nameSizeMax = 15;
150 tid_ = gettid();
151 MEDIA_LOGI("Enter TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
152 pthread_setname_np(pthread_self(), name_.substr(0, nameSizeMax).c_str());
153 (void)mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
154 while (true) {
155 std::unique_lock<std::mutex> lock(mutex_);
156 cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); });
157 if (isExit_) {
158 MEDIA_LOGI("Exit TaskProcessor [%{public}s], tid_: (%{public}d)", name_.c_str(), tid_);
159 return;
160 }
161 TaskHandlerItem item = taskList_.front();
162 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
163 if (curTimeNs >= item.executeTimeNs_) {
164 taskList_.pop_front();
165 } else {
166 uint64_t diff = item.executeTimeNs_ - curTimeNs;
167 (void)cond_.wait_for(lock, std::chrono::nanoseconds(diff));
168 continue;
169 }
170 isTaskExecuting_ = true;
171 lock.unlock();
172
173 if (item.task_ == nullptr || item.task_->IsCanceled()) {
174 MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str());
175 lock.lock();
176 isTaskExecuting_ = false;
177 lock.unlock();
178 continue;
179 }
180
181 item.task_->Execute();
182 lock.lock();
183 isTaskExecuting_ = false;
184 lock.unlock();
185 if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) {
186 continue;
187 }
188 int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_);
189 if (res != MSERR_OK) {
190 MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str());
191 }
192 }
193 (void)mallopt(M_FLUSH_THREAD_CACHE, 0);
194 MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str());
195 }
196
IsTaskExecuting()197 bool TaskQueue::IsTaskExecuting()
198 {
199 std::unique_lock<std::mutex> lock(mutex_);
200 return isTaskExecuting_;
201 }
202 } // namespace Media
203 } // namespace OHOS
204