/*
 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "event_loop.h"
16 
17 #include <climits>
18 #include <functional>
19 #include <thread>
20 
21 #if defined(__HIVIEW_OHOS__)
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/prctl.h>
26 #elif defined(_WIN32)
27 #include <processthreadsapi.h>
28 #include <sstream>
29 #include <Synchapi.h>
30 #include <tchar.h>
31 #include <windows.h>
32 #endif
33 
34 #include "audit.h"
35 #include "file_util.h"
36 #include "logger.h"
37 #include "memory_util.h"
38 #include "thread_util.h"
39 namespace OHOS {
40 namespace HiviewDFX {
namespace {
// Returns a future that is already fulfilled with `false`, so a caller can be
// answered immediately without anything having been queued.
std::future<bool> GetFalseFuture()
{
    std::promise<bool> rejected;
    std::future<bool> outcome = rejected.get_future();
    rejected.set_value(false);
    return outcome;
}
} // namespace
49 
50 DEFINE_LOG_TAG("HiView-EventLoop");
51 
EventLoop(const std::string & name)52 EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
53 {}
54 
~EventLoop()55 EventLoop::~EventLoop()
56 {
57     StopLoop();
58 }
59 
InitEventQueueNotifier()60 bool EventLoop::InitEventQueueNotifier()
61 {
62 #if defined(__HIVIEW_OHOS__)
63 #if defined(USE_POLL)
64     for (int i = 0; i < 2; i++) { // 2:event queue fd size
65         if (eventQueueFd_[i] > 0) {
66             close(eventQueueFd_[i]);
67             eventQueueFd_[i] = -1;
68         }
69     }
70 
71     if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
72         HIVIEW_LOGW("Failed to create event queue fd.");
73         return false;
74     }
75 
76     watchFds_[0].fd = eventQueueFd_[0];
77     watchFds_[0].events = POLLIN;
78     watchedFdSize_ = 1;
79 #else
80 #if defined EPOLL_CLOEXEC
81     sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
82 #else
83     sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
84 #endif
85     pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
86     struct epoll_event eventItem;
87     eventItem.events = EPOLLIN | EPOLLET;
88     eventItem.data.fd = pendingEventQueueFd_.Get();
89     int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
90     if (result < 0) {
91         HIVIEW_LOGE("Fail to Create event poll queue.");
92         return false;
93     }
94 #endif
95 #elif defined(_WIN32)
96     watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
97 #endif
98     return true;
99 }
100 
WakeUp()101 void EventLoop::WakeUp()
102 {
103 #if defined(__HIVIEW_OHOS__)
104 #ifdef USE_POLL
105     if (eventQueueFd_[1] > 0) {
106         int32_t count = 1;
107         write(eventQueueFd_[1], &count, sizeof(count));
108     }
109 #else
110     if (pendingEventQueueFd_.Get() > 0) {
111         eventfd_t count = 1;
112         write(pendingEventQueueFd_.Get(), &count, sizeof(count));
113     }
114 #endif
115 #elif defined(_WIN32)
116     SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
117 #endif
118 }
119 
StartLoop(bool createNewThread)120 void EventLoop::StartLoop(bool createNewThread)
121 {
122     std::lock_guard<std::mutex> lock(queueMutex_);
123     if (IsRunning()) {
124         return;
125     }
126     if (!InitEventQueueNotifier()) {
127         return;
128     }
129 
130     isRunning_ = true;
131     if (createNewThread) {
132         thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
133         return;
134     }
135     // handle loop in current thread cases
136     Run();
137 }
138 
StopLoop()139 void EventLoop::StopLoop()
140 {
141     needQuit_ = true;
142     if (!IsRunning()) {
143         return;
144     }
145 
146     {
147         std::lock_guard<std::mutex> lock(queueMutex_);
148         while (!pendingEvents_.empty()) {
149             pendingEvents_.pop();
150         }
151         isRunning_ = false;
152     }
153 
154     WakeUp();
155     if (thread_ != nullptr && thread_->joinable()) {
156         thread_->join();
157     }
158 }
159 
AddEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task task)160 uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
161 {
162     if (needQuit_) {
163         return 0;
164     }
165 
166     uint64_t now = NanoSecondSinceSystemStart();
167     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
168         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
169                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
170         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
171     }
172 
173     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
174     loopEvent.event = std::move(event);
175     loopEvent.handler = handler;
176     loopEvent.task = task;
177     std::lock_guard<std::mutex> lock(queueMutex_);
178     pendingEvents_.push(std::move(loopEvent));
179     WakeUp();
180     return now;
181 }
182 
AddEventForResult(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event)183 std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
184 {
185     if (needQuit_) {
186         return GetFalseFuture();
187     }
188 
189     if (handler == nullptr || event == nullptr) {
190         return GetFalseFuture();
191     }
192 
193     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
194         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
195                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
196         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
197     }
198 
199     auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
200     auto task = std::make_shared<std::packaged_task<bool()>>(bind);
201     auto result = task->get_future();
202     uint64_t now = NanoSecondSinceSystemStart();
203     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
204     loopEvent.taskType = LOOP_PACKAGED_TASK;
205     loopEvent.event = std::move(event);
206     loopEvent.handler = handler;
207     loopEvent.packagedTask = std::move(task);
208     std::lock_guard<std::mutex> lock(queueMutex_);
209     pendingEvents_.push(std::move(loopEvent));
210     WakeUp();
211     return result;
212 }
213 
AddTimerEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task & task,uint64_t interval,bool repeat)214 uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event,
215     const Task &task, uint64_t interval, bool repeat)
216 {
217     if (needQuit_) {
218         return 0;
219     }
220 
221     uint64_t now = NanoSecondSinceSystemStart();
222     uint64_t intervalMicro = interval * SECOND_TO_NANOSECOND;
223     if (now + intervalMicro < now) {
224         HIVIEW_LOGW("Add Timer Event fail. The interval is too large. please check.");
225         return -1;
226     }
227 
228     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
229         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
230                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
231         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
232     }
233 
234     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
235     loopEvent.isRepeat = repeat;
236     loopEvent.taskType = LOOP_EVENT_TASK;
237     loopEvent.interval = intervalMicro;
238     loopEvent.targetTime = now + intervalMicro;
239     loopEvent.event = std::move(event);
240     loopEvent.handler = handler;
241     loopEvent.task = task;
242     std::lock_guard<std::mutex> lock(queueMutex_);
243     pendingEvents_.push(std::move(loopEvent));
244     HIVIEW_LOGI("task[%{public}" PRIu64 "|%{public}d] has been pushed into task queue.", interval,
245         static_cast<int>(repeat));
246     ResetTimerIfNeedLocked();
247     return now;
248 }
249 
RemoveEvent(uint64_t seq)250 bool EventLoop::RemoveEvent(uint64_t seq)
251 {
252     std::lock_guard<std::mutex> lock(queueMutex_);
253     auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
254     if ((curEvent != nullptr) && (curEvent->seq == seq)) {
255         curEvent->seq = 0;
256         HIVIEW_LOGI("removing the current processing event.");
257         return false;
258     }
259     return pendingEvents_.remove(seq);
260 }
261 
ResetTimerIfNeedLocked()262 void EventLoop::ResetTimerIfNeedLocked()
263 {
264     const LoopEvent &event = pendingEvents_.top();
265     if (nextWakeupTime_ == event.targetTime) {
266         return;
267     }
268     WakeUp();
269 }
270 
AddFileDescriptorEventCallback(const std::string & name,std::shared_ptr<FileDescriptorEventCallback> source)271 bool EventLoop::AddFileDescriptorEventCallback(
272     const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
273 {
274     if (needQuit_) {
275         return false;
276     }
277 
278     std::lock_guard<std::mutex> lock(queueMutex_);
279 #if defined(__HIVIEW_OHOS__)
280     if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
281         HIVIEW_LOGW("Watched fds exceed 64.");
282         return false;
283     }
284 
285     if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
286         HIVIEW_LOGW("Exist fd callback with same name.");
287         return false;
288     }
289 
290     int fd = source->GetPollFd();
291     if (fd <= 0) {
292         HIVIEW_LOGW("Invalid poll fd.");
293         return false;
294     }
295 
296 #ifdef USE_POLL
297     eventSourceNameMap_[name] = fd;
298     eventSourceMap_[fd] = source;
299     modifyFdStatus_ = true;
300     WakeUp();
301 #else
302     struct epoll_event eventItem;
303     eventItem.events = source->GetPollType();
304     eventItem.data.fd = fd;
305     int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
306     if (result < 0) {
307         HIVIEW_LOGW("Fail to Add Fd callback.");
308         return false;
309     }
310 
311     eventSourceNameMap_[name] = fd;
312     eventSourceMap_[fd] = source;
313 #endif
314 #elif defined(_WIN32)
315     // not supported yet
316 #endif
317     return true;
318 }
319 
RemoveFileDescriptorEventCallback(const std::string & name)320 bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
321 {
322     std::lock_guard<std::mutex> lock(queueMutex_);
323 #if defined(__HIVIEW_OHOS__)
324     if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
325         HIVIEW_LOGW("fd callback name is not existed.");
326         return false;
327     }
328 
329     int fd = eventSourceNameMap_[name];
330     eventSourceNameMap_.erase(name);
331     eventSourceMap_.erase(fd);
332 
333 #ifdef USE_POLL
334     modifyFdStatus_ = true;
335     WakeUp();
336 #else
337     if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
338         HIVIEW_LOGW("fail to remove watched fd.");
339     }
340 #endif
341 #elif defined(_WIN32)
342     // not supported yet
343 #endif
344     return true;
345 }
346 
347 #ifdef USE_POLL
ModifyFdStatus()348 void EventLoop::ModifyFdStatus()
349 {
350     std::lock_guard<std::mutex> lock(queueMutex_);
351     modifyFdStatus_ = false;
352     int index = 1;
353     for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
354         if (index > MAX_WATCHED_FDS - 1) {
355             break;
356         }
357 
358         watchFds_[index].fd = it->first;
359         watchFds_[index].events = it->second->GetPollType();
360         index++;
361         watchedFdSize_ = index;
362     }
363 }
364 
PollNextEvent(uint64_t timeout)365 void EventLoop::PollNextEvent(uint64_t timeout)
366 {
367     poll(watchFds_, watchedFdSize_, timeout);
368     isWaken_ = true;
369     if (modifyFdStatus_) {
370         ModifyFdStatus();
371         return;
372     }
373 
374     if (watchFds_[0].revents & POLLIN) {
375         // new queued event arrived
376         int32_t val = 0;
377         read(watchFds_[0].fd, &val, sizeof(val));
378         return;
379     }
380 
381     for (int i = 1; i < watchedFdSize_; i++) {
382         int32_t fd = watchFds_[i].fd;
383         std::lock_guard<std::mutex> lock(queueMutex_);
384         auto it = eventSourceMap_.find(fd);
385         if (it == eventSourceMap_.end()) {
386             continue;
387         }
388 
389         int32_t pollType = it->second->GetPollType();
390         if (watchFds_[i].revents & pollType) {
391             it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
392         }
393     }
394 }
395 #endif
396 
Run()397 void EventLoop::Run()
398 {
399     if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
400         HIVIEW_LOGW("Failed to optimize memory for current thread");
401     }
402 
403     // set thread name
404     const int maxLength = 16;
405     std::string restrictedName = name_;
406     if (name_.length() >= maxLength) {
407         HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
408         restrictedName = name_.substr(0, maxLength - 1);
409     }
410     Thread::SetThreadDescription(restrictedName);
411 
412     name_ = name_ + "@" + std::to_string(Thread::GetTid());
413 
414     while (true) {
415         uint64_t leftTimeNanosecond = ProcessQueuedEvent();
416         uint64_t leftTimeMill = INT_MAX;
417         if (leftTimeNanosecond != INT_MAX) {
418             leftTimeMill = (leftTimeNanosecond / NANOSECOND_TO_MILLSECOND);
419         }
420         WaitNextEvent(leftTimeMill);
421         if (needQuit_) {
422             break;
423         }
424     }
425 }
426 
ProcessQueuedEvent()427 uint64_t EventLoop::ProcessQueuedEvent()
428 {
429     if (pendingEvents_.empty()) {
430         return INT_MAX;
431     }
432 
433     uint64_t leftTimeNanosecond = 0;
434     while (!pendingEvents_.empty()) {
435         uint64_t now = NanoSecondSinceSystemStart();
436         LoopEvent event;
437         if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
438             break;
439         }
440 
441         ProcessEvent(event);
442 
443         if (event.isRepeat && (event.interval > 0)) {
444             // force update time
445             now = NanoSecondSinceSystemStart();
446             ReInsertPeriodicEvent(now, event);
447         }
448 
449         std::lock_guard<std::mutex> lock(queueMutex_);
450         currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
451     }
452     isWaken_ = false;
453     return leftTimeNanosecond;
454 }
455 
FetchNextEvent(uint64_t now,uint64_t & leftTimeNanosecond,LoopEvent & out)456 bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
457 {
458     if (needQuit_) {
459         return false;
460     }
461 
462     std::lock_guard<std::mutex> lock(queueMutex_);
463     if (pendingEvents_.empty()) {
464         return false;
465     }
466 
467     size_t pendingSize = pendingEvents_.size();
468     const size_t warningPendingSize = 1000;
469     if ((pendingSize > warningPendingSize) && (pendingSize % warningPendingSize == 0)) {
470         HIVIEW_LOGW("%{public}s has %{public}zu pending events.", name_.c_str(), pendingSize);
471     }
472     pendingEvents_.ShrinkIfNeedLocked();
473 
474     const LoopEvent &event = pendingEvents_.top();
475     if (event.targetTime > now) {
476         leftTimeNanosecond = event.targetTime - now;
477         nextWakeupTime_ = event.targetTime;
478         return false;
479     }
480 
481     out = event;
482     pendingEvents_.pop();
483     currentProcessingEvent_.store(&out, std::memory_order_relaxed);
484     return true;
485 }
486 
ProcessEvent(LoopEvent & event)487 void EventLoop::ProcessEvent(LoopEvent &event)
488 {
489     if (event.taskType == LOOP_EVENT_TASK) {
490         if (event.task != nullptr) {
491             event.task();
492         } else if ((event.handler != nullptr) && (event.event != nullptr)) {
493             event.handler->OnEventProxy(event.event);
494         } else {
495             HIVIEW_LOGW("Loop event task with null tasks.");
496         }
497     } else if (event.taskType == LOOP_PACKAGED_TASK) {
498         if (event.packagedTask != nullptr) {
499             event.packagedTask->operator()();
500         } else {
501             HIVIEW_LOGW("Loop packaged task with null tasks.");
502         }
503     } else {
504         HIVIEW_LOGW("unrecognized task type.");
505     }
506 }
507 
ReInsertPeriodicEvent(uint64_t now,LoopEvent & event)508 void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
509 {
510     std::lock_guard<std::mutex> lock(queueMutex_);
511     currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
512     if (event.seq == 0) {
513         return;
514     }
515 
516     event.enqueueTime = now;
517     event.targetTime = now + event.interval;
518 
519     if (Audit::IsEnabled() && (event.event != nullptr) && (event.handler != nullptr)) {
520         event.event->ResetTimestamp();
521         auto digest = event.event->sender_ + Audit::DOMAIN_DELIMITER + event.handler->GetHandlerInfo() +
522                       Audit::DOMAIN_DELIMITER + GetName() + Audit::DOMAIN_DELIMITER + event.event->GetEventInfo();
523         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event.event->createTime_, digest);
524     }
525 
526     pendingEvents_.push(std::move(event));
527     ResetTimerIfNeedLocked();
528 }
529 
WaitNextEvent(uint64_t leftTimeMill)530 void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
531 {
532 #if defined(__HIVIEW_OHOS__)
533 #ifdef USE_POLL
534     PollNextEvent(leftTimeMill);
535 #else
536     struct epoll_event eventItems[MAX_EVENT_SIZE];
537     int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
538     isWaken_ = true;
539     if (eventCount <= 0) {
540         // no event read from watched fd, process queued events
541         return;
542     }
543 
544     for (int i = 0; i < eventCount; i++) {
545         int fd = eventItems[i].data.fd;
546         uint32_t events = eventItems[i].events;
547         if (fd == pendingEventQueueFd_.Get()) {
548             // new queued event arrived
549             eventfd_t val = 0;
550             read(fd, &val, sizeof(val));
551             return;
552         } else {
553             // process data source callbacks
554             auto it = eventSourceMap_.find(fd);
555             if (it != eventSourceMap_.end()) {
556                 it->second->OnFileDescriptorEvent(fd, events);
557             }
558         }
559     }
560 #endif
561 #elif defined(_WIN32)
562     DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
563     DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
564 #endif
565 }
566 
NanoSecondSinceSystemStart()567 uint64_t EventLoop::NanoSecondSinceSystemStart()
568 {
569     auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
570     return static_cast<uint64_t>(nanoNow.count());
571 }
572 }  // namespace HiviewDFX
573 }  // namespace OHOS
574