1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_queue.h"
17
18 #include <algorithm>
19
20 #include "epoll_io_waiter.h"
21 #include "event_handler.h"
22 #include "event_handler_utils.h"
23 #include "none_io_waiter.h"
24
25 DEFINE_HILOG_LABEL("EventQueue");
26
27 namespace OHOS {
28 namespace AppExecFwk {
29 namespace {
30 // Help to insert events into the event queue sorted by handle time.
InsertEventsLocked(std::list<InnerEvent::Pointer> & events,InnerEvent::Pointer & event)31 inline void InsertEventsLocked(std::list<InnerEvent::Pointer> &events, InnerEvent::Pointer &event)
32 {
33 auto f = [](const InnerEvent::Pointer &first, const InnerEvent::Pointer &second) {
34 if (!first || !second) {
35 return false;
36 }
37 return first->GetHandleTime() < second->GetHandleTime();
38 };
39 auto it = std::upper_bound(events.begin(), events.end(), event, f);
40 events.insert(it, std::move(event));
41 }
42
43 // Help to remove file descriptor listeners.
44 template<typename T>
RemoveFileDescriptorListenerLocked(std::map<int32_t,std::shared_ptr<FileDescriptorListener>> & listeners,const std::shared_ptr<IoWaiter> & ioWaiter,const T & filter)45 void RemoveFileDescriptorListenerLocked(std::map<int32_t, std::shared_ptr<FileDescriptorListener>> &listeners,
46 const std::shared_ptr<IoWaiter> &ioWaiter, const T &filter)
47 {
48 if (!ioWaiter) {
49 return;
50 }
51 for (auto it = listeners.begin(); it != listeners.end();) {
52 if (filter(it->second)) {
53 ioWaiter->RemoveFileDescriptor(it->first);
54 it = listeners.erase(it);
55 } else {
56 ++it;
57 }
58 }
59 }
60
61 // Help to check whether there is a valid event in list and update wake up time.
CheckEventInListLocked(const std::list<InnerEvent::Pointer> & events,const InnerEvent::TimePoint & now,InnerEvent::TimePoint & nextWakeUpTime)62 inline bool CheckEventInListLocked(const std::list<InnerEvent::Pointer> &events, const InnerEvent::TimePoint &now,
63 InnerEvent::TimePoint &nextWakeUpTime)
64 {
65 if (!events.empty()) {
66 const auto &handleTime = events.front()->GetHandleTime();
67 if (handleTime < nextWakeUpTime) {
68 nextWakeUpTime = handleTime;
69 return handleTime <= now;
70 }
71 }
72
73 return false;
74 }
75
PopFrontEventFromListLocked(std::list<InnerEvent::Pointer> & events)76 inline InnerEvent::Pointer PopFrontEventFromListLocked(std::list<InnerEvent::Pointer> &events)
77 {
78 InnerEvent::Pointer event = std::move(events.front());
79 events.pop_front();
80 return event;
81 }
82 } // unnamed namespace
83
EventQueue()84 EventQueue::EventQueue() : ioWaiter_(std::make_shared<NoneIoWaiter>())
85 {}
86
EventQueue(const std::shared_ptr<IoWaiter> & ioWaiter)87 EventQueue::EventQueue(const std::shared_ptr<IoWaiter> &ioWaiter)
88 : ioWaiter_(ioWaiter ? ioWaiter : std::make_shared<NoneIoWaiter>())
89 {
90 if (ioWaiter_->SupportListeningFileDescriptor()) {
91 // Set callback to handle events from file descriptors.
92 ioWaiter_->SetFileDescriptorEventCallback(
93 std::bind(&EventQueue::HandleFileDescriptorEvent, this, std::placeholders::_1, std::placeholders::_2));
94 }
95 }
96
Insert(InnerEvent::Pointer & event,Priority priority)97 void EventQueue::Insert(InnerEvent::Pointer &event, Priority priority)
98 {
99 if (!event) {
100 HILOGE("Insert: Could not insert an invalid event");
101 return;
102 }
103
104 std::lock_guard<std::mutex> lock(queueLock_);
105 bool needNotify = false;
106 switch (priority) {
107 case Priority::IMMEDIATE:
108 case Priority::HIGH:
109 case Priority::LOW: {
110 needNotify = (event->GetHandleTime() < wakeUpTime_);
111 InsertEventsLocked(subEventQueues_[static_cast<uint32_t>(priority)].queue, event);
112 break;
113 }
114 case Priority::IDLE: {
115 // Never wake up thread if insert an idle event.
116 InsertEventsLocked(idleEvents_, event);
117 break;
118 }
119 default:
120 break;
121 }
122
123 if (needNotify) {
124 ioWaiter_->NotifyOne();
125 }
126 }
127
RemoveOrphan()128 void EventQueue::RemoveOrphan()
129 {
130 // Remove all events which lost its owner.
131 auto filter = [](const InnerEvent::Pointer &p) { return !p->GetOwner(); };
132
133 Remove(filter);
134
135 // Remove all listeners which lost its owner.
136 auto listenerFilter = [](const std::shared_ptr<FileDescriptorListener> &listener) {
137 if (!listener) {
138 return true;
139 }
140 return !listener->GetOwner();
141 };
142
143 std::lock_guard<std::mutex> lock(queueLock_);
144 RemoveFileDescriptorListenerLocked(listeners_, ioWaiter_, listenerFilter);
145 }
146
Remove(const std::shared_ptr<EventHandler> & owner)147 void EventQueue::Remove(const std::shared_ptr<EventHandler> &owner)
148 {
149 if (!owner) {
150 HILOGE("Remove: Invalid owner");
151 return;
152 }
153
154 auto filter = [&owner](const InnerEvent::Pointer &p) { return (p->GetOwner() == owner); };
155
156 Remove(filter);
157 }
158
Remove(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId)159 void EventQueue::Remove(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId)
160 {
161 if (!owner) {
162 HILOGE("Remove: Invalid owner");
163 return;
164 }
165
166 auto filter = [&owner, innerEventId](const InnerEvent::Pointer &p) {
167 return (!p->HasTask()) && (p->GetOwner() == owner) && (p->GetInnerEventId() == innerEventId);
168 };
169
170 Remove(filter);
171 }
172
Remove(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId,int64_t param)173 void EventQueue::Remove(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId, int64_t param)
174 {
175 if (!owner) {
176 HILOGE("Remove: Invalid owner");
177 return;
178 }
179
180 auto filter = [&owner, innerEventId, param](const InnerEvent::Pointer &p) {
181 return (!p->HasTask()) && (p->GetOwner() == owner) && (p->GetInnerEventId() == innerEventId) &&
182 (p->GetParam() == param);
183 };
184
185 Remove(filter);
186 }
187
Remove(const std::shared_ptr<EventHandler> & owner,const std::string & name)188 void EventQueue::Remove(const std::shared_ptr<EventHandler> &owner, const std::string &name)
189 {
190 if ((!owner) || (name.empty())) {
191 HILOGE("Remove: Invalid owner or task name");
192 return;
193 }
194
195 auto filter = [&owner, &name](const InnerEvent::Pointer &p) {
196 return (p->HasTask()) && (p->GetOwner() == owner) && (p->GetTaskName() == name);
197 };
198
199 Remove(filter);
200 }
201
Remove(const RemoveFilter & filter)202 void EventQueue::Remove(const RemoveFilter &filter)
203 {
204 std::lock_guard<std::mutex> lock(queueLock_);
205 for (uint32_t i = 0; i < SUB_EVENT_QUEUE_NUM; ++i) {
206 subEventQueues_[i].queue.remove_if(filter);
207 }
208 idleEvents_.remove_if(filter);
209 }
210
HasInnerEvent(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId)211 bool EventQueue::HasInnerEvent(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId)
212 {
213 if (!owner) {
214 HILOGE("HasInnerEvent: Invalid owner");
215 return false;
216 }
217 auto filter = [&owner, innerEventId](const InnerEvent::Pointer &p) {
218 return (!p->HasTask()) && (p->GetOwner() == owner) && (p->GetInnerEventId() == innerEventId);
219 };
220 return HasInnerEvent(filter);
221 }
222
HasInnerEvent(const std::shared_ptr<EventHandler> & owner,int64_t param)223 bool EventQueue::HasInnerEvent(const std::shared_ptr<EventHandler> &owner, int64_t param)
224 {
225 if (!owner) {
226 HILOGE("HasInnerEvent: Invalid owner");
227 return false;
228 }
229 auto filter = [&owner, param](const InnerEvent::Pointer &p) {
230 return (!p->HasTask()) && (p->GetOwner() == owner) && (p->GetParam() == param);
231 };
232 return HasInnerEvent(filter);
233 }
234
HasInnerEvent(const HasFilter & filter)235 bool EventQueue::HasInnerEvent(const HasFilter &filter)
236 {
237 std::lock_guard<std::mutex> lock(queueLock_);
238 for (uint32_t i = 0; i < SUB_EVENT_QUEUE_NUM; ++i) {
239 std::list<InnerEvent::Pointer>::iterator iter =
240 std::find_if(subEventQueues_[i].queue.begin(), subEventQueues_[i].queue.end(), filter);
241 if (iter != subEventQueues_[i].queue.end()) {
242 return true;
243 }
244 }
245 std::list<InnerEvent::Pointer>::iterator iter = std::find_if(idleEvents_.begin(), idleEvents_.end(), filter);
246 return iter != idleEvents_.end();
247 }
248
// Chooses the sub event queue to serve next and pops its first event.
//
// Sub event queues are visited in index order. CheckEventInListLocked decides, per queue,
// whether its first event qualifies, and lowers `nextWakeUpTime` as a side effect.
// When a queue qualifies while an earlier queue has already been chosen, the earlier queue is
// kept as long as its handledEventsCount is still below its maxHandledEventsCount (the counter
// is then incremented and the scan stops); otherwise the later queue becomes the choice.
// After choosing, the counters of all queues in front of the chosen one are reset to zero.
//
// Returns an empty pointer when no queue qualifies.
InnerEvent::Pointer EventQueue::PickEventLocked(const InnerEvent::TimePoint &now, InnerEvent::TimePoint &nextWakeUpTime)
{
    // SUB_EVENT_QUEUE_NUM doubles as the "nothing chosen yet" marker.
    uint32_t chosen = SUB_EVENT_QUEUE_NUM;
    for (uint32_t index = 0; index < SUB_EVENT_QUEUE_NUM; ++index) {
        if (CheckEventInListLocked(subEventQueues_[index].queue, now, nextWakeUpTime)) {
            if (chosen < SUB_EVENT_QUEUE_NUM) {
                SubEventQueue &preferred = subEventQueues_[chosen];
                // Keep serving the earlier queue until it has used up its continuous quota.
                if (preferred.handledEventsCount < preferred.maxHandledEventsCount) {
                    ++preferred.handledEventsCount;
                    break;
                }
            }

            // Either nothing was chosen so far, or the earlier queue exhausted its quota.
            chosen = index;
        }
    }

    if (chosen >= SUB_EVENT_QUEUE_NUM) {
        // Nothing to distribute.
        return InnerEvent::Pointer(nullptr, nullptr);
    }

    // Reset the counters of every queue in front of the chosen one.
    for (uint32_t index = 0; index < chosen; ++index) {
        subEventQueues_[index].handledEventsCount = 0;
    }

    return PopFrontEventFromListLocked(subEventQueues_[chosen].queue);
}
284
GetExpiredEventLocked(InnerEvent::TimePoint & nextExpiredTime)285 InnerEvent::Pointer EventQueue::GetExpiredEventLocked(InnerEvent::TimePoint &nextExpiredTime)
286 {
287 auto now = InnerEvent::Clock::now();
288 wakeUpTime_ = InnerEvent::TimePoint::max();
289 // Find an event which could be distributed right now.
290 InnerEvent::Pointer event = PickEventLocked(now, wakeUpTime_);
291 if (event) {
292 // Exit idle mode, if found an event to distribute.
293 isIdle_ = false;
294 return event;
295 }
296
297 // If found nothing, enter idle mode and make a time stamp.
298 if (!isIdle_) {
299 idleTimeStamp_ = now;
300 isIdle_ = true;
301 }
302
303 if (!idleEvents_.empty()) {
304 const auto &idleEvent = idleEvents_.front();
305
306 // Return the idle event that has been sent before time stamp and reaches its handle time.
307 if ((idleEvent->GetSendTime() <= idleTimeStamp_) && (idleEvent->GetHandleTime() <= now)) {
308 return PopFrontEventFromListLocked(idleEvents_);
309 }
310 }
311
312 // Update wake up time.
313 nextExpiredTime = wakeUpTime_;
314 return InnerEvent::Pointer(nullptr, nullptr);
315 }
316
GetEvent()317 InnerEvent::Pointer EventQueue::GetEvent()
318 {
319 std::unique_lock<std::mutex> lock(queueLock_);
320 while (!finished_) {
321 InnerEvent::TimePoint nextWakeUpTime = InnerEvent::TimePoint::max();
322 InnerEvent::Pointer event = GetExpiredEventLocked(nextWakeUpTime);
323 if (event) {
324 return event;
325 }
326 WaitUntilLocked(nextWakeUpTime, lock);
327 }
328
329 HILOGD("GetEvent: Break out");
330 return InnerEvent::Pointer(nullptr, nullptr);
331 }
332
GetExpiredEvent(InnerEvent::TimePoint & nextExpiredTime)333 InnerEvent::Pointer EventQueue::GetExpiredEvent(InnerEvent::TimePoint &nextExpiredTime)
334 {
335 std::unique_lock<std::mutex> lock(queueLock_);
336 return GetExpiredEventLocked(nextExpiredTime);
337 }
338
AddFileDescriptorListener(int32_t fileDescriptor,uint32_t events,const std::shared_ptr<FileDescriptorListener> & listener)339 ErrCode EventQueue::AddFileDescriptorListener(
340 int32_t fileDescriptor, uint32_t events, const std::shared_ptr<FileDescriptorListener> &listener)
341 {
342 if ((fileDescriptor < 0) || ((events & FILE_DESCRIPTOR_EVENTS_MASK) == 0) || (!listener)) {
343 HILOGE("AddFileDescriptorListener(%{public}d, %{public}u, %{public}s): Invalid parameter",
344 fileDescriptor,
345 events,
346 listener ? "valid" : "null");
347 return EVENT_HANDLER_ERR_INVALID_PARAM;
348 }
349
350 std::lock_guard<std::mutex> lock(queueLock_);
351 auto it = listeners_.find(fileDescriptor);
352 if (it != listeners_.end()) {
353 HILOGE("AddFileDescriptorListener: File descriptor %{public}d is already in listening", fileDescriptor);
354 return EVENT_HANDLER_ERR_FD_ALREADY;
355 }
356
357 if (!EnsureIoWaiterSupportListerningFileDescriptorLocked()) {
358 return EVENT_HANDLER_ERR_FD_NOT_SUPPORT;
359 }
360
361 if (!ioWaiter_->AddFileDescriptor(fileDescriptor, events)) {
362 HILOGE("AddFileDescriptorListener: Failed to add file descriptor into IO waiter");
363 return EVENT_HANDLER_ERR_FD_FAILED;
364 }
365
366 listeners_.emplace(fileDescriptor, listener);
367 return ERR_OK;
368 }
369
RemoveFileDescriptorListener(const std::shared_ptr<EventHandler> & owner)370 void EventQueue::RemoveFileDescriptorListener(const std::shared_ptr<EventHandler> &owner)
371 {
372 if (!owner) {
373 HILOGE("RemoveFileDescriptorListener: Invalid owner");
374 return;
375 }
376
377 auto listenerFilter = [&owner](const std::shared_ptr<FileDescriptorListener> &listener) {
378 if (!listener) {
379 return false;
380 }
381 return listener->GetOwner() == owner;
382 };
383
384 std::lock_guard<std::mutex> lock(queueLock_);
385 RemoveFileDescriptorListenerLocked(listeners_, ioWaiter_, listenerFilter);
386 }
387
RemoveFileDescriptorListener(int32_t fileDescriptor)388 void EventQueue::RemoveFileDescriptorListener(int32_t fileDescriptor)
389 {
390 if (fileDescriptor < 0) {
391 HILOGE("RemoveFileDescriptorListener(%{public}d): Invalid file descriptor", fileDescriptor);
392 return;
393 }
394
395 std::lock_guard<std::mutex> lock(queueLock_);
396 if (listeners_.erase(fileDescriptor) > 0) {
397 ioWaiter_->RemoveFileDescriptor(fileDescriptor);
398 }
399 }
400
Prepare()401 void EventQueue::Prepare()
402 {
403 std::lock_guard<std::mutex> lock(queueLock_);
404 finished_ = false;
405 }
406
Finish()407 void EventQueue::Finish()
408 {
409 std::lock_guard<std::mutex> lock(queueLock_);
410 finished_ = true;
411 ioWaiter_->NotifyAll();
412 }
413
WaitUntilLocked(const InnerEvent::TimePoint & when,std::unique_lock<std::mutex> & lock)414 void EventQueue::WaitUntilLocked(const InnerEvent::TimePoint &when, std::unique_lock<std::mutex> &lock)
415 {
416 // Get a temp reference of IO waiter, otherwise it maybe released while waiting.
417 auto ioWaiterHolder = ioWaiter_;
418 if (!ioWaiterHolder->WaitFor(lock, TimePointToTimeOut(when))) {
419 HILOGE("WaitUntilLocked: Failed to call wait, reset IO waiter");
420 ioWaiter_ = std::make_shared<NoneIoWaiter>();
421 listeners_.clear();
422 }
423 }
424
HandleFileDescriptorEvent(int32_t fileDescriptor,uint32_t events)425 void EventQueue::HandleFileDescriptorEvent(int32_t fileDescriptor, uint32_t events)
426 {
427 std::shared_ptr<FileDescriptorListener> listener;
428
429 {
430 std::lock_guard<std::mutex> lock(queueLock_);
431 auto it = listeners_.find(fileDescriptor);
432 if (it == listeners_.end()) {
433 HILOGW("HandleFileDescriptorEvent: Can not found listener, maybe it is removed");
434 return;
435 }
436
437 // Hold instance of listener.
438 listener = it->second;
439 if (!listener) {
440 return;
441 }
442 }
443
444 auto handler = listener->GetOwner();
445 if (!handler) {
446 HILOGW("HandleFileDescriptorEvent: Owner of listener is released");
447 return;
448 }
449
450 std::weak_ptr<FileDescriptorListener> wp = listener;
451 auto f = [fileDescriptor, events, wp]() {
452 auto listener = wp.lock();
453 if (!listener) {
454 HILOGW("HandleFileDescriptorEvent-Lambda: Listener is released");
455 return;
456 }
457
458 if ((events & FILE_DESCRIPTOR_INPUT_EVENT) != 0) {
459 listener->OnReadable(fileDescriptor);
460 }
461
462 if ((events & FILE_DESCRIPTOR_OUTPUT_EVENT) != 0) {
463 listener->OnWritable(fileDescriptor);
464 }
465
466 if ((events & FILE_DESCRIPTOR_SHUTDOWN_EVENT) != 0) {
467 listener->OnShutdown(fileDescriptor);
468 }
469
470 if ((events & FILE_DESCRIPTOR_EXCEPTION_EVENT) != 0) {
471 listener->OnException(fileDescriptor);
472 }
473 };
474
475 // Post a high priority task to handle file descriptor events.
476 handler->PostHighPriorityTask(f);
477 }
478
EnsureIoWaiterSupportListerningFileDescriptorLocked()479 bool EventQueue::EnsureIoWaiterSupportListerningFileDescriptorLocked()
480 {
481 if (ioWaiter_->SupportListeningFileDescriptor()) {
482 return true;
483 }
484
485 auto newIoWaiter = std::make_shared<EpollIoWaiter>();
486 if (!newIoWaiter->Init()) {
487 HILOGE("EnsureIoWaiterSupportListerningFileDescriptorLocked: Failed to initialize epoll");
488 return false;
489 }
490
491 // Set callback to handle events from file descriptors.
492 newIoWaiter->SetFileDescriptorEventCallback(
493 std::bind(&EventQueue::HandleFileDescriptorEvent, this, std::placeholders::_1, std::placeholders::_2));
494
495 ioWaiter_->NotifyAll();
496 ioWaiter_ = newIoWaiter;
497 return true;
498 }
499
Dump(Dumper & dumper)500 void EventQueue::Dump(Dumper &dumper)
501 {
502 std::lock_guard<std::mutex> lock(queueLock_);
503 std::string priority[] = {"Immediate", "High", "Low"};
504 uint32_t total = 0;
505 for (uint32_t i = 0; i < SUB_EVENT_QUEUE_NUM; ++i) {
506 uint32_t n = 0;
507 dumper.Dump(dumper.GetTag() + " " + priority[i] + " priority event queue information:" + LINE_SEPARATOR);
508 for (auto it = subEventQueues_[i].queue.begin(); it != subEventQueues_[i].queue.end(); ++it) {
509 ++n;
510 dumper.Dump(dumper.GetTag() + " No." + std::to_string(n) + " : " + (*it)->Dump());
511 ++total;
512 }
513 dumper.Dump(
514 dumper.GetTag() + " Total size of " + priority[i] + " events : " + std::to_string(n) + LINE_SEPARATOR);
515 }
516
517 dumper.Dump(dumper.GetTag() + " Idle priority event queue information:" + LINE_SEPARATOR);
518 int n = 0;
519 for (auto it = idleEvents_.begin(); it != idleEvents_.end(); ++it) {
520 ++n;
521 dumper.Dump(dumper.GetTag() + " No." + std::to_string(n) + " : " + (*it)->Dump());
522 ++total;
523 }
524 dumper.Dump(dumper.GetTag() + " Total size of Idle events : " + std::to_string(n) + LINE_SEPARATOR);
525
526 dumper.Dump(dumper.GetTag() + " Total event size : " + std::to_string(total) + LINE_SEPARATOR);
527 }
528
DumpQueueInfo(std::string & queueInfo)529 void EventQueue::DumpQueueInfo(std::string& queueInfo)
530 {
531 std::lock_guard<std::mutex> lock(queueLock_);
532 std::string priority[] = {"Immediate", "High", "Low"};
533 uint32_t total = 0;
534 for (uint32_t i = 0; i < SUB_EVENT_QUEUE_NUM; ++i) {
535 uint32_t n = 0;
536 queueInfo += " " + priority[i] + " priority event queue:" + LINE_SEPARATOR;
537 for (auto it = subEventQueues_[i].queue.begin(); it != subEventQueues_[i].queue.end(); ++it) {
538 ++n;
539 queueInfo += " No." + std::to_string(n) + " : " + (*it)->Dump();
540 ++total;
541 }
542 queueInfo += " Total size of " + priority[i] + " events : " + std::to_string(n) + LINE_SEPARATOR;
543 }
544
545 queueInfo += " Idle priority event queue:" + LINE_SEPARATOR;
546
547 int n = 0;
548 for (auto it = idleEvents_.begin(); it != idleEvents_.end(); ++it) {
549 ++n;
550 queueInfo += " No." + std::to_string(n) + " : " + (*it)->Dump();
551 ++total;
552 }
553 queueInfo += " Total size of Idle events : " + std::to_string(n) + LINE_SEPARATOR;
554
555 queueInfo += " Total event size : " + std::to_string(total);
556 }
557
IsIdle()558 bool EventQueue::IsIdle()
559 {
560 return isIdle_;
561 }
562
IsQueueEmpty()563 bool EventQueue::IsQueueEmpty()
564 {
565 std::lock_guard<std::mutex> lock(queueLock_);
566 for (uint32_t i = 0; i < SUB_EVENT_QUEUE_NUM; ++i) {
567 uint32_t queueSize = subEventQueues_[i].queue.size();
568 if (queueSize != 0) {
569 return false;
570 }
571 }
572
573 return idleEvents_.size() == 0;
574 }
575 } // namespace AppExecFwk
576 } // namespace OHOS
577