• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "concurrent_queue.h"
17 #include <climits>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "tm/queue_task.h"
20 #include "eu/loop.h"
21 
22 namespace {
GetMinMapTime(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)23 uint64_t GetMinMapTime(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
24 {
25     uint64_t minTime = std::numeric_limits<uint64_t>::max();
26 
27     for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
28         if (!whenMapVec[idx].empty()) {
29             auto it = whenMapVec[idx].begin();
30             if (it->first < minTime) {
31                 minTime = it->first;
32             }
33         }
34     }
35     return minTime;
36 }
37 
WhenMapVecEmpty(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)38 bool WhenMapVecEmpty(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
39 {
40     for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
41         if (!whenMapVec[idx].empty()) {
42             return false;
43         }
44     }
45     return true;
46 }
47 }
48 
49 namespace ffrt {
DelayTaskCb(void * task)50 static void DelayTaskCb(void* task)
51 {
52     static_cast<QueueTask*>(task)->Execute();
53 }
54 
~ConcurrentQueue()55 ConcurrentQueue::~ConcurrentQueue()
56 {
57     FFRT_LOGI("destruct concurrent queueId=%u leave", queueId_);
58 }
59 
Push(QueueTask * task)60 int ConcurrentQueue::Push(QueueTask* task)
61 {
62     std::unique_lock lock(mutex_);
63     FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
64     ffrt_queue_priority_t taskPriority = task->GetPriority();
65     if (taskPriority > ffrt_queue_priority_idle) {
66         task->SetPriority(ffrt_queue_priority_low);
67         taskPriority = task->GetPriority();
68     }
69 
70     if (waitingAll_) {
71         waitingMap_.insert({task->GetUptime(), task});
72         return SUCC;
73     }
74 
75     if (loop_ != nullptr) {
76         if (task->GetDelay() == 0) {
77             whenMapVec_[taskPriority].insert({task->GetUptime(), task});
78             loop_->WakeUp();
79             return SUCC;
80         }
81         return PushDelayTaskToTimer(task);
82     }
83     return PushAndCalConcurrency(task, taskPriority, lock, true);
84 }
85 
Pull()86 QueueTask* ConcurrentQueue::Pull()
87 {
88     std::unique_lock lock(mutex_);
89     // wait for delay task
90     uint64_t now = GetNow();
91     if (loop_ != nullptr) {
92         if (!WhenMapVecEmpty(whenMapVec_) && now >= GetMinMapTime(whenMapVec_) && !isExit_) {
93             return dequeFunc_(queueId_, now, whenMapVec_, nullptr);
94         }
95         return nullptr;
96     }
97 
98     uint64_t minMaptime = GetMinMapTime(whenMapVec_);
99     while (!WhenMapVecEmpty(whenMapVec_) && now < minMaptime && !isExit_) {
100         uint64_t diff = minMaptime - now;
101         FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
102         delayStatus_.store(true);
103         cond_.wait_for(lock, std::chrono::microseconds(diff));
104         delayStatus_.store(false);
105         FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
106         now = GetNow();
107         minMaptime = GetMinMapTime(whenMapVec_);
108     }
109 
110     // abort dequeue in abnormal scenarios
111     if (WhenMapVecEmpty(whenMapVec_)) {
112         int oldValue = concurrency_.fetch_sub(1); // 取不到后继的task,当前这个task正式退出
113         FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId_);
114         if (oldValue == 1) {
115             cond_.notify_all();
116         }
117         return nullptr;
118     }
119     FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
120 
121     // dequeue next expired task by priority
122     return dequeFunc_(queueId_, now, whenMapVec_, nullptr);
123 }
124 
Remove()125 int ConcurrentQueue::Remove()
126 {
127     std::lock_guard lock(mutex_);
128     uint64_t removeCount = 0;
129     for (auto& currentMap : whenMapVec_) {
130         removeCount += BaseQueue::Remove(currentMap);
131     }
132     return removeCount + BaseQueue::Remove(waitingMap_);
133 }
134 
Remove(const char * name)135 int ConcurrentQueue::Remove(const char* name)
136 {
137     std::lock_guard lock(mutex_);
138     uint64_t removeCount = 0;
139     for (auto& currentMap : whenMapVec_) {
140         removeCount += BaseQueue::Remove(name, currentMap);
141     }
142     return removeCount + BaseQueue::Remove(name, waitingMap_);
143 }
144 
Remove(const QueueTask * task)145 int ConcurrentQueue::Remove(const QueueTask* task)
146 {
147     std::lock_guard lock(mutex_);
148     for (auto& currentMap : whenMapVec_) {
149         if (BaseQueue::Remove(task, currentMap) == SUCC) {
150             return SUCC;
151         }
152     }
153     return BaseQueue::Remove(task, waitingMap_);
154 }
155 
Stop()156 void ConcurrentQueue::Stop()
157 {
158     std::lock_guard lock(mutex_);
159     isExit_ = true;
160 
161     for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
162         for (auto it = whenMapVec_[idx].begin(); it != whenMapVec_[idx].end(); it++) {
163             if (it->second) {
164                 it->second->Notify();
165                 it->second->Destroy();
166             }
167         }
168         whenMapVec_[idx].clear();
169     }
170     Stop(waitingMap_);
171     if (loop_ == nullptr) {
172         cond_.notify_all();
173     }
174 
175     FFRT_LOGI("clear [queueId=%u] succ", queueId_);
176 }
177 
WaitAll()178 int ConcurrentQueue::WaitAll()
179 {
180     std::unique_lock lock(mutex_);
181     FFRT_COND_DO_ERR(isOnLoop_, return -1, "loop does not support");
182     FFRT_COND_DO_ERR(isExit_, return -1, "cannot waiting task, [queueId=%u] is exiting", queueId_);
183 
184     if (waitingAll_) {
185         return 1;
186     }
187 
188     waitingAll_ = true;
189     cond_.wait(lock, [this] { return concurrency_.load() == 0;}); // 是否需要加上whenMap_empty()
190 
191     if (waitingMap_.empty()) {
192         waitingAll_ = false;
193         return 0 ;
194     }
195 
196     // resubmit tasks to the queue and wake up workers
197     for (const auto& iter : waitingMap_) {
198         QueueTask* task = iter.second;
199         QueueHandler* handler = task->GetHandler();
200         int ret = PushAndCalConcurrency(task, task->GetPriority(), lock, false);
201         if (ret == CONCURRENT) {
202             if (task->GetDelay() == 0) {
203                 handler->TransferTask(task);
204             } else {
205                 handler->TransferInitTask();
206             }
207         }
208     }
209     waitingAll_ = false;
210     waitingMap_.clear();
211     return 0;
212 }
213 
SetLoop(Loop * loop)214 bool ConcurrentQueue::SetLoop(Loop* loop)
215 {
216     if (loop == nullptr || loop_ != nullptr) {
217         FFRT_SYSEVENT_LOGE("queueId %s should bind to loop invalid", queueId_);
218         return false;
219     }
220 
221     loop_ = loop;
222     isOnLoop_.store(true);
223     return true;
224 }
225 
PushDelayTaskToTimer(QueueTask * task)226 int ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task)
227 {
228     uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1;
229     int timeout = delayMs > INT_MAX ? INT_MAX : delayMs;
230     if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) {
231         FFRT_SYSEVENT_LOGE("push delay queue task to timer fail");
232         return FAILED;
233     }
234     return SUCC;
235 }
236 
PushAndCalConcurrency(QueueTask * task,ffrt_queue_priority_t taskPriority,std::unique_lock<ffrt::mutex> & lock,bool needUnlock)237 int ConcurrentQueue::PushAndCalConcurrency(QueueTask* task, ffrt_queue_priority_t taskPriority,
238     std::unique_lock<ffrt::mutex>& lock, bool needUnlock)
239 {
240     if (concurrency_.load() < maxConcurrency_) {
241         int oldValue = concurrency_.fetch_add(1);
242         FFRT_LOGD("task [gid=%llu] concurrency[%u] + 1 [queueId=%u]", task->gid, oldValue, queueId_);
243 
244         if (task->GetDelay() > 0) {
245             whenMapVec_[taskPriority].insert({task->GetUptime(), task});
246         }
247 
248         return CONCURRENT;
249     }
250 
251     whenMapVec_[taskPriority].insert({task->GetUptime(), task});
252     if (task == whenMapVec_[taskPriority].begin()->second) {
253         if (needUnlock) {
254             lock.unlock();
255         }
256         cond_.notify_all();
257     }
258     return SUCC;
259 }
260 
Stop(std::multimap<uint64_t,QueueTask * > & whenMap)261 void ConcurrentQueue::Stop(std::multimap<uint64_t, QueueTask*>& whenMap)
262 {
263     for (auto it = whenMap.begin(); it != whenMap.end(); it++) {
264         if (it->second) {
265             it->second->Notify();
266             it->second->Destroy();
267         }
268     }
269 
270     whenMap.clear();
271 }
272 
CreateConcurrentQueue(const ffrt_queue_attr_t * attr)273 std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr)
274 {
275     int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr) <= 0 ? 1 : ffrt_queue_attr_get_max_concurrency(attr);
276     return std::make_unique<ConcurrentQueue>(maxConcurrency);
277 }
278 
HasTask(const char * name)279 bool ConcurrentQueue::HasTask(const char* name)
280 {
281     std::lock_guard lock(mutex_);
282     for (auto& currentMap : whenMapVec_) {
283         if (BaseQueue::HasTask(name, currentMap)) {
284             return true;
285         }
286     }
287     return false;
288 }
289 
GetHeadTask()290 std::vector<QueueTask*> ConcurrentQueue::GetHeadTask()
291 {
292     std::lock_guard lock(mutex_);
293     if (WhenMapVecEmpty(whenMapVec_)) {
294         return {};
295     }
296 
297     allWhenmapTask.clear();
298 
299     for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
300         if (!whenMapVec_[idx].empty()) {
301             for (const auto& [upTime, qtask] : whenMapVec_[idx]) {
302                 allWhenmapTask.emplace_back(upTime, qtask);
303             }
304         }
305     }
306 
307     std::sort(allWhenmapTask.begin(), allWhenmapTask.end(),
308                 [](const auto& lhs, const auto& rhs) {
309                     return lhs.first < rhs.first;
310                 });
311 
312     for (size_t i = 0; i < maxConcurrency_; i++) {
313         if (i < allWhenmapTask.size()) {
314             headTaskVec_[i] = allWhenmapTask[i].second;
315         } else {
316             headTaskVec_[i] = nullptr;
317         }
318     }
319     return headTaskVec_;
320 }
321 } // namespace ffrt
322