1 /*
2 * Copyright (C) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_queue.h"
17 #include "media_log.h"
18 #include "media_errors.h"
19
20 namespace {
21 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = {LOG_CORE, LOG_DOMAIN, "TaskQueue"};
22 }
23
24 namespace OHOS {
25 namespace Media {
~TaskQueue()26 TaskQueue::~TaskQueue()
27 {
28 (void)Stop();
29 }
30
Start()31 int32_t TaskQueue::Start()
32 {
33 std::unique_lock<std::mutex> lock(mutex_);
34 if (thread_ != nullptr) {
35 MEDIA_LOGW("Started already, ignore ! [%{public}s]", name_.c_str());
36 return MSERR_OK;
37 }
38 isExit_ = false;
39 thread_ = std::make_unique<std::thread>(&TaskQueue::TaskProcessor, this);
40 constexpr uint32_t nameSizeMax = 15;
41 pthread_setname_np(thread_->native_handle(), name_.substr(0, nameSizeMax).c_str());
42 MEDIA_LOGI("thread started, ignore ! [%{public}s]", name_.c_str());
43 return MSERR_OK;
44 }
45
Stop()46 int32_t TaskQueue::Stop() noexcept
47 {
48 std::unique_lock<std::mutex> lock(mutex_);
49 if (isExit_) {
50 MEDIA_LOGI("Stopped already, ignore ! [%{public}s]", name_.c_str());
51 return MSERR_OK;
52 }
53
54 if (std::this_thread::get_id() == thread_->get_id()) {
55 MEDIA_LOGI("Stop at the task thread, reject");
56 return MSERR_INVALID_OPERATION;
57 }
58
59 std::unique_ptr<std::thread> t;
60 isExit_ = true;
61 cond_.notify_all();
62 std::swap(thread_, t);
63 lock.unlock();
64
65 if (t != nullptr && t->joinable()) {
66 t->join();
67 }
68
69 lock.lock();
70 CancelNotExecutedTaskLocked();
71 return MSERR_OK;
72 }
73
74 // cancelNotExecuted = false, delayUs = 0ULL.
EnqueueTask(const std::shared_ptr<ITaskHandler> & task,bool cancelNotExecuted,uint64_t delayUs)75 int32_t TaskQueue::EnqueueTask(const std::shared_ptr<ITaskHandler> &task, bool cancelNotExecuted, uint64_t delayUs)
76 {
77 constexpr uint64_t MAX_DELAY_US = 10000000ULL; // max delay.
78
79 CHECK_AND_RETURN_RET_LOG(task != nullptr, MSERR_INVALID_VAL,
80 "Enqueue task when taskqueue task is nullptr.[%{public}s]", name_.c_str());
81
82 task->Clear();
83
84 CHECK_AND_RETURN_RET_LOG(delayUs < MAX_DELAY_US, MSERR_INVALID_VAL,
85 "Enqueue task when taskqueue delayUs[%{public}" PRIu64 "] is >= max delayUs[ %{public}" PRIu64
86 "], invalid! [%{public}s]",
87 delayUs, MAX_DELAY_US, name_.c_str());
88
89 std::unique_lock<std::mutex> lock(mutex_);
90 CHECK_AND_RETURN_RET_LOG(!isExit_, MSERR_INVALID_OPERATION,
91 "Enqueue task when taskqueue is stopped, failed ! [%{public}s]", name_.c_str());
92
93 if (cancelNotExecuted) {
94 CancelNotExecutedTaskLocked();
95 }
96
97 // 1000 is ns to us.
98 constexpr uint32_t US_TO_NS = 1000;
99 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
100 CHECK_AND_RETURN_RET_LOG(curTimeNs < UINT64_MAX - delayUs * US_TO_NS, MSERR_INVALID_OPERATION,
101 "Enqueue task but timestamp is overflow, why? [%{public}s]", name_.c_str());
102
103 uint64_t executeTimeNs = delayUs * US_TO_NS + curTimeNs;
104 auto iter = std::find_if(taskList_.begin(), taskList_.end(), [executeTimeNs](const TaskHandlerItem &item) {
105 return (item.executeTimeNs_ > executeTimeNs);
106 });
107 (void)taskList_.insert(iter, {task, executeTimeNs});
108 cond_.notify_all();
109
110 return 0;
111 }
112
CancelNotExecutedTaskLocked()113 void TaskQueue::CancelNotExecutedTaskLocked()
114 {
115 MEDIA_LOGI("All task not executed are being cancelled..........[%{public}s]", name_.c_str());
116 while (!taskList_.empty()) {
117 std::shared_ptr<ITaskHandler> task = taskList_.front().task_;
118 taskList_.pop_front();
119 if (task != nullptr) {
120 task->Cancel();
121 }
122 }
123 }
124
TaskProcessor()125 void TaskQueue::TaskProcessor()
126 {
127 MEDIA_LOGI("Enter TaskProcessor [%{public}s]", name_.c_str());
128 while (true) {
129 std::unique_lock<std::mutex> lock(mutex_);
130 cond_.wait(lock, [this] { return isExit_ || !taskList_.empty(); });
131 if (isExit_) {
132 MEDIA_LOGI("Exit TaskProcessor [%{public}s]", name_.c_str());
133 return;
134 }
135 TaskHandlerItem item = taskList_.front();
136 uint64_t curTimeNs = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
137 if (curTimeNs >= item.executeTimeNs_) {
138 taskList_.pop_front();
139 } else {
140 uint64_t diff = item.executeTimeNs_ - curTimeNs;
141 (void)cond_.wait_for(lock, std::chrono::nanoseconds(diff));
142 continue;
143 }
144 lock.unlock();
145
146 if (item.task_ == nullptr || item.task_->IsCanceled()) {
147 MEDIA_LOGD("task is nullptr or task canceled. [%{public}s]", name_.c_str());
148 continue;
149 }
150
151 item.task_->Execute();
152 if (item.task_->GetAttribute().periodicTimeUs_ == UINT64_MAX) {
153 continue;
154 }
155 int32_t res = EnqueueTask(item.task_, false, item.task_->GetAttribute().periodicTimeUs_);
156 if (res != MSERR_OK) {
157 MEDIA_LOGW("enqueue periodic task failed:%d, why? [%{public}s]", res, name_.c_str());
158 }
159 }
160 MEDIA_LOGI("Leave TaskProcessor [%{public}s]", name_.c_str());
161 }
162 } // namespace Media
163 } // namespace OHOS
164