/*
 * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "event_loop.h"

#include <climits>
#include <cstdint>
#include <functional>
#include <thread>

#if defined(__HIVIEW_OHOS__)
#include <fcntl.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/prctl.h>
#include <unistd.h>
#elif defined(_WIN32)
#include <processthreadsapi.h>
#include <sstream>
#include <synchapi.h>
#include <tchar.h>
#include <windows.h>
#endif

#include "file_util.h"
#include "hiview_logger.h"
#include "memory_util.h"
#include "thread_util.h"
#include "time_util.h"

namespace OHOS {
namespace HiviewDFX {
namespace {
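// Builds a future that is already satisfied with "false"; used to reject
// requests once the loop is quitting or when arguments are invalid.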
std::future<bool> GetFalseFuture()
{
    std::promise<bool> tmpPromise;
    tmpPromise.set_value(false);
    return tmpPromise.get_future();
}
} // namespace

DEFINE_LOG_TAG("HiView-EventLoop");

EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
{}

EventLoop::~EventLoop()
{
    StopLoop();
}

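// Sets up the loop's wakeup channel: a pipe watched through poll() when
// USE_POLL is defined, an eventfd registered with epoll otherwise, and an
// auto-reset event handle on Windows.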
bool EventLoop::InitEventQueueNotifier()
{
#if defined(__HIVIEW_OHOS__)
#if defined(USE_POLL)
    for (int i = 0; i < 2; i++) { // 2: event queue fd size
        if (eventQueueFd_[i] > 0) {
            close(eventQueueFd_[i]);
            eventQueueFd_[i] = -1;
        }
    }

    if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
        HIVIEW_LOGW("Failed to create event queue fd.");
        return false;
    }

    watchFds_[0].fd = eventQueueFd_[0];
    watchFds_[0].events = POLLIN;
    watchedFdSize_ = 1;
#else
#if defined EPOLL_CLOEXEC
    sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
#else
    sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen on up to 1024 sockets
#endif
    pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
    struct epoll_event eventItem;
    eventItem.events = EPOLLIN | EPOLLET;
    eventItem.data.fd = pendingEventQueueFd_.Get();
    int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
    if (result < 0) {
        HIVIEW_LOGE("Failed to create event poll queue.");
        return false;
    }
#endif
#elif defined(_WIN32)
    watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
#endif
    return true;
}

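// Nudges the polling thread by writing a single token to the wakeup
// channel (or signalling the event handle on Windows).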
void EventLoop::WakeUp()
{
#if defined(__HIVIEW_OHOS__)
#ifdef USE_POLL
    if (eventQueueFd_[1] > 0) {
        int32_t count = 1;
        write(eventQueueFd_[1], &count, sizeof(count));
    }
#else
    if (pendingEventQueueFd_.Get() > 0) {
        eventfd_t count = 1;
        write(pendingEventQueueFd_.Get(), &count, sizeof(count));
    }
#endif
#elif defined(_WIN32)
    SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
#endif
}

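// Starts the loop on a newly created thread, or on the caller's thread
// when createNewThread is false.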
void EventLoop::StartLoop(bool createNewThread)
{
    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        if (IsRunning()) {
            return;
        }
        if (!InitEventQueueNotifier()) {
            return;
        }
        isRunning_ = true;
    }

    if (createNewThread) {
        thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
        return;
    }
    // handle loop in current thread cases
    Run();
}

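// Requests shutdown: clears the pending queue, wakes the polling thread,
// and joins it when the loop owns its thread.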
void EventLoop::StopLoop()
{
    needQuit_ = true;
    if (!IsRunning()) {
        return;
    }

    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        while (!pendingEvents_.empty()) {
            pendingEvents_.pop();
        }
        isRunning_ = false;
    }

    WakeUp();
    if (thread_ != nullptr && thread_->joinable()) {
        thread_->join();
    }
}

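// Queues an event for asynchronous processing. Returns the enqueue
// timestamp (which appears to double as the event's sequence number for
// RemoveEvent) or 0 when the loop is quitting.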
uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
{
    if (needQuit_) {
        return 0;
    }

    uint64_t now = NanoSecondSinceSystemStart();
    LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
    loopEvent.event = std::move(event);
    loopEvent.handler = handler;
    loopEvent.task = task;
    std::lock_guard<std::mutex> lock(queueMutex_);
    pendingEvents_.push(std::move(loopEvent));
    WakeUp();
    return now;
}

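// Queues an event wrapped in a packaged task so the caller can block on a
// std::future<bool> for the handler's result.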
std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
{
    if (needQuit_) {
        return GetFalseFuture();
    }

    if (handler == nullptr || event == nullptr) {
        return GetFalseFuture();
    }

    auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
    auto task = std::make_shared<std::packaged_task<bool()>>(bind);
    auto result = task->get_future();
    uint64_t now = NanoSecondSinceSystemStart();
    LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
    loopEvent.taskType = LOOP_PACKAGED_TASK;
    loopEvent.event = std::move(event);
    loopEvent.handler = handler;
    loopEvent.packagedTask = std::move(task);
    std::lock_guard<std::mutex> lock(queueMutex_);
    pendingEvents_.push(std::move(loopEvent));
    WakeUp();
    return result;
}

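// Queues a delayed (and optionally repeating) task. The interval is given
// in seconds and converted to nanoseconds; returns UINT64_MAX when the
// target time would overflow.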
uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event,
    const Task &task, uint64_t interval, bool repeat)
{
    if (needQuit_) {
        return 0;
    }

    uint64_t now = NanoSecondSinceSystemStart();
    uint64_t intervalNano = interval * static_cast<uint64_t>(TimeUtil::SEC_TO_NANOSEC);
    if (now + intervalNano < now) { // overflow check
        HIVIEW_LOGW("Failed to add timer event: the interval is too large.");
        return UINT64_MAX;
    }

    LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
    loopEvent.isRepeat = repeat;
    loopEvent.taskType = LOOP_EVENT_TASK;
    loopEvent.interval = intervalNano;
    loopEvent.targetTime = now + intervalNano;
    loopEvent.event = std::move(event);
    loopEvent.handler = handler;
    loopEvent.task = task;
    uint64_t seq = loopEvent.seq; // read before push() moves loopEvent below
    std::lock_guard<std::mutex> lock(queueMutex_);
    pendingEvents_.push(std::move(loopEvent));
    HIVIEW_LOGI("add task interval=%{public}" PRIu64 ", repeat=%{public}d, seq=%{public}" PRIu64,
        interval, static_cast<int>(repeat), seq);
    ResetTimerIfNeedLocked();
    return now;
}

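// Removes a queued event by sequence number. If that event is currently
// being processed, only its seq is cleared (which suppresses re-inserting
// a repeating event) and false is returned.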
bool EventLoop::RemoveEvent(uint64_t seq)
{
    std::lock_guard<std::mutex> lock(queueMutex_);
    auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
    if ((curEvent != nullptr) && (curEvent->seq == seq)) {
        curEvent->seq = 0;
        HIVIEW_LOGI("removing the current processing event.");
        return false;
    }
    HIVIEW_LOGI("remove task seq=%{public}" PRIu64, seq);
    return pendingEvents_.remove(seq);
}

std::string EventLoop::GetRawName() const
{
    auto pos = name_.find('@');
    return pos == std::string::npos ? name_ : name_.substr(0, pos);
}

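// Called with queueMutex_ held: wakes the loop so it can recompute its
// wait timeout whenever the earliest target time has changed.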
void EventLoop::ResetTimerIfNeedLocked()
{
    const LoopEvent &event = pendingEvents_.top();
    if (nextWakeupTime_ == event.targetTime) {
        return;
    }
    WakeUp();
}

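// Registers a named file-descriptor event source. Under USE_POLL the fd
// array is rebuilt on the loop thread; otherwise the fd is added to the
// shared epoll set directly.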
bool EventLoop::AddFileDescriptorEventCallback(
    const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
{
    if (needQuit_) {
        return false;
    }

    if (source == nullptr) {
        HIVIEW_LOGW("Invalid event source.");
        return false;
    }

    std::lock_guard<std::mutex> lock(queueMutex_);
#if defined(__HIVIEW_OHOS__)
    if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
        HIVIEW_LOGW("Watched fds reach the limit.");
        return false;
    }

    if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
        HIVIEW_LOGW("A fd callback with the same name already exists.");
        return false;
    }

    int fd = source->GetPollFd();
    if (fd <= 0) {
        HIVIEW_LOGW("Invalid poll fd.");
        return false;
    }

#ifdef USE_POLL
    eventSourceNameMap_[name] = fd;
    eventSourceMap_[fd] = source;
    modifyFdStatus_ = true;
    WakeUp();
#else
    struct epoll_event eventItem;
    eventItem.events = source->GetPollType();
    eventItem.data.fd = fd;
    int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
    if (result < 0) {
        HIVIEW_LOGW("Failed to add fd callback.");
        return false;
    }

    eventSourceNameMap_[name] = fd;
    eventSourceMap_[fd] = source;
#endif
#elif defined(_WIN32)
    // not supported yet
#endif
    return true;
}

bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
{
    std::lock_guard<std::mutex> lock(queueMutex_);
#if defined(__HIVIEW_OHOS__)
    if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
        HIVIEW_LOGW("No fd callback with this name exists.");
        return false;
    }

    int fd = eventSourceNameMap_[name];
    eventSourceNameMap_.erase(name);
    eventSourceMap_.erase(fd);

#ifdef USE_POLL
    modifyFdStatus_ = true;
    WakeUp();
#else
    if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
        HIVIEW_LOGW("Failed to remove watched fd.");
    }
#endif
#elif defined(_WIN32)
    // not supported yet
#endif
    return true;
}

#ifdef USE_POLL
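// Rebuilds the poll() fd array from the registered event sources; slot 0
// stays reserved for the wakeup pipe.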
void EventLoop::ModifyFdStatus()
{
    std::lock_guard<std::mutex> lock(queueMutex_);
    modifyFdStatus_ = false;
    int index = 1;
    for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
        if (index > MAX_WATCHED_FDS - 1) {
            break;
        }

        watchFds_[index].fd = it->first;
        watchFds_[index].events = it->second->GetPollType();
        index++;
        watchedFdSize_ = index;
    }
}

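// Blocks in poll() for up to `timeout` milliseconds, then drains the
// wakeup pipe or dispatches any fd events that fired.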
void EventLoop::PollNextEvent(uint64_t timeout)
{
    // clamp to poll()'s int timeout; larger values would otherwise wrap
    int pollTimeout = (timeout > static_cast<uint64_t>(INT_MAX)) ? INT_MAX : static_cast<int>(timeout);
    poll(watchFds_, watchedFdSize_, pollTimeout);
    isWaken_ = true;
    if (modifyFdStatus_) {
        ModifyFdStatus();
        return;
    }

    if (watchFds_[0].revents & POLLIN) {
        // new queued event arrived
        int32_t val = 0;
        read(watchFds_[0].fd, &val, sizeof(val));
        return;
    }

    for (int i = 1; i < watchedFdSize_; i++) {
        int32_t fd = watchFds_[i].fd;
        std::lock_guard<std::mutex> lock(queueMutex_);
        auto it = eventSourceMap_.find(fd);
        if (it == eventSourceMap_.end()) {
            continue;
        }

        int32_t pollType = it->second->GetPollType();
        if (watchFds_[i].revents & pollType) {
            it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
        }
    }
}
#endif

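// The loop body: runs every due event, then sleeps until the next target
// time or an external wakeup.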
void EventLoop::Run()
{
    if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
        HIVIEW_LOGW("Failed to optimize memory for current thread");
    }

    InitThreadName();

    while (true) {
        uint64_t leftTimeNanosecond = ProcessQueuedEvent();
        uint64_t leftTimeMill = INT_MAX;
        if (leftTimeNanosecond != INT_MAX) {
            leftTimeMill = (leftTimeNanosecond / static_cast<uint64_t>(TimeUtil::MILLISEC_TO_NANOSEC));
        }
        WaitNextEvent(leftTimeMill);
        if (needQuit_) {
            break;
        }
    }
}

void EventLoop::InitThreadName()
{
    // set the thread name; Linux limits it to 15 characters plus the terminator
    const size_t maxLength = 16;
    std::string restrictedName = name_;
    if (name_.length() >= maxLength) {
        HIVIEW_LOGW("%{public}s is too long for a thread name and will be truncated.", name_.c_str());
        restrictedName = name_.substr(0, maxLength - 1);
    }
    Thread::SetThreadDescription(restrictedName);

    name_ = name_ + "@" + std::to_string(Thread::GetTid());
}

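// Pops and runs every due event; returns the time in nanoseconds until the
// next pending event, or INT_MAX when the queue is empty.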
uint64_t EventLoop::ProcessQueuedEvent()
{
    uint64_t leftTimeNanosecond = 0;
    while (true) {
        {
            std::lock_guard<std::mutex> lock(queueMutex_);
            if (pendingEvents_.empty()) {
                return INT_MAX;
            }
        }
        uint64_t now = NanoSecondSinceSystemStart();
        LoopEvent event;
        if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
            break;
        }

        ProcessEvent(event);

        if (event.isRepeat && (event.interval > 0)) {
            // force update time
            now = NanoSecondSinceSystemStart();
            ReInsertPeriodicEvent(now, event);
        }

        std::lock_guard<std::mutex> lock(queueMutex_);
        currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
    }
    isWaken_ = false;
    return leftTimeNanosecond;
}

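// Pops the earliest due event into `out` and marks it as the event being
// processed. Returns false when quitting, when the queue is empty, or when
// the earliest event is not due yet (reporting the remaining wait time).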
bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
{
    if (needQuit_) {
        return false;
    }

    std::lock_guard<std::mutex> lock(queueMutex_);
    if (pendingEvents_.empty()) {
        return false;
    }

    size_t pendingSize = pendingEvents_.size();
    const size_t warningPendingSize = 1000;
    if ((pendingSize > warningPendingSize) && (pendingSize % warningPendingSize == 0)) {
        HIVIEW_LOGW("%{public}s has %{public}zu pending events.", name_.c_str(), pendingSize);
    }
    pendingEvents_.ShrinkIfNeedLocked();

    const LoopEvent &event = pendingEvents_.top();
    if (event.targetTime > now) {
        leftTimeNanosecond = event.targetTime - now;
        nextWakeupTime_ = event.targetTime;
        return false;
    }

    out = event;
    pendingEvents_.pop();
    currentProcessingEvent_.store(&out, std::memory_order_relaxed);
    return true;
}

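// Dispatches a single event to its plain task, its handler, or its
// packaged task, depending on the recorded task type.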
void EventLoop::ProcessEvent(LoopEvent &event)
{
    if (event.taskType == LOOP_EVENT_TASK) {
        if (event.task != nullptr) {
            event.task();
        } else if ((event.handler != nullptr) && (event.event != nullptr)) {
            event.handler->OnEventProxy(event.event);
        } else {
            HIVIEW_LOGW("Loop event has neither a task nor a handler.");
        }
    } else if (event.taskType == LOOP_PACKAGED_TASK) {
        if (event.packagedTask != nullptr) {
            event.packagedTask->operator()();
        } else {
            HIVIEW_LOGW("Loop packaged task is null.");
        }
    } else {
        HIVIEW_LOGW("Unrecognized task type.");
    }
}

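// Re-queues a repeating event with a fresh target time, unless RemoveEvent
// cleared its seq while it was running.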
void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
{
    std::lock_guard<std::mutex> lock(queueMutex_);
    currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
    if (event.seq == 0) {
        return;
    }

    event.enqueueTime = now;
    event.targetTime = now + event.interval;
    pendingEvents_.push(std::move(event));
    ResetTimerIfNeedLocked();
}

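// Sleeps until the next due event or an external wakeup, dispatching any
// file-descriptor events that arrive in the meantime.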
void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
{
#if defined(__HIVIEW_OHOS__)
#ifdef USE_POLL
    PollNextEvent(leftTimeMill);
#else
    struct epoll_event eventItems[MAX_EVENT_SIZE];
    // clamp to epoll_wait()'s int timeout; larger values would otherwise wrap
    int waitTimeout = (leftTimeMill > static_cast<uint64_t>(INT_MAX)) ? INT_MAX : static_cast<int>(leftTimeMill);
    int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, waitTimeout);
    isWaken_ = true;
    if (eventCount <= 0) {
        // no event read from watched fd, process queued events
        return;
    }

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t events = eventItems[i].events;
        if (fd == pendingEventQueueFd_.Get()) {
            // new queued event arrived
            eventfd_t val = 0;
            read(fd, &val, sizeof(val));
            return;
        } else {
            // process data source callbacks
            auto it = eventSourceMap_.find(fd);
            if (it != eventSourceMap_.end()) {
                it->second->OnFileDescriptorEvent(fd, events);
            }
        }
    }
#endif
#elif defined(_WIN32)
    DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
    WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime); // return value intentionally ignored
#endif
}

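// Monotonic timestamp in nanoseconds; std::chrono::steady_clock typically
// maps to CLOCK_MONOTONIC on Linux, so this effectively counts from boot.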
uint64_t EventLoop::NanoSecondSinceSystemStart()
{
    auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
    return static_cast<uint64_t>(nanoNow.count());
}
} // namespace HiviewDFX
} // namespace OHOS