1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "eventhandler_adapter_queue.h"
17 #include <securec.h>
18 #include <sstream>
19 #include "dfx/log/ffrt_log_api.h"
20 #include "util/time_format.h"
21
22 namespace {
// Upper bound on the number of pending tasks printed in detail by one dump.
constexpr int MAX_DUMP_SIZE = 500;
// Number of slots in the history ring buffer. A slot is selected with
// `index & (HISTORY_TASK_NUM_POWER - 1)`, so the value has to stay a power of two.
constexpr uint8_t HISTORY_TASK_NUM_POWER = 32;
static_assert(HISTORY_TASK_NUM_POWER != 0 && (HISTORY_TASK_NUM_POWER & (HISTORY_TASK_NUM_POWER - 1)) == 0,
    "HISTORY_TASK_NUM_POWER is used as an index mask and must be a power of two");
25
DumpRunningTaskInfo(const char * tag,const ffrt::HistoryTask & currentRunningTask,std::ostringstream & oss)26 void DumpRunningTaskInfo(const char* tag, const ffrt::HistoryTask& currentRunningTask, std::ostringstream& oss)
27 {
28 oss << tag << " Current Running: ";
29 if (currentRunningTask.beginTime_ == std::numeric_limits<uint64_t>::max()) {
30 oss << "{}";
31 } else {
32 oss << "start at " << ffrt::FormatDateString4SteadyClock(currentRunningTask.beginTime_) << ", ";
33 oss << "Event { ";
34 oss << "send thread = " << currentRunningTask.senderKernelThreadId_;
35 oss << ", send time = " << ffrt::FormatDateString4SteadyClock(currentRunningTask.sendTime_);
36 oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(currentRunningTask.handleTime_);
37 oss << ", task name = " << currentRunningTask.taskName_;
38 oss << " }\n";
39 }
40 }
41
DumpHistoryTaskInfo(const char * tag,const std::vector<ffrt::HistoryTask> & historyTasks,std::ostringstream & oss)42 void DumpHistoryTaskInfo(const char* tag, const std::vector<ffrt::HistoryTask>& historyTasks, std::ostringstream& oss)
43 {
44 oss << tag << " History event queue information:\n";
45 for (uint8_t i = 0; i < HISTORY_TASK_NUM_POWER; i++) {
46 auto historyTask = historyTasks[i];
47 if (historyTask.senderKernelThreadId_ == 0) {
48 continue;
49 }
50
51 oss << tag << " No. " << (i + 1) << " : Event { ";
52 oss << "send thread = " << historyTask.senderKernelThreadId_;
53 oss << ", send time = " << ffrt::FormatDateString4SteadyClock(historyTask.sendTime_);
54 oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(historyTask.handleTime_);
55 oss << ", trigger time = " << ffrt::FormatDateString4SteadyClock(historyTask.triggerTime_);
56 oss << ", complete time = " << ffrt::FormatDateString4SteadyClock(historyTask.completeTime_);
57 oss << ", task name = " << historyTask.taskName_;
58 oss << " }\n";
59 }
60 }
61
DumpUnexecutedTaskInfo(const char * tag,const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec,std::ostringstream & oss)62 void DumpUnexecutedTaskInfo(const char* tag,
63 const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec, std::ostringstream& oss)
64 {
65 static std::pair<ffrt_inner_queue_priority_t, std::string> priorityPairArr[] = {
66 {ffrt_inner_queue_priority_immediate, "Immediate"}, {ffrt_inner_queue_priority_high, "High"},
67 {ffrt_inner_queue_priority_low, "Low"}, {ffrt_inner_queue_priority_idle, "Idle"},
68 {ffrt_inner_queue_priority_vip, "Vip"}
69 };
70 uint32_t total = 0;
71 uint32_t dumpSize = MAX_DUMP_SIZE;
72
73 std::multimap<ffrt_inner_queue_priority_t, ffrt::QueueTask*> priorityMap;
74 for (int idx = 0; idx <= ffrt_inner_queue_priority_idle; idx++) {
75 for (auto it = whenMapVec[idx].begin(); it != whenMapVec[idx].end(); it++) {
76 priorityMap.insert({static_cast<ffrt_inner_queue_priority_t>(idx), it->second});
77 }
78 }
79
80 auto taskDumpFun = [&](int n, ffrt::QueueTask* task) {
81 oss << tag << " No. " << n << " : Event { ";
82 oss << "send thread = " << task->fromTid;
83 oss << ", send time = " << ffrt::FormatDateString4SteadyClock(task->GetUptime() - task->GetDelay());
84 oss << ", handle time = " << ffrt::FormatDateString4SteadyClock(task->GetUptime());
85 oss << ", task name = " << task->label;
86 oss << " }\n";
87 dumpSize--;
88 };
89
90 for (auto pair : priorityPairArr) {
91 auto range = priorityMap.equal_range(pair.first);
92 oss << tag << " " << pair.second << " priority event queue information:\n";
93 int n = 0;
94 for (auto it = range.first; it != range.second; ++it) {
95 total++;
96 n++;
97 if (dumpSize > 0) {
98 taskDumpFun(n, it->second);
99 }
100 }
101 oss << tag << " Total size of " << pair.second << " events : " << n << "\n";
102 }
103 oss << tag << " Total event size : " << total << "\n";
104 }
105
GetMinMapTime(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)106 uint64_t GetMinMapTime(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
107 {
108 uint64_t minTime = std::numeric_limits<uint64_t>::max();
109
110 for (int idx = 0; idx <= ffrt_inner_queue_priority_idle; idx++) {
111 if (!whenMapVec[idx].empty()) {
112 auto it = whenMapVec[idx].begin();
113 if (it->first < minTime) {
114 minTime = it->first;
115 }
116 }
117 }
118 return minTime;
119 }
120
WhenMapVecEmpty(const std::multimap<uint64_t,ffrt::QueueTask * > * whenMapVec)121 bool WhenMapVecEmpty(const std::multimap<uint64_t, ffrt::QueueTask*>* whenMapVec)
122 {
123 for (int idx = 0; idx <= ffrt_inner_queue_priority_idle; idx++) {
124 if (!whenMapVec[idx].empty()) {
125 return false;
126 }
127 }
128 return true;
129 }
130 }
131
132 namespace ffrt {
EventHandlerAdapterQueue()133 EventHandlerAdapterQueue::EventHandlerAdapterQueue() : EventHandlerInteractiveQueue()
134 {
135 dequeFunc_ = QueueStrategy<QueueTask>::DequeSingleAgainstStarvation;
136 historyTasks_ = std::vector<HistoryTask>(HISTORY_TASK_NUM_POWER);
137 pulledTaskCount_ = std::vector<int>(ffrt_inner_queue_priority_idle + 1, 0);
138 }
139
~EventHandlerAdapterQueue()140 EventHandlerAdapterQueue::~EventHandlerAdapterQueue()
141 {
142 FFRT_LOGI("destruct eventhandler adapter queueId=%u leave", queueId_);
143 }
144
Stop()145 void EventHandlerAdapterQueue::Stop()
146 {
147 std::unique_lock lock(mutex_);
148 for (auto& currentMap : whenMapVec_) {
149 BaseQueue::Stop(currentMap);
150 }
151 FFRT_LOGI("clear [queueId=%u] succ", queueId_);
152 }
153
Push(QueueTask * task)154 int EventHandlerAdapterQueue::Push(QueueTask* task)
155 {
156 std::unique_lock lock(mutex_);
157 FFRT_COND_DO_ERR(isExit_, return FAILED, "cannot push task, [queueId=%u] is exiting", queueId_);
158
159 int taskPriority = task->GetPriority();
160 if (!isActiveState_.load()) {
161 pulledTaskCount_[taskPriority]++;
162 isActiveState_.store(true);
163 return INACTIVE;
164 }
165
166 if (task->InsertHead()) {
167 std::multimap<uint64_t, QueueTask*> tmpWhenMap {{0, task}};
168 tmpWhenMap.insert(whenMapVec_[taskPriority].begin(), whenMapVec_[taskPriority].end());
169 whenMapVec_[taskPriority].swap(tmpWhenMap);
170 } else {
171 whenMapVec_[taskPriority].insert({task->GetUptime(), task});
172 }
173 if (task == whenMapVec_[taskPriority].begin()->second) {
174 cond_.notify_one();
175 }
176
177 return SUCC;
178 }
179
Pull()180 QueueTask* EventHandlerAdapterQueue::Pull()
181 {
182 std::unique_lock lock(mutex_);
183 // wait for delay task
184 uint64_t now = GetNow();
185 uint64_t minMaptime = GetMinMapTime(whenMapVec_);
186 while (!WhenMapVecEmpty(whenMapVec_) && now < minMaptime && !isExit_) {
187 uint64_t diff = minMaptime - now;
188 FFRT_LOGD("[queueId=%u] stuck in %llu us wait", queueId_, diff);
189 cond_.wait_for(lock, std::chrono::microseconds(diff));
190 FFRT_LOGD("[queueId=%u] wakeup from wait", queueId_);
191 now = GetNow();
192 minMaptime = GetMinMapTime(whenMapVec_);
193 }
194
195 // abort dequeue in abnormal scenarios
196 if (WhenMapVecEmpty(whenMapVec_)) {
197 FFRT_LOGD("[queueId=%u] switch into inactive", queueId_);
198 isActiveState_.store(false);
199 return nullptr;
200 }
201 FFRT_COND_DO_ERR(isExit_, return nullptr, "cannot pull task, [queueId=%u] is exiting", queueId_);
202
203 // dequeue due tasks in batch
204 return dequeFunc_(queueId_, now, whenMapVec_, &pulledTaskCount_);
205 }
206
Remove()207 void EventHandlerAdapterQueue::Remove()
208 {
209 std::unique_lock lock(mutex_);
210 for (auto& currentMap : whenMapVec_) {
211 BaseQueue::Remove(currentMap);
212 }
213 }
214
Remove(const char * name)215 int EventHandlerAdapterQueue::Remove(const char* name)
216 {
217 std::unique_lock lock(mutex_);
218 int count = 0;
219 for (auto& currentMap : whenMapVec_) {
220 count += BaseQueue::Remove(name, currentMap);
221 }
222 return count;
223 }
224
Remove(const QueueTask * task)225 int EventHandlerAdapterQueue::Remove(const QueueTask* task)
226 {
227 std::unique_lock lock(mutex_);
228 int count = 0;
229 for (auto& currentMap : whenMapVec_) {
230 count += BaseQueue::Remove(task, currentMap);
231 }
232 return count;
233 }
234
HasTask(const char * name)235 bool EventHandlerAdapterQueue::HasTask(const char* name)
236 {
237 std::unique_lock lock(mutex_);
238 for (auto& currentMap : whenMapVec_) {
239 if (BaseQueue::HasTask(name, currentMap)) {
240 return true;
241 }
242 }
243 return false;
244 }
245
IsIdle()246 bool EventHandlerAdapterQueue::IsIdle()
247 {
248 std::unique_lock lock(mutex_);
249
250 for (int idx = 0; idx <= ffrt_queue_priority_idle; idx++) {
251 if (!whenMapVec_[idx].empty()) {
252 return false;
253 }
254 }
255 return true;
256 }
257
Dump(const char * tag,char * buf,uint32_t len,bool historyInfo)258 int EventHandlerAdapterQueue::Dump(const char* tag, char* buf, uint32_t len, bool historyInfo)
259 {
260 std::unique_lock lock(mutex_);
261 std::ostringstream oss;
262 if (historyInfo) {
263 DumpRunningTaskInfo(tag, currentRunningTask_, oss);
264 DumpHistoryTaskInfo(tag, historyTasks_, oss);
265 }
266 DumpUnexecutedTaskInfo(tag, whenMapVec_, oss);
267 return snprintf_s(buf, len, len - 1, "%s", oss.str().c_str());
268 }
269
DumpSize(ffrt_inner_queue_priority_t priority)270 int EventHandlerAdapterQueue::DumpSize(ffrt_inner_queue_priority_t priority)
271 {
272 std::unique_lock lock(mutex_);
273 return static_cast<int>(whenMapVec_[priority].size());
274 }
275
SetCurrentRunningTask(QueueTask * task)276 void EventHandlerAdapterQueue::SetCurrentRunningTask(QueueTask* task)
277 {
278 currentRunningTask_ = HistoryTask(GetNow(), task);
279 }
280
PushHistoryTask(QueueTask * task,uint64_t triggerTime,uint64_t completeTime)281 void EventHandlerAdapterQueue::PushHistoryTask(QueueTask* task, uint64_t triggerTime, uint64_t completeTime)
282 {
283 HistoryTask historyTask;
284 historyTask.senderKernelThreadId_ = task->fromTid;
285 historyTask.taskName_ = task->label;
286 historyTask.sendTime_ = task->GetUptime() - task->GetDelay();
287 historyTask.handleTime_ = task->GetUptime();
288 historyTask.triggerTime_ = triggerTime;
289 historyTask.completeTime_ = completeTime;
290 historyTasks_[historyTaskIndex_.fetch_add(1) & (HISTORY_TASK_NUM_POWER - 1)] = historyTask;
291 }
292
CreateEventHandlerAdapterQueue(const ffrt_queue_attr_t * attr)293 std::unique_ptr<BaseQueue> CreateEventHandlerAdapterQueue(const ffrt_queue_attr_t* attr)
294 {
295 (void)attr;
296 return std::make_unique<EventHandlerAdapterQueue>();
297 }
298 } // namespace ffrt
299