1 /*
2 * Copyright (C) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "threading/dispatcher_task_queue.h"
17
18 #include <algorithm>
19 #include <mutex>
20
21 #include <base/containers/vector.h>
22 #include <core/log.h>
23 #include <core/namespace.h>
24
25 #include "os/platform.h"
26
27 CORE_BEGIN_NAMESPACE()
28 using BASE_NS::vector;
29
30 // -- Dispatcher task queue.
DispatcherTaskQueue(const IThreadPool::Ptr & threadPool)31 DispatcherTaskQueue::DispatcherTaskQueue(const IThreadPool::Ptr& threadPool) : TaskQueue(threadPool) {}
32
~DispatcherTaskQueue()33 DispatcherTaskQueue::~DispatcherTaskQueue()
34 {
35 Wait();
36 }
37
Remove(uint64_t taskIdentifier)38 void DispatcherTaskQueue::Remove(uint64_t taskIdentifier)
39 {
40 std::lock_guard lock(queueLock_);
41
42 auto it = std::find(tasks_.begin(), tasks_.end(), taskIdentifier);
43 if (it != tasks_.end()) {
44 tasks_.erase(it);
45 }
46 }
47
Clear()48 void DispatcherTaskQueue::Clear()
49 {
50 Wait();
51 {
52 std::lock_guard lock(queueLock_);
53
54 tasks_.clear();
55 }
56 }
57
Submit(uint64_t taskIdentifier,IThreadPool::ITask::Ptr && task)58 void DispatcherTaskQueue::Submit(uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task)
59 {
60 std::lock_guard lock(queueLock_);
61
62 tasks_.emplace_back(taskIdentifier, std::move(task));
63 }
64
SubmitAfter(uint64_t afterIdentifier,uint64_t taskIdentifier,IThreadPool::ITask::Ptr && task)65 void DispatcherTaskQueue::SubmitAfter(uint64_t afterIdentifier, uint64_t taskIdentifier, IThreadPool::ITask::Ptr&& task)
66 {
67 std::lock_guard lock(queueLock_);
68
69 auto it = std::find(tasks_.begin(), tasks_.end(), afterIdentifier);
70 if (it != tasks_.end()) {
71 tasks_.emplace(++it, taskIdentifier, std::move(task));
72 } else {
73 tasks_.emplace_back(taskIdentifier, std::move(task));
74 }
75 }
76
Execute()77 void DispatcherTaskQueue::Execute()
78 {
79 Entry entry;
80 bool hasTaskEntry = false;
81
82 {
83 // Retrieve first task in task queue.
84 std::lock_guard lock(queueLock_);
85
86 if (!tasks_.empty()) {
87 entry = std::move(tasks_.front());
88 tasks_.pop_front();
89 hasTaskEntry = true;
90 }
91 }
92
93 if (hasTaskEntry) {
94 // Execute.
95 (*entry.task)();
96
97 {
98 // Move to completed list and finish.
99 std::lock_guard lock(queueLock_);
100 finishedTasks_.emplace_back(std::move(entry));
101 }
102 }
103 }
104
CollectFinishedTasks()105 vector<uint64_t> DispatcherTaskQueue::CollectFinishedTasks()
106 {
107 std::lock_guard lock(queueLock_);
108
109 vector<uint64_t> result;
110 result.reserve(finishedTasks_.size());
111 for (auto& entry : finishedTasks_) {
112 result.emplace_back(entry.identifier);
113 }
114
115 finishedTasks_.clear();
116
117 return result;
118 }
119 CORE_END_NAMESPACE()
120