• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "eventhandler_adapter_queue.h"
17 #include <securec.h>
18 #include <sstream>
19 #include "dfx/log/ffrt_log_api.h"
20 #include "util/time_format.h"
21 
22 namespace {
23 constexpr int MAX_DUMP_SIZE = 500;
24 constexpr uint8_t HISTORY_TASK_NUM_POWER = 32;
25 
DumpRunningTaskInfo(const char * tag,const ffrt::HistoryTask & currentRunningTask,std::ostringstream & oss)26 void DumpRunningTaskInfo(const char* tag, const ffrt::HistoryTask& currentRunningTask, std::ostringstream& oss)
27 {
28     oss << tag << " Current Running: ";
29     if (currentRunningTask.beginTime_ == std::numeric_limits<uint64_t>::max()) {
30         oss << "{}";
31     } else {
32         oss << "start at " << ffrt::FormatDateString4SteadyClock(currentRunningTask.beginTime_) << ", ";
33         oss << "Event { ";
34         oss << "send thread = " << currentRunningTask.senderKernelThreadId_;
35         oss << ", send time = " << ffrt::FormatDateString4SteadyClock(currentRunningTask.sendTime_);
36         oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(currentRunningTask.handleTime_);
37         oss << ", task name = " << currentRunningTask.taskName_;
38         oss << " }\n";
39     }
40 }
41 
DumpHistoryTaskInfo(const char * tag,const std::vector<ffrt::HistoryTask> & historyTasks,std::ostringstream & oss)42 void DumpHistoryTaskInfo(const char* tag, const std::vector<ffrt::HistoryTask>& historyTasks, std::ostringstream& oss)
43 {
44     oss << tag << " History event queue information:\n";
45     for (uint8_t i = 0; i < HISTORY_TASK_NUM_POWER; i++) {
46         auto historyTask = historyTasks[i];
47         if (historyTask.senderKernelThreadId_ == 0) {
48             continue;
49         }
50 
51         oss << tag << " No. " << (i + 1) << " : Event { ";
52         oss << "send thread = " << historyTask.senderKernelThreadId_;
53         oss << ", send time = " << ffrt::FormatDateString4SteadyClock(historyTask.sendTime_);
54         oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(historyTask.handleTime_);
55         oss << ", trigger time = " << ffrt::FormatDateString4SteadyClock(historyTask.triggerTime_);
56         oss << ", complete time = " << ffrt::FormatDateString4SteadyClock(historyTask.completeTime_);
57         oss << ", task name = " << historyTask.taskName_;
58         oss << " }\n";
59     }
60 }
61 
DumpUnexecutedTaskInfo(const char * tag,const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec,std::ostringstream & oss)62 void DumpUnexecutedTaskInfo(const char* tag,
63     const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec, std::ostringstream& oss)
64 {
65     static std::pair<ffrt_inner_queue_priority_t, std::string> priorityPairArr[] = {
66         {ffrt_inner_queue_priority_immediate, "Immediate"}, {ffrt_inner_queue_priority_high, "High"},
67         {ffrt_inner_queue_priority_low, "Low"}, {ffrt_inner_queue_priority_idle, "Idle"},
68         {ffrt_inner_queue_priority_vip, "Vip"}
69     };
70     uint32_t total = 0;
71     uint32_t dumpSize = MAX_DUMP_SIZE;
72 
73     std::multimap<ffrt_inner_queue_priority_t, ffrt::QueueTask*> priorityMap;
74     for (int idx = 0; idx <= ffrt_inner_queue_priority_idle; idx++) {
75         for (auto it = whenMapVec[idx].begin(); it != whenMapVec[idx].end(); it++) {
76             priorityMap.insert({static_cast<ffrt_inner_queue_priority_t>(idx), it->second});
77         }
78     }
79 
80     auto taskDumpFun = [&](int n, ffrt::QueueTask* task) {
81         oss << tag << " No. " << n << " : Event { ";
82         oss << "send thread = " << task->fromTid;
83         oss << ", send time = " << ffrt::FormatDateString4SteadyClock(task->GetUptime() - task->GetDelay());
84         oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(task->GetUptime());
85         oss << ", task name = " << task->label;
86         oss << " }\n";
87         dumpSize--;
88     };
89 
90     for (auto pair : priorityPairArr) {
91         auto range = priorityMap.equal_range(pair.first);
92         oss << tag << " " << pair.second << " priority event queue information:\n";
93         int n = 0;
94         for (auto it = range.first; it != range.second; ++it) {
95             total++;
96             n++;
97             if (dumpSize > 0) {
98                 taskDumpFun(n, it->second);
99             }
100         }
101         oss << tag << " Total size of " << pair.second << " events : " << n << "\n";
102     }
103     oss << tag << " Total event size : " << total << "\n";
104 }
105 
GetMinMapTime(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)106 uint64_t GetMinMapTime(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
107 {
108     uint64_t minTime = std::numeric_limits<uint64_t>::max();
109 
110     for (int idx = 0; idx <= ffrt_inner_queue_priority_idle; idx++) {
111         if (!whenMapVec[idx].empty()) {
112             auto it = whenMapVec[idx].begin();
113             if (it->first < minTime) {
114                 minTime = it->first;
115             }
116         }
117     }
118     return minTime;
119 }
120 
WhenMapVecEmpty(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)121 bool WhenMapVecEmpty(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
122 {
123     for (int idx = 0; idx <= ffrt_inner_queue_priority_idle; idx++) {
124         if (!whenMapVec[idx].empty()) {
125             return false;
126         }
127     }
128     return true;
129 }
130 }
131 
132 namespace ffrt {
EventHandlerAdapterQueue()133 EventHandlerAdapterQueue::EventHandlerAdapterQueue() : EventHandlerInteractiveQueue()
134 {
135     dequeFunc_ = QueueStrategy<QueueTask>::DequeSingleAgainstStarvation;
136     historyTasks_ = std::vector<HistoryTask>(HISTORY_TASK_NUM_POWER);
137     pulledTaskCount_ = std::vector<int>(ffrt_inner_queue_priority_idle + 1, 0);
138 }
139 
~EventHandlerAdapterQueue()140 EventHandlerAdapterQueue::~EventHandlerAdapterQueue()
141 {
142     FFRT_LOGD("destruct eventhandler adapter queueId=%u leave", queueId_);
143 }
144 
Stop()145 void EventHandlerAdapterQueue::Stop()
146 {
147     std::lock_guard lock(mutex_);
148     for (auto& currentMap : whenMapVec_) {
149         BaseQueue::Stop(currentMap);
150     }
151     FFRT_LOGI("clear [queueId=%u] succ", queueId_);
152 }
153 
Push(QueueTask * task)154 int EventHandlerAdapterQueue::Push(QueueTask* task)
155 {
156     std::lock_guard lock(mutex_);
157     FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
158 
159     int taskPriority = task->GetPriority();
160     if (!isActiveState_.load()) {
161         pulledTaskCount_[taskPriority]++;
162         isActiveState_.store(true);
163         return INACTIVE;
164     }
165 
166     if (task->InsertHead()) {
167         std::multimap<uint64_t, QueueTask*> tmpWhenMap {{0, task}};
168         tmpWhenMap.insert(whenMapVec_[taskPriority].begin(), whenMapVec_[taskPriority].end());
169         whenMapVec_[taskPriority].swap(tmpWhenMap);
170     } else {
171         whenMapVec_[taskPriority].insert({task->GetUptime(), task});
172     }
173     if (task == whenMapVec_[taskPriority].begin()->second) {
174         cond_.notify_one();
175     }
176 
177     return SUCC;
178 }
179 
Pull()180 QueueTask* EventHandlerAdapterQueue::Pull()
181 {
182     std::unique_lock lock(mutex_);
183     // wait for delay task
184     uint64_t now = GetNow();
185     uint64_t minMaptime = GetMinMapTime(whenMapVec_);
186     while (!WhenMapVecEmpty(whenMapVec_) && now < minMaptime && !isExit_) {
187         uint64_t diff = minMaptime - now;
188         FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
189         delayStatus_.store(true);
190         cond_.wait_for(lock, std::chrono::microseconds(diff));
191         delayStatus_.store(false);
192         FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
193         now = GetNow();
194         minMaptime = GetMinMapTime(whenMapVec_);
195     }
196 
197     // abort dequeue in abnormal scenarios
198     if (WhenMapVecEmpty(whenMapVec_)) {
199         FFRT_LOGD("[queueId=%u] switch into inactive", queueId_);
200         isActiveState_.store(false);
201         return nullptr;
202     }
203     FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
204 
205     // dequeue due tasks in batch
206     return dequeFunc_(queueId_, now, whenMapVec_, &pulledTaskCount_);
207 }
208 
Remove()209 int EventHandlerAdapterQueue::Remove()
210 {
211     std::lock_guard lock(mutex_);
212     int count = 0;
213     for (auto& currentMap : whenMapVec_) {
214         count += BaseQueue::Remove(currentMap);
215     }
216     return count;
217 }
218 
Remove(const char * name)219 int EventHandlerAdapterQueue::Remove(const char* name)
220 {
221     std::lock_guard lock(mutex_);
222     int count = 0;
223     for (auto& currentMap : whenMapVec_) {
224         count += BaseQueue::Remove(name, currentMap);
225     }
226     return count;
227 }
228 
Remove(const QueueTask * task)229 int EventHandlerAdapterQueue::Remove(const QueueTask* task)
230 {
231     std::lock_guard lock(mutex_);
232     int count = 0;
233     for (auto& currentMap : whenMapVec_) {
234         count += BaseQueue::Remove(task, currentMap);
235     }
236     return count;
237 }
238 
HasTask(const char * name)239 bool EventHandlerAdapterQueue::HasTask(const char* name)
240 {
241     std::lock_guard lock(mutex_);
242     for (auto& currentMap : whenMapVec_) {
243         if (BaseQueue::HasTask(name, currentMap)) {
244             return true;
245         }
246     }
247     return false;
248 }
249 
IsIdle()250 bool EventHandlerAdapterQueue::IsIdle()
251 {
252     std::lock_guard lock(mutex_);
253 
254     for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
255         if (!whenMapVec_[idx].empty()) {
256             return false;
257         }
258     }
259     return true;
260 }
261 
GetDueTaskCount()262 uint64_t EventHandlerAdapterQueue::GetDueTaskCount()
263 {
264     std::lock_guard lock(mutex_);
265     uint64_t count = 0;
266     for (auto& currentMap : whenMapVec_) {
267         count += BaseQueue::GetDueTaskCount(currentMap);
268     }
269     if (count != 0) {
270         FFRT_LOGI("qid = %llu Current eh queueDue Task Count %u", GetQueueId(), count);
271     }
272     return count;
273 }
274 
Dump(const char * tag,char * buf,uint32_t len,bool historyInfo)275 int EventHandlerAdapterQueue::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
276 {
277     std::lock_guard lock(mutex_);
278     std::ostringstream oss;
279     if (historyInfo) {
280         DumpRunningTaskInfo(tag, currentRunningTask_, oss);
281         DumpHistoryTaskInfo(tag, historyTasks_, oss);
282     }
283     DumpUnexecutedTaskInfo(tag, whenMapVec_, oss);
284     return snprintf_s(buf, len, len - 1, "%s", oss.str().c_str());
285 }
286 
DumpSize(ffrt_inner_queue_priority_t priority)287 int EventHandlerAdapterQueue::DumpSize(ffrt_inner_queue_priority_t priority)
288 {
289     std::lock_guard lock(mutex_);
290     return static_cast<int>(whenMapVec_[priority].size());
291 }
292 
SetCurrentRunningTask(QueueTask * task)293 void EventHandlerAdapterQueue::SetCurrentRunningTask(QueueTask* task)
294 {
295     currentRunningTask_ = HistoryTask(GetNow(), task);
296 }
297 
PushHistoryTask(QueueTask * task,uint64_t triggerTime,uint64_t completeTime)298 void EventHandlerAdapterQueue::PushHistoryTask(QueueTask* task, uint64_t triggerTime, uint64_t completeTime)
299 {
300     HistoryTask historyTask;
301     historyTask.senderKernelThreadId_ = task->fromTid;
302     historyTask.taskName_ = task->label;
303     historyTask.sendTime_ = task->GetUptime() - task->GetDelay();
304     historyTask.handleTime_ = task->GetUptime();
305     historyTask.triggerTime_ = triggerTime;
306     historyTask.completeTime_ = completeTime;
307     historyTasks_[historyTaskIndex_.fetch_add(1) & (HISTORY_TASK_NUM_POWER - 1)] = historyTask;
308 }
309 
CreateEventHandlerAdapterQueue(const ffrt_queue_attr_t * attr)310 std::unique_ptr<BaseQueue> CreateEventHandlerAdapterQueue(const ffrt_queue_attr_t* attr)
311 {
312     (void)attr;
313     return std::make_unique<EventHandlerAdapterQueue>();
314 }
315 } // namespace ffrt
316