1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_queue.h"
17 #include <malloc.h>
18 #include "media_log.h"
19 #include "media_errors.h"
20
21 namespace {
22 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "TaskQueue"};
23 }
24
25 namespace OHOS {
26 namespace Media {
~TaskQueue()27 TaskQueue::~TaskQueue()
28 {
29 (void)Stop();
30 }
31
Start()32 int32_t TaskQueue::Start()
33 {
34 std::unique_lock<std::mutex> lock(mutex_);
35 CHECK_AND_RETURN_RET_LOG(thread_ == nullptr,
36 MSERR_OK, "Started already, ignore ! [%{public}s]", name_.c_str());
37 isExit_ = false;
38 thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this);
39 MEDIA_LOGI("thread started [%{public}s]", name_.c_str());
40 return MSERR_OK;
41 }
42
Stop()43 int32_t TaskQueue::Stop() noexcept
44 {
45 std::unique_lock<std::mutex> lock(mutex_);
46 if (isExit_) {
47 MEDIA_LOGI("Stopped already, ignore ! [%{public}s]", name_.c_str());
48 return MSERR_OK;
49 }
50
51 if (std::this_thread::get_id() == thread_->get_id()) {
52 MEDIA_LOGI("Stop at the task thread, reject");
53 return MSERR_INVALID_OPERATION;
54 }
55
56 std::unique_ptr<std::thread> t;
57 isExit_ = true;
58 cond_.notify_all();
59 std::swap(thread_, t);
60 lock.unlock();
61
62 if (t != nullptr && t->joinable()) {
63 t->join();
64 }
65
66 lock.lock();
67 CancelNotExecutedTaskLocked();
68 return MSERR_OK;
69 }
70
71 // cancelNotExecuted = false, delayUs = 0ULL.
EnqueueTask(const std::shared_ptr<ITaskHandler> & task,bool cancelNotExecuted,uint64_t delayUs)72 __attribute__((no_sanitize("cfi"))) int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
73 bool cancelNotExecuted, uint64_t delayUs)
74 {
75 constexpr uint64_t MAX_DELAY_US = 10000000ULL; // max delay.
76
77 CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL,
78 "Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str());
79
80 task->Clear();
81
82 CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL,
83 "Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64
84 "], invalid! [%{public}s]",
85 delayUs, MAX_DELAY_US, name_.c_str());
86
87 std::unique_lock<std::mutex> lock(mutex_);
88 CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION,
89 "Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str());
90
91 if (cancelNotExecuted) {
92 CancelNotExecutedTaskLocked();
93 }
94
95 // 1000 is ns to us.
96 constexpr uint32_t US_TO_NS = 1000;
97 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
98 CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION,
99 "Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str());
100
101 uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs;
102 auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) {
103 return (item.executeTimeNs_ > executeTimeNs);
104 });
105 (void)taskList_.insert(iter, {task, executeTimeNs});
106 cond_.notify_all();
107
108 return 0;
109 }
110
CancelNotExecutedTaskLocked()111 __attribute__((no_sanitize("cfi"))) void TaskQueue::CancelNotExecutedTaskLocked()
112 {
113 MEDIA_LOGI("All task not executed are being cancelled..........[%{public}s]", name_.c_str());
114 while (!taskList_.empty()) {
115 std::shared_ptr<ITaskHandler> task = taskList_.front().task_;
116 taskList_.pop_front();
117 if (task != nullptr) {
118 task->Cancel();
119 }
120 }
121 }
122
TaskProcessor()123 __attribute__((no_sanitize("cfi"))) void TaskQueue::TaskProcessor()
124 {
125 MEDIA_LOGI("Enter TaskProcessor [%{public}s]", name_.c_str());
126 constexpr uint32_t nameSizeMax = 15;
127 pthread_setname_np(pthread_self(), name_.substr(0, nameSizeMax).c_str());
128 (void)mallopt(M_DELAYED_FREE, M_DELAYED_FREE_DISABLE);
129 while (true) {
130 std::unique_lock<std::mutex> lock(mutex_);
131 cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); });
132 if (isExit_) {
133 MEDIA_LOGI("Exit TaskProcessor [%{public}s]", name_.c_str());
134 return;
135 }
136 TaskHandlerItem item = taskList_.front();
137 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
138 if (curTimeNs >= item.executeTimeNs_) {
139 taskList_.pop_front();
140 } else {
141 uint64_t diff = item.executeTimeNs_ - curTimeNs;
142 (void)cond_.wait_for(lock, std::chrono::nanoseconds(diff));
143 continue;
144 }
145 lock.unlock();
146
147 if (item.task_ == nullptr || item.task_->IsCanceled()) {
148 MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str());
149 continue;
150 }
151
152 item.task_->Execute();
153 if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) {
154 continue;
155 }
156 int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_);
157 if (res != MSERR_OK) {
158 MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str());
159 }
160 }
161 (void)mallopt(M_FLUSH_THREAD_CACHE, 0);
162 MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str());
163 }
164 } // namespace Media
165 } // namespace OHOS
166