• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <unordered_map>
17 #include "dfx/log/ffrt_log_api.h"
18 #include "tm/queue_task.h"
19 #include "queue/serial_queue.h"
20 #include "concurrent_queue.h"
21 #include "eventhandler_adapter_queue.h"
22 #include "eventhandler_interactive_queue.h"
23 #include "util/time_format.h"
24 #include "queue/base_queue.h"
25 
26 namespace {
27 // 0预留为非法值
28 std::atomic_uint32_t g_queueId(1);
29 using CreateFunc = std::unique_ptr<ffrt::BaseQueue>(*)(const ffrt_queue_attr_t*);
30 const std::unordered_map<int, CreateFunc> CREATE_FUNC_MAP = {
31     { ffrt_queue_serial, ffrt::CreateSerialQueue },
32     { ffrt_queue_concurrent, ffrt::CreateConcurrentQueue },
33     { ffrt_queue_eventhandler_interactive, ffrt::CreateEventHandlerInteractiveQueue },
34     { ffrt_queue_eventhandler_adapter, ffrt::CreateEventHandlerAdapterQueue },
35 };
36 
ClearWhenMap(std::multimap<uint64_t,ffrt::QueueTask * > & whenMap,ffrt::condition_variable & cond)37 int ClearWhenMap(std::multimap<uint64_t, ffrt::QueueTask*>& whenMap, ffrt::condition_variable& cond)
38 {
39     for (auto it = whenMap.begin(); it != whenMap.end(); it++) {
40         if (it->second) {
41             it->second->Cancel();
42             it->second = nullptr;
43         }
44     }
45     int mapSize = static_cast<int>(whenMap.size());
46     whenMap.clear();
47     cond.notify_one();
48     return mapSize;
49 }
50 }
51 
52 namespace ffrt {
BaseQueue()53 BaseQueue::BaseQueue() : queueId_(g_queueId++)
54 {
55     headTaskVec_.resize(1);
56 }
57 
Stop()58 void BaseQueue::Stop()
59 {
60     std::lock_guard lock(mutex_);
61     Stop(whenMap_);
62     FFRT_LOGI("clear [queueId=%u] succ", queueId_);
63 }
64 
Stop(std::multimap<uint64_t,QueueTask * > & whenMap)65 void BaseQueue::Stop(std::multimap<uint64_t, QueueTask*>& whenMap)
66 {
67     isExit_ = true;
68     ClearWhenMap(whenMap, cond_);
69 }
70 
Remove()71 int BaseQueue::Remove()
72 {
73     std::lock_guard lock(mutex_);
74     return Remove(whenMap_);
75 }
76 
Remove(std::multimap<uint64_t,QueueTask * > & whenMap)77 int BaseQueue::Remove(std::multimap<uint64_t, QueueTask*>& whenMap)
78 {
79     FFRT_COND_DO_ERR(isExit_, return 0, "cannot remove task, [queueId=%u] is exiting", queueId_);
80 
81     int mapSize = ClearWhenMap(whenMap, cond_);
82     FFRT_LOGD("cancel [queueId=%u] all tasks succ, count %u", queueId_, mapSize);
83     return mapSize;
84 }
85 
Remove(const char * name)86 int BaseQueue::Remove(const char* name)
87 {
88     std::lock_guard lock(mutex_);
89     return Remove(name, whenMap_);
90 }
91 
Remove(const char * name,std::multimap<uint64_t,QueueTask * > & whenMap)92 int BaseQueue::Remove(const char* name, std::multimap<uint64_t, QueueTask*>& whenMap)
93 {
94     FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot remove task, [queueId=%u] is exiting", queueId_);
95 
96     int removedCount = 0;
97     for (auto iter = whenMap.begin(); iter != whenMap.end();) {
98         if (iter->second->IsMatch(name)) {
99             FFRT_LOGD("cancel task[%llu] %s succ", iter->second->gid, iter->second->GetLabel().c_str());
100             iter->second->Cancel();
101             iter = whenMap.erase(iter);
102             removedCount++;
103         } else {
104             ++iter;
105         }
106     }
107 
108     return removedCount;
109 }
110 
Remove(const QueueTask * task)111 int BaseQueue::Remove(const QueueTask* task)
112 {
113     std::lock_guard lock(mutex_);
114     return Remove(task, whenMap_);
115 }
116 
Remove(const QueueTask * task,std::multimap<uint64_t,QueueTask * > & whenMap)117 int BaseQueue::Remove(const QueueTask* task, std::multimap<uint64_t, QueueTask*>& whenMap)
118 {
119     FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot remove task, [queueId=%u] is exiting", queueId_);
120 
121     auto range = whenMap.equal_range(task->GetUptime());
122     for (auto it = range.first; it != range.second; it++) {
123         if (it->second == task) {
124             whenMap.erase(it);
125             return SUCC;
126         }
127     }
128 
129     return FAILED;
130 }
131 
HasTask(const char * name)132 bool BaseQueue::HasTask(const char* name)
133 {
134     std::lock_guard lock(mutex_);
135     return HasTask(name, whenMap_);
136 }
137 
HasTask(const char * name,std::multimap<uint64_t,QueueTask * > whenMap)138 bool BaseQueue::HasTask(const char* name, std::multimap<uint64_t, QueueTask*> whenMap)
139 {
140     auto iter = std::find_if(whenMap.cbegin(), whenMap.cend(),
141         [name](const auto& pair) { return pair.second->IsMatch(name); });
142     return iter != whenMap.cend();
143 }
144 
CreateQueue(int queueType,const ffrt_queue_attr_t * attr)145 std::unique_ptr<BaseQueue> CreateQueue(int queueType, const ffrt_queue_attr_t* attr)
146 {
147     const auto iter = CREATE_FUNC_MAP.find(queueType);
148     FFRT_COND_DO_ERR((iter == CREATE_FUNC_MAP.end()), return nullptr, "invalid queue type");
149 
150     return iter->second(attr);
151 }
152 
GetDueTaskCount()153 uint64_t BaseQueue::GetDueTaskCount()
154 {
155     std::lock_guard lock(mutex_);
156     uint64_t count = GetDueTaskCount(whenMap_);
157     if (count != 0) {
158         FFRT_LOGD("qid = %llu Current Due Task Count %llu", GetQueueId(), count);
159     }
160     return count;
161 }
162 
GetDueTaskCount(std::multimap<uint64_t,QueueTask * > & whenMap)163 uint64_t BaseQueue::GetDueTaskCount(std::multimap<uint64_t, QueueTask*>& whenMap)
164 {
165     const uint64_t& time = static_cast<uint64_t>(std::chrono::time_point_cast<std::chrono::microseconds>(
166             std::chrono::steady_clock::now()).time_since_epoch().count());
167     auto it = whenMap.lower_bound(time);
168     uint64_t count = static_cast<uint64_t>(std::distance(whenMap.begin(), it));
169     return count;
170 }
171 
GetHeadTask()172 std::vector<QueueTask*> BaseQueue::GetHeadTask()
173 {
174     std::lock_guard lock(mutex_);
175     if (whenMap_.empty()) {
176         return {};
177     }
178     headTaskVec_[0] = whenMap_.begin()->second;
179     return headTaskVec_;
180 }
181 } // namespace ffrt
182