1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "concurrent_queue.h"
17 #include <climits>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "tm/queue_task.h"
20 #include "eu/loop.h"
21
22 namespace {
GetMinMapTime(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)23 uint64_t GetMinMapTime(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
24 {
25 uint64_t minTime = std::numeric_limits<uint64_t>::max();
26
27 for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
28 if (!whenMapVec[idx].empty()) {
29 auto it = whenMapVec[idx].begin();
30 if (it->first < minTime) {
31 minTime = it->first;
32 }
33 }
34 }
35 return minTime;
36 }
37
WhenMapVecEmpty(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)38 bool WhenMapVecEmpty(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
39 {
40 for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
41 if (!whenMapVec[idx].empty()) {
42 return false;
43 }
44 }
45 return true;
46 }
47 }
48
49 namespace ffrt {
DelayTaskCb(void * task)50 static void DelayTaskCb(void* task)
51 {
52 static_cast<QueueTask*>(task)->Execute();
53 }
54
~ConcurrentQueue()55 ConcurrentQueue::~ConcurrentQueue()
56 {
57 FFRT_LOGI("destruct concurrent queueId=%u leave", queueId_);
58 }
59
Push(QueueTask * task)60 int ConcurrentQueue::Push(QueueTask* task)
61 {
62 std::unique_lock lock(mutex_);
63 FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
64 ffrt_queue_priority_t taskPriority = task->GetPriority();
65 if (taskPriority > ffrt_queue_priority_idle) {
66 task->SetPriority(ffrt_queue_priority_low);
67 taskPriority = task->GetPriority();
68 }
69
70 if (loop_ != nullptr) {
71 if (task->GetDelay() == 0) {
72 whenMapVec_[taskPriority].insert({task->GetUptime(), task});
73 loop_->WakeUp();
74 return SUCC;
75 }
76 return PushDelayTaskToTimer(task);
77 }
78 FFRT_COND_DO_ERR(IsOnLoop(), return FAILED, "cannot push task, [queueId=%u] loop empty", queueId_);
79
80 if (concurrency_.load() < maxConcurrency_) {
81 int oldValue = concurrency_.fetch_add(1);
82 FFRT_LOGD("task [gid=%llu] concurrency[%u] + 1 [queueId=%u]", task->gid, oldValue, queueId_);
83
84 if (task->GetDelay() > 0) {
85 whenMapVec_[taskPriority].insert({task->GetUptime(), task});
86 }
87
88 return CONCURRENT;
89 }
90
91 whenMapVec_[taskPriority].insert({task->GetUptime(), task});
92 if (task == whenMapVec_[taskPriority].begin()->second) {
93 cond_.notify_all();
94 }
95
96 return SUCC;
97 }
98
Pull()99 QueueTask* ConcurrentQueue::Pull()
100 {
101 std::unique_lock lock(mutex_);
102 // wait for delay task
103 uint64_t now = GetNow();
104 if (loop_ != nullptr) {
105 if (!WhenMapVecEmpty(whenMapVec_) && now >= GetMinMapTime(whenMapVec_) && !isExit_) {
106 return dequeFunc_(queueId_, now, whenMapVec_, nullptr);
107 }
108 return nullptr;
109 }
110
111 uint64_t minMaptime = GetMinMapTime(whenMapVec_);
112 while (!WhenMapVecEmpty(whenMapVec_) && now < minMaptime && !isExit_) {
113 uint64_t diff = minMaptime - now;
114 FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
115 cond_.wait_for(lock, std::chrono::microseconds(diff));
116 FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
117 now = GetNow();
118 minMaptime = GetMinMapTime(whenMapVec_);
119 }
120
121 // abort dequeue in abnormal scenarios
122 if (WhenMapVecEmpty(whenMapVec_)) {
123 int oldValue = concurrency_.fetch_sub(1); // 取不到后继的task,当前这个task正式退出
124 FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId_);
125 return nullptr;
126 }
127 FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
128
129 // dequeue next expired task by priority
130 return dequeFunc_(queueId_, now, whenMapVec_, nullptr);
131 }
132
Stop()133 void ConcurrentQueue::Stop()
134 {
135 std::unique_lock lock(mutex_);
136 isExit_ = true;
137
138 for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
139 for (auto it = whenMapVec_[idx].begin(); it != whenMapVec_[idx].end(); it++) {
140 if (it->second) {
141 it->second->Notify();
142 it->second->Destroy();
143 }
144 }
145 whenMapVec_[idx].clear();
146 }
147 if (loop_ == nullptr) {
148 cond_.notify_all();
149 }
150
151 FFRT_LOGI("clear [queueId=%u] succ", queueId_);
152 }
153
SetLoop(Loop * loop)154 bool ConcurrentQueue::SetLoop(Loop* loop)
155 {
156 if (loop == nullptr || loop_ != nullptr) {
157 FFRT_LOGE("queueId %s should bind to loop invalid", queueId_);
158 return false;
159 }
160
161 loop_ = loop;
162 isOnLoop_.store(true);
163 return true;
164 }
165
PushDelayTaskToTimer(QueueTask * task)166 int ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task)
167 {
168 uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1;
169 int timeout = delayMs > INT_MAX ? INT_MAX : delayMs;
170 if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) {
171 FFRT_LOGE("push delay queue task to timer fail");
172 return FAILED;
173 }
174 return SUCC;
175 }
176
CreateConcurrentQueue(const ffrt_queue_attr_t * attr)177 std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr)
178 {
179 int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr) <= 0 ? 1 : ffrt_queue_attr_get_max_concurrency(attr);
180 return std::make_unique<ConcurrentQueue>(maxConcurrency);
181 }
182
Remove()183 void ConcurrentQueue::Remove()
184 {
185 std::unique_lock lock(mutex_);
186 for (auto& currentMap : whenMapVec_) {
187 BaseQueue::Remove(currentMap);
188 }
189 }
190
Remove(const char * name)191 int ConcurrentQueue::Remove(const char* name)
192 {
193 std::unique_lock lock(mutex_);
194 int count = 0;
195 for (auto& currentMap : whenMapVec_) {
196 count += BaseQueue::Remove(name, currentMap);
197 }
198 return count > 0 ? SUCC : FAILED;
199 }
200
Remove(const QueueTask * task)201 int ConcurrentQueue::Remove(const QueueTask* task)
202 {
203 std::unique_lock lock(mutex_);
204 for (auto& currentMap : whenMapVec_) {
205 if (BaseQueue::Remove(task, currentMap) == SUCC) {
206 return SUCC;
207 }
208 }
209 return FAILED;
210 }
211
HasTask(const char * name)212 bool ConcurrentQueue::HasTask(const char* name)
213 {
214 std::unique_lock lock(mutex_);
215 for (auto& currentMap : whenMapVec_) {
216 if (BaseQueue::HasTask(name, currentMap)) {
217 return true;
218 }
219 }
220 return false;
221 }
222 } // namespace ffrt
223