1 /*
2 * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "common_components/taskpool/task_queue.h"
17
18 namespace common {
PostTask(std::unique_ptr<Task> task)19 void TaskQueue::PostTask(std::unique_ptr<Task> task)
20 {
21 std::lock_guard<std::mutex> guard(mtx_);
22 DCHECK_CC(!terminate_);
23 tasks_.push_back(std::move(task));
24 cv_.notify_one();
25 }
26
PostDelayedTask(std::unique_ptr<Task> task,uint64_t delayMilliseconds)27 void TaskQueue::PostDelayedTask(std::unique_ptr<Task> task, uint64_t delayMilliseconds)
28 {
29 std::lock_guard<std::mutex> guard(mtx_);
30 DCHECK_CC(!terminate_);
31 auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(delayMilliseconds);
32 delayedTasks_.insert({deadline, std::move(task)});
33 cv_.notify_one();
34 }
35
PopTask()36 std::unique_ptr<Task> TaskQueue::PopTask()
37 {
38 std::unique_lock<std::mutex> lock(mtx_);
39 while (true) {
40 MoveExpiredTask(lock);
41 if (!tasks_.empty()) {
42 std::unique_ptr<Task> task = std::move(tasks_.front());
43 tasks_.pop_front();
44 return task;
45 }
46 if (terminate_) {
47 cv_.notify_all();
48 return nullptr;
49 }
50 WaitForTask(lock);
51 }
52 }
53
TerminateTask(int32_t id,TaskType type)54 void TaskQueue::TerminateTask(int32_t id, TaskType type)
55 {
56 std::lock_guard<std::mutex> guard(mtx_);
57 for (auto &task : tasks_) {
58 if (id != ALL_TASK_ID && id != task->GetId()) {
59 continue;
60 }
61 if (type != TaskType::ALL && type != task->GetTaskType()) {
62 continue;
63 }
64 task->Terminated();
65 }
66 for (auto &taskItem : delayedTasks_) {
67 if (id != ALL_TASK_ID && id != (taskItem.second)->GetId()) {
68 continue;
69 }
70 if (type != TaskType::ALL && type != (taskItem.second)->GetTaskType()) {
71 continue;
72 }
73 (taskItem.second)->Terminated();
74 }
75 }
76
Terminate()77 void TaskQueue::Terminate()
78 {
79 std::lock_guard<std::mutex> guard(mtx_);
80 terminate_ = true;
81 cv_.notify_all();
82 }
83
ForEachTask(const std::function<void (Task *)> & f)84 void TaskQueue::ForEachTask(const std::function<void(Task*)> &f)
85 {
86 std::lock_guard<std::mutex> guard(mtx_);
87 for (auto &task : tasks_) {
88 if (task.get() != nullptr) {
89 f(task.get());
90 }
91 }
92 }
93
MoveExpiredTask(std::unique_lock<std::mutex> & lock)94 void TaskQueue::MoveExpiredTask(std::unique_lock<std::mutex> &lock)
95 {
96 DCHECK_CC(!mtx_.try_lock());
97 while (!delayedTasks_.empty()) {
98 auto it = delayedTasks_.begin();
99 auto currentTime = std::chrono::steady_clock::now();
100 if ((std::chrono::duration_cast<std::chrono::duration<double>>(it->first - currentTime)).count() > 0) {
101 return;
102 }
103 tasks_.push_back(std::move(it->second));
104 delayedTasks_.erase(it);
105 }
106 }
107
WaitForTask(std::unique_lock<std::mutex> & lock)108 void TaskQueue::WaitForTask(std::unique_lock<std::mutex> &lock)
109 {
110 DCHECK_CC(!mtx_.try_lock());
111 if (!delayedTasks_.empty()) {
112 auto it = delayedTasks_.begin();
113 auto currentTime = std::chrono::steady_clock::now();
114 if ((std::chrono::duration_cast<std::chrono::duration<double>>(it->first - currentTime)).count() < 0) {
115 return;
116 }
117 auto waitingTime = std::chrono::duration_cast<std::chrono::milliseconds>(it->first - currentTime);
118 cv_.wait_for(lock, waitingTime);
119 } else {
120 cv_.wait(lock);
121 }
122 }
123 } // namespace common
124