• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "event_loop.h"
16 
17 #include <climits>
18 #include <functional>
19 #include <thread>
20 
21 #if defined(__HIVIEW_OHOS__)
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/prctl.h>
26 #elif defined(_WIN32)
27 #include <processthreadsapi.h>
28 #include <sstream>
29 #include <Synchapi.h>
30 #include <tchar.h>
31 #include <windows.h>
32 #endif
33 
34 #include "file_util.h"
35 #include "hiview_logger.h"
36 #include "memory_util.h"
37 #include "thread_util.h"
38 #include "time_util.h"
39 namespace OHOS {
40 namespace HiviewDFX {
41 namespace {
GetFalseFuture()42 std::future<bool> GetFalseFuture()
43 {
44     std::promise<bool> tmpPromise;
45     tmpPromise.set_value(false);
46     return tmpPromise.get_future();
47 }
48 }
49 
50 DEFINE_LOG_TAG("HiView-EventLoop");
51 
EventLoop(const std::string & name)52 EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
53 {}
54 
~EventLoop()55 EventLoop::~EventLoop()
56 {
57     StopLoop();
58 }
59 
InitEventQueueNotifier()60 bool EventLoop::InitEventQueueNotifier()
61 {
62 #if defined(__HIVIEW_OHOS__)
63 #if defined(USE_POLL)
64     for (int i = 0; i < 2; i++) { // 2:event queue fd size
65         if (eventQueueFd_[i] > 0) {
66             close(eventQueueFd_[i]);
67             eventQueueFd_[i] = -1;
68         }
69     }
70 
71     if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
72         HIVIEW_LOGW("Failed to create event queue fd.");
73         return false;
74     }
75 
76     watchFds_[0].fd = eventQueueFd_[0];
77     watchFds_[0].events = POLLIN;
78     watchedFdSize_ = 1;
79 #else
80 #if defined EPOLL_CLOEXEC
81     sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
82 #else
83     sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
84 #endif
85     pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
86     struct epoll_event eventItem;
87     eventItem.events = EPOLLIN | EPOLLET;
88     eventItem.data.fd = pendingEventQueueFd_.Get();
89     int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
90     if (result < 0) {
91         HIVIEW_LOGE("Fail to Create event poll queue.");
92         return false;
93     }
94 #endif
95 #elif defined(_WIN32)
96     watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
97 #endif
98     return true;
99 }
100 
WakeUp()101 void EventLoop::WakeUp()
102 {
103 #if defined(__HIVIEW_OHOS__)
104 #ifdef USE_POLL
105     if (eventQueueFd_[1] > 0) {
106         int32_t count = 1;
107         write(eventQueueFd_[1], &count, sizeof(count));
108     }
109 #else
110     if (pendingEventQueueFd_.Get() > 0) {
111         eventfd_t count = 1;
112         write(pendingEventQueueFd_.Get(), &count, sizeof(count));
113     }
114 #endif
115 #elif defined(_WIN32)
116     SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
117 #endif
118 }
119 
StartLoop(bool createNewThread)120 void EventLoop::StartLoop(bool createNewThread)
121 {
122     {
123         std::lock_guard<std::mutex> lock(queueMutex_);
124         if (IsRunning()) {
125             return;
126         }
127         if (!InitEventQueueNotifier()) {
128             return;
129         }
130         isRunning_ = true;
131     }
132 
133     if (createNewThread) {
134         thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
135         return;
136     }
137     // handle loop in current thread cases
138     Run();
139 }
140 
StopLoop()141 void EventLoop::StopLoop()
142 {
143     needQuit_ = true;
144     if (!IsRunning()) {
145         return;
146     }
147 
148     {
149         std::lock_guard<std::mutex> lock(queueMutex_);
150         while (!pendingEvents_.empty()) {
151             pendingEvents_.pop();
152         }
153         isRunning_ = false;
154     }
155 
156     WakeUp();
157     if (thread_ != nullptr && thread_->joinable()) {
158         thread_->join();
159     }
160 }
161 
AddEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task task)162 uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
163 {
164     if (needQuit_) {
165         return 0;
166     }
167 
168     uint64_t now = NanoSecondSinceSystemStart();
169     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
170     loopEvent.event = std::move(event);
171     loopEvent.handler = handler;
172     loopEvent.task = task;
173     std::lock_guard<std::mutex> lock(queueMutex_);
174     pendingEvents_.push(std::move(loopEvent));
175     WakeUp();
176     return now;
177 }
178 
AddEventForResult(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event)179 std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
180 {
181     if (needQuit_) {
182         return GetFalseFuture();
183     }
184 
185     if (handler == nullptr || event == nullptr) {
186         return GetFalseFuture();
187     }
188 
189     auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
190     auto task = std::make_shared<std::packaged_task<bool()>>(bind);
191     auto result = task->get_future();
192     uint64_t now = NanoSecondSinceSystemStart();
193     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
194     loopEvent.taskType = LOOP_PACKAGED_TASK;
195     loopEvent.event = std::move(event);
196     loopEvent.handler = handler;
197     loopEvent.packagedTask = std::move(task);
198     std::lock_guard<std::mutex> lock(queueMutex_);
199     pendingEvents_.push(std::move(loopEvent));
200     WakeUp();
201     return result;
202 }
203 
AddTimerEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task & task,uint64_t interval,bool repeat)204 uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event,
205     const Task &task, uint64_t interval, bool repeat)
206 {
207     if (needQuit_) {
208         return 0;
209     }
210 
211     uint64_t now = NanoSecondSinceSystemStart();
212     uint64_t intervalMicro = interval * static_cast<uint64_t>(TimeUtil::SEC_TO_NANOSEC);
213     if (now + intervalMicro < now) {
214         HIVIEW_LOGW("Add Timer Event fail. The interval is too large. please check.");
215         return -1;
216     }
217 
218     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
219     loopEvent.isRepeat = repeat;
220     loopEvent.taskType = LOOP_EVENT_TASK;
221     loopEvent.interval = intervalMicro;
222     loopEvent.targetTime = now + intervalMicro;
223     loopEvent.event = std::move(event);
224     loopEvent.handler = handler;
225     loopEvent.task = task;
226     std::lock_guard<std::mutex> lock(queueMutex_);
227     pendingEvents_.push(std::move(loopEvent));
228     HIVIEW_LOGI("add task interval=%{public}" PRIu64 ", repeat=%{public}d, seq=%{public}" PRIu64,
229         interval, static_cast<int>(repeat), loopEvent.seq);
230     ResetTimerIfNeedLocked();
231     return now;
232 }
233 
RemoveEvent(uint64_t seq)234 bool EventLoop::RemoveEvent(uint64_t seq)
235 {
236     std::lock_guard<std::mutex> lock(queueMutex_);
237     auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
238     if ((curEvent != nullptr) && (curEvent->seq == seq)) {
239         curEvent->seq = 0;
240         HIVIEW_LOGI("removing the current processing event.");
241         return false;
242     }
243     HIVIEW_LOGI("remove task seq=%{public}" PRIu64, seq);
244     return pendingEvents_.remove(seq);
245 }
246 
GetRawName() const247 std::string EventLoop::GetRawName() const
248 {
249     auto pos = name_.find('@');
250     return pos == std::string::npos ? name_ : name_.substr(0, pos);
251 }
252 
ResetTimerIfNeedLocked()253 void EventLoop::ResetTimerIfNeedLocked()
254 {
255     const LoopEvent &event = pendingEvents_.top();
256     if (nextWakeupTime_ == event.targetTime) {
257         return;
258     }
259     WakeUp();
260 }
261 
AddFileDescriptorEventCallback(const std::string & name,std::shared_ptr<FileDescriptorEventCallback> source)262 bool EventLoop::AddFileDescriptorEventCallback(
263     const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
264 {
265     if (needQuit_) {
266         return false;
267     }
268 
269     std::lock_guard<std::mutex> lock(queueMutex_);
270 #if defined(__HIVIEW_OHOS__)
271     if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
272         HIVIEW_LOGW("Watched fds exceed 64.");
273         return false;
274     }
275 
276     if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
277         HIVIEW_LOGW("Exist fd callback with same name.");
278         return false;
279     }
280 
281     int fd = source->GetPollFd();
282     if (fd <= 0) {
283         HIVIEW_LOGW("Invalid poll fd.");
284         return false;
285     }
286 
287 #ifdef USE_POLL
288     eventSourceNameMap_[name] = fd;
289     eventSourceMap_[fd] = source;
290     modifyFdStatus_ = true;
291     WakeUp();
292 #else
293     struct epoll_event eventItem;
294     eventItem.events = source->GetPollType();
295     eventItem.data.fd = fd;
296     int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
297     if (result < 0) {
298         HIVIEW_LOGW("Fail to Add Fd callback.");
299         return false;
300     }
301 
302     eventSourceNameMap_[name] = fd;
303     eventSourceMap_[fd] = source;
304 #endif
305 #elif defined(_WIN32)
306     // not supported yet
307 #endif
308     return true;
309 }
310 
RemoveFileDescriptorEventCallback(const std::string & name)311 bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
312 {
313     std::lock_guard<std::mutex> lock(queueMutex_);
314 #if defined(__HIVIEW_OHOS__)
315     if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
316         HIVIEW_LOGW("fd callback name is not existed.");
317         return false;
318     }
319 
320     int fd = eventSourceNameMap_[name];
321     eventSourceNameMap_.erase(name);
322     eventSourceMap_.erase(fd);
323 
324 #ifdef USE_POLL
325     modifyFdStatus_ = true;
326     WakeUp();
327 #else
328     if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
329         HIVIEW_LOGW("fail to remove watched fd.");
330     }
331 #endif
332 #elif defined(_WIN32)
333     // not supported yet
334 #endif
335     return true;
336 }
337 
338 #ifdef USE_POLL
ModifyFdStatus()339 void EventLoop::ModifyFdStatus()
340 {
341     std::lock_guard<std::mutex> lock(queueMutex_);
342     modifyFdStatus_ = false;
343     int index = 1;
344     for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
345         if (index > MAX_WATCHED_FDS - 1) {
346             break;
347         }
348 
349         watchFds_[index].fd = it->first;
350         watchFds_[index].events = it->second->GetPollType();
351         index++;
352         watchedFdSize_ = index;
353     }
354 }
355 
PollNextEvent(uint64_t timeout)356 void EventLoop::PollNextEvent(uint64_t timeout)
357 {
358     poll(watchFds_, watchedFdSize_, timeout);
359     isWaken_ = true;
360     if (modifyFdStatus_) {
361         ModifyFdStatus();
362         return;
363     }
364 
365     if (watchFds_[0].revents & POLLIN) {
366         // new queued event arrived
367         int32_t val = 0;
368         read(watchFds_[0].fd, &val, sizeof(val));
369         return;
370     }
371 
372     for (int i = 1; i < watchedFdSize_; i++) {
373         int32_t fd = watchFds_[i].fd;
374         std::lock_guard<std::mutex> lock(queueMutex_);
375         auto it = eventSourceMap_.find(fd);
376         if (it == eventSourceMap_.end()) {
377             continue;
378         }
379 
380         int32_t pollType = it->second->GetPollType();
381         if (watchFds_[i].revents & pollType) {
382             it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
383         }
384     }
385 }
386 #endif
387 
Run()388 void EventLoop::Run()
389 {
390     if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
391         HIVIEW_LOGW("Failed to optimize memory for current thread");
392     }
393 
394     InitThreadName();
395 
396     while (true) {
397         uint64_t leftTimeNanosecond = ProcessQueuedEvent();
398         uint64_t leftTimeMill = INT_MAX;
399         if (leftTimeNanosecond != INT_MAX) {
400             leftTimeMill = (leftTimeNanosecond / static_cast<uint64_t>(TimeUtil::MILLISEC_TO_NANOSEC));
401         }
402         WaitNextEvent(leftTimeMill);
403         if (needQuit_) {
404             break;
405         }
406     }
407 }
408 
InitThreadName()409 void EventLoop::InitThreadName()
410 {
411     // set thread name
412     const int maxLength = 16;
413     std::string restrictedName = name_;
414     if (name_.length() >= maxLength) {
415         HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
416         restrictedName = name_.substr(0, maxLength - 1);
417     }
418     Thread::SetThreadDescription(restrictedName);
419 
420     name_ = name_ + "@" + std::to_string(Thread::GetTid());
421 }
422 
ProcessQueuedEvent()423 uint64_t EventLoop::ProcessQueuedEvent()
424 {
425     uint64_t leftTimeNanosecond = 0;
426     while (true) {
427         {
428             std::lock_guard<std::mutex> lock(queueMutex_);
429             if (pendingEvents_.empty()) {
430                 return INT_MAX;
431             }
432         }
433         uint64_t now = NanoSecondSinceSystemStart();
434         LoopEvent event;
435         if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
436             break;
437         }
438 
439         ProcessEvent(event);
440 
441         if (event.isRepeat && (event.interval > 0)) {
442             // force update time
443             now = NanoSecondSinceSystemStart();
444             ReInsertPeriodicEvent(now, event);
445         }
446 
447         std::lock_guard<std::mutex> lock(queueMutex_);
448         currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
449     }
450     isWaken_ = false;
451     return leftTimeNanosecond;
452 }
453 
FetchNextEvent(uint64_t now,uint64_t & leftTimeNanosecond,LoopEvent & out)454 bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
455 {
456     if (needQuit_) {
457         return false;
458     }
459 
460     std::lock_guard<std::mutex> lock(queueMutex_);
461     if (pendingEvents_.empty()) {
462         return false;
463     }
464 
465     size_t pendingSize = pendingEvents_.size();
466     const size_t warningPendingSize = 1000;
467     if ((pendingSize > warningPendingSize) && (pendingSize % warningPendingSize == 0)) {
468         HIVIEW_LOGW("%{public}s has %{public}zu pending events.", name_.c_str(), pendingSize);
469     }
470     pendingEvents_.ShrinkIfNeedLocked();
471 
472     const LoopEvent &event = pendingEvents_.top();
473     if (event.targetTime > now) {
474         leftTimeNanosecond = event.targetTime - now;
475         nextWakeupTime_ = event.targetTime;
476         return false;
477     }
478 
479     out = event;
480     pendingEvents_.pop();
481     currentProcessingEvent_.store(&out, std::memory_order_relaxed);
482     return true;
483 }
484 
ProcessEvent(LoopEvent & event)485 void EventLoop::ProcessEvent(LoopEvent &event)
486 {
487     if (event.taskType == LOOP_EVENT_TASK) {
488         if (event.task != nullptr) {
489             event.task();
490         } else if ((event.handler != nullptr) && (event.event != nullptr)) {
491             event.handler->OnEventProxy(event.event);
492         } else {
493             HIVIEW_LOGW("Loop event task with null tasks.");
494         }
495     } else if (event.taskType == LOOP_PACKAGED_TASK) {
496         if (event.packagedTask != nullptr) {
497             event.packagedTask->operator()();
498         } else {
499             HIVIEW_LOGW("Loop packaged task with null tasks.");
500         }
501     } else {
502         HIVIEW_LOGW("unrecognized task type.");
503     }
504 }
505 
ReInsertPeriodicEvent(uint64_t now,LoopEvent & event)506 void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
507 {
508     std::lock_guard<std::mutex> lock(queueMutex_);
509     currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
510     if (event.seq == 0) {
511         return;
512     }
513 
514     event.enqueueTime = now;
515     event.targetTime = now + event.interval;
516     pendingEvents_.push(std::move(event));
517     ResetTimerIfNeedLocked();
518 }
519 
WaitNextEvent(uint64_t leftTimeMill)520 void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
521 {
522 #if defined(__HIVIEW_OHOS__)
523 #ifdef USE_POLL
524     PollNextEvent(leftTimeMill);
525 #else
526     struct epoll_event eventItems[MAX_EVENT_SIZE];
527     int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
528     isWaken_ = true;
529     if (eventCount <= 0) {
530         // no event read from watched fd, process queued events
531         return;
532     }
533 
534     for (int i = 0; i < eventCount; i++) {
535         int fd = eventItems[i].data.fd;
536         uint32_t events = eventItems[i].events;
537         if (fd == pendingEventQueueFd_.Get()) {
538             // new queued event arrived
539             eventfd_t val = 0;
540             read(fd, &val, sizeof(val));
541             return;
542         } else {
543             // process data source callbacks
544             auto it = eventSourceMap_.find(fd);
545             if (it != eventSourceMap_.end()) {
546                 it->second->OnFileDescriptorEvent(fd, events);
547             }
548         }
549     }
550 #endif
551 #elif defined(_WIN32)
552     DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
553     DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
554 #endif
555 }
556 
NanoSecondSinceSystemStart()557 uint64_t EventLoop::NanoSecondSinceSystemStart()
558 {
559     auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
560     return static_cast<uint64_t>(nanoNow.count());
561 }
562 }  // namespace HiviewDFX
563 }  // namespace OHOS
564