• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "concurrent_queue.h"
17 #include <climits>
18 #include "dfx/log/ffrt_log_api.h"
19 #include "tm/queue_task.h"
20 #include "eu/loop.h"
21 
22 namespace {
GetMinMapTime(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)23 uint64_t GetMinMapTime(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
24 {
25     uint64_t minTime = std::numeric_limits<uint64_t>::max();
26 
27     for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
28         if (!whenMapVec[idx].empty()) {
29             auto it = whenMapVec[idx].begin();
30             if (it->first < minTime) {
31                 minTime = it->first;
32             }
33         }
34     }
35     return minTime;
36 }
37 
WhenMapVecEmpty(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)38 bool WhenMapVecEmpty(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
39 {
40     for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
41         if (!whenMapVec[idx].empty()) {
42             return false;
43         }
44     }
45     return true;
46 }
47 }
48 
49 namespace ffrt {
DelayTaskCb(void * task)50 static void DelayTaskCb(void* task)
51 {
52     static_cast<QueueTask*>(task)->Execute();
53 }
54 
~ConcurrentQueue()55 ConcurrentQueue::~ConcurrentQueue()
56 {
57     FFRT_LOGI("destruct concurrent queueId=%u leave", queueId_);
58 }
59 
Push(QueueTask * task)60 int ConcurrentQueue::Push(QueueTask* task)
61 {
62     std::unique_lock lock(mutex_);
63     FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
64     ffrt_queue_priority_t taskPriority = task->GetPriority();
65     if (taskPriority > ffrt_queue_priority_idle) {
66         task->SetPriority(ffrt_queue_priority_low);
67         taskPriority = task->GetPriority();
68     }
69 
70     if (loop_ != nullptr) {
71         if (task->GetDelay() == 0) {
72             whenMapVec_[taskPriority].insert({task->GetUptime(), task});
73             loop_->WakeUp();
74             return SUCC;
75         }
76         return PushDelayTaskToTimer(task);
77     }
78     FFRT_COND_DO_ERR(IsOnLoop(), return FAILED, "cannot push task, [queueId=%u] loop empty", queueId_);
79 
80     if (concurrency_.load() < maxConcurrency_) {
81         int oldValue = concurrency_.fetch_add(1);
82         FFRT_LOGD("task [gid=%llu] concurrency[%u] + 1 [queueId=%u]", task->gid, oldValue, queueId_);
83 
84         if (task->GetDelay() > 0) {
85             whenMapVec_[taskPriority].insert({task->GetUptime(), task});
86         }
87 
88         return CONCURRENT;
89     }
90 
91     whenMapVec_[taskPriority].insert({task->GetUptime(), task});
92     if (task == whenMapVec_[taskPriority].begin()->second) {
93         cond_.notify_all();
94     }
95 
96     return SUCC;
97 }
98 
Pull()99 QueueTask* ConcurrentQueue::Pull()
100 {
101     std::unique_lock lock(mutex_);
102     // wait for delay task
103     uint64_t now = GetNow();
104     if (loop_ != nullptr) {
105         if (!WhenMapVecEmpty(whenMapVec_) && now >= GetMinMapTime(whenMapVec_) && !isExit_) {
106             return dequeFunc_(queueId_, now, whenMapVec_, nullptr);
107         }
108         return nullptr;
109     }
110 
111     uint64_t minMaptime = GetMinMapTime(whenMapVec_);
112     while (!WhenMapVecEmpty(whenMapVec_) && now < minMaptime && !isExit_) {
113         uint64_t diff = minMaptime - now;
114         FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
115         cond_.wait_for(lock, std::chrono::microseconds(diff));
116         FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
117         now = GetNow();
118         minMaptime = GetMinMapTime(whenMapVec_);
119     }
120 
121     // abort dequeue in abnormal scenarios
122     if (WhenMapVecEmpty(whenMapVec_)) {
123         int oldValue = concurrency_.fetch_sub(1); // 取不到后继的task,当前这个task正式退出
124         FFRT_LOGD("concurrency[%d] - 1 [queueId=%u] switch into inactive", oldValue, queueId_);
125         return nullptr;
126     }
127     FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
128 
129     // dequeue next expired task by priority
130     return dequeFunc_(queueId_, now, whenMapVec_, nullptr);
131 }
132 
Stop()133 void ConcurrentQueue::Stop()
134 {
135     std::unique_lock lock(mutex_);
136     isExit_ = true;
137 
138     for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
139         for (auto it = whenMapVec_[idx].begin(); it != whenMapVec_[idx].end(); it++) {
140             if (it->second) {
141                 it->second->Notify();
142                 it->second->Destroy();
143             }
144         }
145         whenMapVec_[idx].clear();
146     }
147     if (loop_ == nullptr) {
148         cond_.notify_all();
149     }
150 
151     FFRT_LOGI("clear [queueId=%u] succ", queueId_);
152 }
153 
SetLoop(Loop * loop)154 bool ConcurrentQueue::SetLoop(Loop* loop)
155 {
156     if (loop == nullptr || loop_ != nullptr) {
157         FFRT_LOGE("queueId %s should bind to loop invalid", queueId_);
158         return false;
159     }
160 
161     loop_ = loop;
162     isOnLoop_.store(true);
163     return true;
164 }
165 
PushDelayTaskToTimer(QueueTask * task)166 int ConcurrentQueue::PushDelayTaskToTimer(QueueTask* task)
167 {
168     uint64_t delayMs = (task->GetDelay() - 1) / 1000 + 1;
169     int timeout = delayMs > INT_MAX ? INT_MAX : delayMs;
170     if (loop_->TimerStart(timeout, task, DelayTaskCb, false) < 0) {
171         FFRT_LOGE("push delay queue task to timer fail");
172         return FAILED;
173     }
174     return SUCC;
175 }
176 
CreateConcurrentQueue(const ffrt_queue_attr_t * attr)177 std::unique_ptr<BaseQueue> CreateConcurrentQueue(const ffrt_queue_attr_t* attr)
178 {
179     int maxConcurrency = ffrt_queue_attr_get_max_concurrency(attr) <= 0 ? 1 : ffrt_queue_attr_get_max_concurrency(attr);
180     return std::make_unique<ConcurrentQueue>(maxConcurrency);
181 }
182 
Remove()183 void ConcurrentQueue::Remove()
184 {
185     std::unique_lock lock(mutex_);
186     for (auto& currentMap : whenMapVec_) {
187         BaseQueue::Remove(currentMap);
188     }
189 }
190 
Remove(const char * name)191 int ConcurrentQueue::Remove(const char* name)
192 {
193     std::unique_lock lock(mutex_);
194     int count = 0;
195     for (auto& currentMap : whenMapVec_) {
196         count += BaseQueue::Remove(name, currentMap);
197     }
198     return count > 0 ? SUCC : FAILED;
199 }
200 
Remove(const QueueTask * task)201 int ConcurrentQueue::Remove(const QueueTask* task)
202 {
203     std::unique_lock lock(mutex_);
204     for (auto& currentMap : whenMapVec_) {
205         if (BaseQueue::Remove(task, currentMap) == SUCC) {
206             return SUCC;
207         }
208     }
209     return FAILED;
210 }
211 
HasTask(const char * name)212 bool ConcurrentQueue::HasTask(const char* name)
213 {
214     std::unique_lock lock(mutex_);
215     for (auto& currentMap : whenMapVec_) {
216         if (BaseQueue::HasTask(name, currentMap)) {
217             return true;
218         }
219     }
220     return false;
221 }
222 } // namespace ffrt
223