/*
 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "event_loop.h"

#include <climits>
#include <functional>
#include <thread>

#if defined(__HIVIEW_OHOS__)
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/prctl.h>
#elif defined(_WIN32)
#include <processthreadsapi.h>
#include <sstream>
#include <Synchapi.h>
#include <tchar.h>
#include <windows.h>
#endif

#include "audit.h"
#include "file_util.h"
#include "logger.h"
#include "memory_util.h"
#include "thread_util.h"
namespace OHOS {
namespace HiviewDFX {
namespace {
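// Helper that returns an already-satisfied future holding false; used to
// reject requests when the loop is quitting or the arguments are invalid.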
std::future<bool> GetFalseFuture()
{
    std::promise<bool> tmpPromise;
    tmpPromise.set_value(false);
    return tmpPromise.get_future();
}
}

DEFINE_LOG_TAG("HiView-EventLoop");

EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
{}

EventLoop::~EventLoop()
{
    StopLoop();
}

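// Sets up the wake-up mechanism for the loop: a pipe for the poll()-based
// build, an eventfd registered with epoll otherwise, and an event handle on
// Windows.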
bool EventLoop::InitEventQueueNotifier()
{
#if defined(__HIVIEW_OHOS__)
#if defined(USE_POLL)
    for (int i = 0; i < 2; i++) { // 2: event queue fd size
        if (eventQueueFd_[i] > 0) {
            close(eventQueueFd_[i]);
            eventQueueFd_[i] = -1;
        }
    }

    if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
        HIVIEW_LOGW("Failed to create event queue fd.");
        return false;
    }

    watchFds_[0].fd = eventQueueFd_[0];
    watchFds_[0].events = POLLIN;
    watchedFdSize_ = 1;
#else
#if defined EPOLL_CLOEXEC
    sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
#else
    sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
#endif
    pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
    struct epoll_event eventItem;
    eventItem.events = EPOLLIN | EPOLLET;
    eventItem.data.fd = pendingEventQueueFd_.Get();
    int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
    if (result < 0) {
        HIVIEW_LOGE("Failed to create event poll queue.");
        return false;
    }
#endif
#elif defined(_WIN32)
    watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
#endif
    return true;
}

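// Nudges the loop thread: writes to the wake-up fd (or signals the Windows
// event) so a blocked poll/epoll_wait/WaitForMultipleObjects call returns
// and the pending queue is re-examined.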
void EventLoop::WakeUp()
{
#if defined(__HIVIEW_OHOS__)
#ifdef USE_POLL
    if (eventQueueFd_[1] > 0) {
        int32_t count = 1;
        write(eventQueueFd_[1], &count, sizeof(count));
    }
#else
    if (pendingEventQueueFd_.Get() > 0) {
        eventfd_t count = 1;
        write(pendingEventQueueFd_.Get(), &count, sizeof(count));
    }
#endif
#elif defined(_WIN32)
    SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
#endif
}

void EventLoop::StartLoop(bool createNewThread)
{
    std::lock_guard<std::mutex> lock(queueMutex_);
    if (IsRunning()) {
        return;
    }
    if (!InitEventQueueNotifier()) {
        return;
    }

    isRunning_ = true;
    if (createNewThread) {
        thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
        return;
    }
    // handle loop in current thread cases
    Run();
}

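// Requests shutdown: clears the pending queue, marks the loop as stopped,
// wakes the loop thread, and joins it if one was created by StartLoop.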
void EventLoop::StopLoop()
{
    needQuit_ = true;
    if (!IsRunning()) {
        return;
    }

    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        while (!pendingEvents_.empty()) {
            pendingEvents_.pop();
        }
        isRunning_ = false;
    }

    WakeUp();
    if (thread_ != nullptr && thread_->joinable()) {
        thread_->join();
    }
}

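// Queues an event/task for asynchronous processing and wakes the loop.
// Returns the enqueue timestamp in nanoseconds, or 0 if the loop is quitting.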
uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
{
    if (needQuit_) {
        return 0;
    }

    uint64_t now = NanoSecondSinceSystemStart();
    if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
        auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
                      GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
        Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
    }

    LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
    loopEvent.event = std::move(event);
    loopEvent.handler = handler;
    loopEvent.task = task;
    std::lock_guard<std::mutex> lock(queueMutex_);
    pendingEvents_.push(std::move(loopEvent));
    WakeUp();
    return now;
}

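// Same as AddEvent, but wraps the handler invocation in a packaged_task so
// the caller can wait on the returned future for the handling result.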
std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
{
    if (needQuit_) {
        return GetFalseFuture();
    }

    if (handler == nullptr || event == nullptr) {
        return GetFalseFuture();
    }

    if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
        auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
                      GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
        Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
    }

    auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
    auto task = std::make_shared<std::packaged_task<bool()>>(bind);
    auto result = task->get_future();
    uint64_t now = NanoSecondSinceSystemStart();
    LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
    loopEvent.taskType = LOOP_PACKAGED_TASK;
    loopEvent.event = std::move(event);
    loopEvent.handler = handler;
    loopEvent.packagedTask = std::move(task);
    std::lock_guard<std::mutex> lock(queueMutex_);
    pendingEvents_.push(std::move(loopEvent));
    WakeUp();
    return result;
}

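// Schedules a task/event to run after `interval` seconds, optionally
// repeating. Returns the enqueue timestamp, 0 when quitting, or -1 (wrapped
// to uint64_t) when the interval would overflow the target time.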
uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task &task,
    uint64_t interval, bool repeat)
{
    if (needQuit_) {
        return 0;
    }

    uint64_t now = NanoSecondSinceSystemStart();
    uint64_t intervalNano = interval * SECOND_TO_NANOSECOND;
    if (now + intervalNano < now) { // overflow check
        HIVIEW_LOGW("Failed to add timer event: the interval is too large.");
        return -1;
    }

    if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
        auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
                      GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
        Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
    }

    LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
    loopEvent.isRepeat = repeat;
    loopEvent.taskType = LOOP_EVENT_TASK;
    loopEvent.interval = intervalNano;
    loopEvent.targetTime = now + intervalNano;
    loopEvent.event = std::move(event);
    loopEvent.handler = handler;
    loopEvent.task = task;
    std::lock_guard<std::mutex> lock(queueMutex_);
    pendingEvents_.push(std::move(loopEvent));
    ResetTimerIfNeedLocked();
    return now;
}

bool EventLoop::RemoveEvent(uint64_t seq)
{
    std::lock_guard<std::mutex> lock(queueMutex_);
    auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
    if ((curEvent != nullptr) && (curEvent->seq == seq)) {
        curEvent->seq = 0;
        HIVIEW_LOGI("removing the current processing event.");
        return false;
    }
    return pendingEvents_.remove(seq);
}

void EventLoop::ResetTimerIfNeedLocked()
{
    const LoopEvent &event = pendingEvents_.top();
    if (nextWakeupTime_ == event.targetTime) {
        return;
    }
    WakeUp();
}

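// Registers a named file-descriptor event source. Poll-based builds mark the
// watch list dirty and wake the loop to rebuild it; epoll-based builds add
// the fd to the shared epoll instance directly. Not supported on Windows yet.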
bool EventLoop::AddFileDescriptorEventCallback(
    const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
{
    if (needQuit_) {
        return false;
    }

    std::lock_guard<std::mutex> lock(queueMutex_);
#if defined(__HIVIEW_OHOS__)
    if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
        HIVIEW_LOGW("Watched fds exceed 64.");
        return false;
    }

    if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
        HIVIEW_LOGW("A fd callback with the same name already exists.");
        return false;
    }

    int fd = source->GetPollFd();
    if (fd <= 0) {
        HIVIEW_LOGW("Invalid poll fd.");
        return false;
    }

#ifdef USE_POLL
    eventSourceNameMap_[name] = fd;
    eventSourceMap_[fd] = source;
    modifyFdStatus_ = true;
    WakeUp();
#else
    struct epoll_event eventItem;
    eventItem.events = source->GetPollType();
    eventItem.data.fd = fd;
    int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
    if (result < 0) {
        HIVIEW_LOGW("Failed to add fd callback.");
        return false;
    }

    eventSourceNameMap_[name] = fd;
    eventSourceMap_[fd] = source;
#endif
#elif defined(_WIN32)
    // not supported yet
#endif
    return true;
}

bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
{
    std::lock_guard<std::mutex> lock(queueMutex_);
#if defined(__HIVIEW_OHOS__)
    if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
        HIVIEW_LOGW("fd callback name does not exist.");
        return false;
    }

    int fd = eventSourceNameMap_[name];
    eventSourceNameMap_.erase(name);
    eventSourceMap_.erase(fd);

#ifdef USE_POLL
    modifyFdStatus_ = true;
    WakeUp();
#else
    if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
        HIVIEW_LOGW("Failed to remove watched fd.");
    }
#endif
#elif defined(_WIN32)
    // not supported yet
#endif
    return true;
}

#ifdef USE_POLL
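// Rebuilds the poll() watch array from the registered event sources; slot 0
// stays reserved for the event-queue notification fd.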
void EventLoop::ModifyFdStatus()
{
    std::lock_guard<std::mutex> lock(queueMutex_);
    modifyFdStatus_ = false;
    int index = 1;
    for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
        if (index > MAX_WATCHED_FDS - 1) {
            break;
        }

        watchFds_[index].fd = it->first;
        watchFds_[index].events = it->second->GetPollType();
        index++;
        watchedFdSize_ = index;
    }
}

void EventLoop::PollNextEvent(uint64_t timeout)
{
    poll(watchFds_, watchedFdSize_, timeout);
    isWaken_ = true;
    if (modifyFdStatus_) {
        ModifyFdStatus();
        return;
    }

    if (watchFds_[0].revents & POLLIN) {
        // new queued event arrived
        int32_t val = 0;
        read(watchFds_[0].fd, &val, sizeof(val));
        return;
    }

    for (int i = 1; i < watchedFdSize_; i++) {
        int32_t fd = watchFds_[i].fd;
        std::lock_guard<std::mutex> lock(queueMutex_);
        auto it = eventSourceMap_.find(fd);
        if (it == eventSourceMap_.end()) {
            continue;
        }

        int32_t pollType = it->second->GetPollType();
        if (watchFds_[i].revents & pollType) {
            it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
        }
    }
}
#endif

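// Loop body: applies per-thread memory tweaks, sets the thread name, then
// keeps processing due events and waiting until the next timer deadline or
// wake-up.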
void EventLoop::Run()
{
    if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
        HIVIEW_LOGW("Failed to optimize memory for current thread");
    }

    // set thread name
    const int maxLength = 16;
    std::string restrictedName = name_;
    if (name_.length() >= maxLength) {
        HIVIEW_LOGW("%{public}s is too long for a thread name, please use a shorter one.", name_.c_str());
        restrictedName = name_.substr(0, maxLength - 1);
    }
    Thread::SetThreadDescription(restrictedName);

    name_ = name_ + "@" + std::to_string(Thread::GetTid());

    while (true) {
        uint64_t leftTimeNanosecond = ProcessQueuedEvent();
        uint64_t leftTimeMill = INT_MAX;
        if (leftTimeNanosecond != INT_MAX) {
            leftTimeMill = (leftTimeNanosecond / NANOSECOND_TO_MILLSECOND);
        }
        WaitNextEvent(leftTimeMill);
        if (needQuit_) {
            break;
        }
    }
}

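// Drains events whose target time has arrived; returns the remaining
// nanoseconds until the next pending event, or INT_MAX when the queue is
// empty on entry.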
uint64_t EventLoop::ProcessQueuedEvent()
{
    if (pendingEvents_.empty()) {
        return INT_MAX;
    }

    uint64_t leftTimeNanosecond = 0;
    while (!pendingEvents_.empty()) {
        uint64_t now = NanoSecondSinceSystemStart();
        LoopEvent event;
        if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
            break;
        }

        ProcessEvent(event);

        if (event.isRepeat && (event.interval > 0)) {
            // force update time
            now = NanoSecondSinceSystemStart();
            ReInsertPeriodicEvent(now, event);
        }

        std::lock_guard<std::mutex> lock(queueMutex_);
        currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
    }
    isWaken_ = false;
    return leftTimeNanosecond;
}

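// Pops the next due event under the queue lock and records it as the event
// currently being processed. Returns false when quitting, when the queue is
// empty, or when the earliest event is not due yet (in which case it updates
// leftTimeNanosecond and nextWakeupTime_).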
bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
{
    if (needQuit_) {
        return false;
    }

    std::lock_guard<std::mutex> lock(queueMutex_);
    if (pendingEvents_.empty()) {
        return false;
    }

    size_t pendingSize = pendingEvents_.size();
    const size_t warningPendingSize = 1000;
    if ((pendingSize > warningPendingSize) && (pendingSize % warningPendingSize == 0)) {
        HIVIEW_LOGW("%{public}s has %{public}zu pending events.", name_.c_str(), pendingSize);
    }
    pendingEvents_.ShrinkIfNeedLocked();

    const LoopEvent &event = pendingEvents_.top();
    if (event.targetTime > now) {
        leftTimeNanosecond = event.targetTime - now;
        nextWakeupTime_ = event.targetTime;
        return false;
    }

    out = event;
    pendingEvents_.pop();
    currentProcessingEvent_.store(&out, std::memory_order_relaxed);
    return true;
}

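// Dispatches a dequeued event: runs the plain task or the handler for
// LOOP_EVENT_TASK entries, and invokes the packaged task for
// LOOP_PACKAGED_TASK entries.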
void EventLoop::ProcessEvent(LoopEvent &event)
{
    if (event.taskType == LOOP_EVENT_TASK) {
        if (event.task != nullptr) {
            event.task();
        } else if ((event.handler != nullptr) && (event.event != nullptr)) {
            event.handler->OnEventProxy(event.event);
        } else {
            HIVIEW_LOGW("Loop event task with null tasks.");
        }
    } else if (event.taskType == LOOP_PACKAGED_TASK) {
        if (event.packagedTask != nullptr) {
            event.packagedTask->operator()();
        } else {
            HIVIEW_LOGW("Loop packaged task with null tasks.");
        }
    } else {
        HIVIEW_LOGW("unrecognized task type.");
    }
}

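// Re-queues a repeating event with a refreshed target time. A sequence of 0
// means the event was removed while it was being processed, so it is dropped.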
void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
{
    std::lock_guard<std::mutex> lock(queueMutex_);
    currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
    if (event.seq == 0) {
        return;
    }

    event.enqueueTime = now;
    event.targetTime = now + event.interval;

    if (Audit::IsEnabled() && (event.event != nullptr) && (event.handler != nullptr)) {
        event.event->ResetTimestamp();
        auto digest = event.event->sender_ + Audit::DOMAIN_DELIMITER + event.handler->GetHandlerInfo() +
                      Audit::DOMAIN_DELIMITER + GetName() + Audit::DOMAIN_DELIMITER + event.event->GetEventInfo();
        Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event.event->createTime_, digest);
    }

    pendingEvents_.push(std::move(event));
    ResetTimerIfNeedLocked();
}

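// Blocks until the next event is due or the loop is woken up, then dispatches
// any file-descriptor callbacks reported by poll/epoll. On Windows it simply
// waits on the handle list.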
void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
{
#if defined(__HIVIEW_OHOS__)
#ifdef USE_POLL
    PollNextEvent(leftTimeMill);
#else
    struct epoll_event eventItems[MAX_EVENT_SIZE];
    int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
    isWaken_ = true;
    if (eventCount <= 0) {
        // no event read from watched fd, process queued events
        return;
    }

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t events = eventItems[i].events;
        if (fd == pendingEventQueueFd_.Get()) {
            // new queued event arrived
            eventfd_t val = 0;
            read(fd, &val, sizeof(val));
            return;
        } else {
            // process data source callbacks
            auto it = eventSourceMap_.find(fd);
            if (it != eventSourceMap_.end()) {
                it->second->OnFileDescriptorEvent(fd, events);
            }
        }
    }
#endif
#elif defined(_WIN32)
    DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
    DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
#endif
}

uint64_t EventLoop::NanoSecondSinceSystemStart()
{
    auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
    return static_cast<uint64_t>(nanoNow.count());
}
}  // namespace HiviewDFX
}  // namespace OHOS