1 /*
2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "event_loop.h"
16
17 #include <climits>
18 #include <functional>
19 #include <thread>
20
21 #if defined(__HIVIEW_OHOS__)
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/prctl.h>
26 #elif defined(_WIN32)
27 #include <processthreadsapi.h>
28 #include <sstream>
29 #include <Synchapi.h>
30 #include <tchar.h>
31 #include <windows.h>
32 #endif
33
34 #include "audit.h"
35 #include "file_util.h"
36 #include "logger.h"
37 #include "memory_util.h"
38 #include "thread_util.h"
39 namespace OHOS {
40 namespace HiviewDFX {
41 namespace {
GetFalseFuture()42 std::future<bool> GetFalseFuture()
43 {
44 std::promise<bool> tmpPromise;
45 tmpPromise.set_value(false);
46 return tmpPromise.get_future();
47 }
48 }
49
50 DEFINE_LOG_TAG("HiView-EventLoop");
51
EventLoop(const std::string & name)52 EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
53 {}
54
~EventLoop()55 EventLoop::~EventLoop()
56 {
57 StopLoop();
58 }
59
InitEventQueueNotifier()60 bool EventLoop::InitEventQueueNotifier()
61 {
62 #if defined(__HIVIEW_OHOS__)
63 #if defined(USE_POLL)
64 for (int i = 0; i < 2; i++) { // 2:event queue fd size
65 if (eventQueueFd_[i] > 0) {
66 close(eventQueueFd_[i]);
67 eventQueueFd_[i] = -1;
68 }
69 }
70
71 if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
72 HIVIEW_LOGW("Failed to create event queue fd.");
73 return false;
74 }
75
76 watchFds_[0].fd = eventQueueFd_[0];
77 watchFds_[0].events = POLLIN;
78 watchedFdSize_ = 1;
79 #else
80 #if defined EPOLL_CLOEXEC
81 sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
82 #else
83 sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
84 #endif
85 pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
86 struct epoll_event eventItem;
87 eventItem.events = EPOLLIN | EPOLLET;
88 eventItem.data.fd = pendingEventQueueFd_.Get();
89 int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
90 if (result < 0) {
91 HIVIEW_LOGE("Fail to Create event poll queue.");
92 return false;
93 }
94 #endif
95 #elif defined(_WIN32)
96 watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
97 #endif
98 return true;
99 }
100
WakeUp()101 void EventLoop::WakeUp()
102 {
103 #if defined(__HIVIEW_OHOS__)
104 #ifdef USE_POLL
105 if (eventQueueFd_[1] > 0) {
106 int32_t count = 1;
107 write(eventQueueFd_[1], &count, sizeof(count));
108 }
109 #else
110 if (pendingEventQueueFd_.Get() > 0) {
111 eventfd_t count = 1;
112 write(pendingEventQueueFd_.Get(), &count, sizeof(count));
113 }
114 #endif
115 #elif defined(_WIN32)
116 SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
117 #endif
118 }
119
StartLoop(bool createNewThread)120 void EventLoop::StartLoop(bool createNewThread)
121 {
122 std::lock_guard<std::mutex> lock(queueMutex_);
123 if (IsRunning()) {
124 return;
125 }
126 if (!InitEventQueueNotifier()) {
127 return;
128 }
129
130 isRunning_ = true;
131 if (createNewThread) {
132 thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
133 return;
134 }
135 // handle loop in current thread cases
136 Run();
137 }
138
StopLoop()139 void EventLoop::StopLoop()
140 {
141 needQuit_ = true;
142 if (!IsRunning()) {
143 return;
144 }
145
146 {
147 std::lock_guard<std::mutex> lock(queueMutex_);
148 while (!pendingEvents_.empty()) {
149 pendingEvents_.pop();
150 }
151 isRunning_ = false;
152 }
153
154 WakeUp();
155 if (thread_ != nullptr && thread_->joinable()) {
156 thread_->join();
157 }
158 }
159
AddEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task task)160 uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
161 {
162 if (needQuit_) {
163 return 0;
164 }
165
166 uint64_t now = NanoSecondSinceSystemStart();
167 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
168 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
169 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
170 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
171 }
172
173 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
174 loopEvent.event = std::move(event);
175 loopEvent.handler = handler;
176 loopEvent.task = task;
177 std::lock_guard<std::mutex> lock(queueMutex_);
178 pendingEvents_.push(std::move(loopEvent));
179 WakeUp();
180 return now;
181 }
182
AddEventForResult(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event)183 std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
184 {
185 if (needQuit_) {
186 return GetFalseFuture();
187 }
188
189 if (handler == nullptr || event == nullptr) {
190 return GetFalseFuture();
191 }
192
193 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
194 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
195 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
196 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
197 }
198
199 auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
200 auto task = std::make_shared<std::packaged_task<bool()>>(bind);
201 auto result = task->get_future();
202 uint64_t now = NanoSecondSinceSystemStart();
203 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
204 loopEvent.taskType = LOOP_PACKAGED_TASK;
205 loopEvent.event = std::move(event);
206 loopEvent.handler = handler;
207 loopEvent.packagedTask = std::move(task);
208 std::lock_guard<std::mutex> lock(queueMutex_);
209 pendingEvents_.push(std::move(loopEvent));
210 WakeUp();
211 return result;
212 }
213
AddTimerEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task & task,uint64_t interval,bool repeat)214 uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event,
215 const Task &task, uint64_t interval, bool repeat)
216 {
217 if (needQuit_) {
218 return 0;
219 }
220
221 uint64_t now = NanoSecondSinceSystemStart();
222 uint64_t intervalMicro = interval * SECOND_TO_NANOSECOND;
223 if (now + intervalMicro < now) {
224 HIVIEW_LOGW("Add Timer Event fail. The interval is too large. please check.");
225 return -1;
226 }
227
228 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
229 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
230 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
231 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
232 }
233
234 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
235 loopEvent.isRepeat = repeat;
236 loopEvent.taskType = LOOP_EVENT_TASK;
237 loopEvent.interval = intervalMicro;
238 loopEvent.targetTime = now + intervalMicro;
239 loopEvent.event = std::move(event);
240 loopEvent.handler = handler;
241 loopEvent.task = task;
242 std::lock_guard<std::mutex> lock(queueMutex_);
243 pendingEvents_.push(std::move(loopEvent));
244 HIVIEW_LOGI("task[%{public}" PRIu64 "|%{public}d] has been pushed into task queue.", interval,
245 static_cast<int>(repeat));
246 ResetTimerIfNeedLocked();
247 return now;
248 }
249
RemoveEvent(uint64_t seq)250 bool EventLoop::RemoveEvent(uint64_t seq)
251 {
252 std::lock_guard<std::mutex> lock(queueMutex_);
253 auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
254 if ((curEvent != nullptr) && (curEvent->seq == seq)) {
255 curEvent->seq = 0;
256 HIVIEW_LOGI("removing the current processing event.");
257 return false;
258 }
259 return pendingEvents_.remove(seq);
260 }
261
ResetTimerIfNeedLocked()262 void EventLoop::ResetTimerIfNeedLocked()
263 {
264 const LoopEvent &event = pendingEvents_.top();
265 if (nextWakeupTime_ == event.targetTime) {
266 return;
267 }
268 WakeUp();
269 }
270
AddFileDescriptorEventCallback(const std::string & name,std::shared_ptr<FileDescriptorEventCallback> source)271 bool EventLoop::AddFileDescriptorEventCallback(
272 const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
273 {
274 if (needQuit_) {
275 return false;
276 }
277
278 std::lock_guard<std::mutex> lock(queueMutex_);
279 #if defined(__HIVIEW_OHOS__)
280 if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
281 HIVIEW_LOGW("Watched fds exceed 64.");
282 return false;
283 }
284
285 if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
286 HIVIEW_LOGW("Exist fd callback with same name.");
287 return false;
288 }
289
290 int fd = source->GetPollFd();
291 if (fd <= 0) {
292 HIVIEW_LOGW("Invalid poll fd.");
293 return false;
294 }
295
296 #ifdef USE_POLL
297 eventSourceNameMap_[name] = fd;
298 eventSourceMap_[fd] = source;
299 modifyFdStatus_ = true;
300 WakeUp();
301 #else
302 struct epoll_event eventItem;
303 eventItem.events = source->GetPollType();
304 eventItem.data.fd = fd;
305 int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
306 if (result < 0) {
307 HIVIEW_LOGW("Fail to Add Fd callback.");
308 return false;
309 }
310
311 eventSourceNameMap_[name] = fd;
312 eventSourceMap_[fd] = source;
313 #endif
314 #elif defined(_WIN32)
315 // not supported yet
316 #endif
317 return true;
318 }
319
RemoveFileDescriptorEventCallback(const std::string & name)320 bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
321 {
322 std::lock_guard<std::mutex> lock(queueMutex_);
323 #if defined(__HIVIEW_OHOS__)
324 if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
325 HIVIEW_LOGW("fd callback name is not existed.");
326 return false;
327 }
328
329 int fd = eventSourceNameMap_[name];
330 eventSourceNameMap_.erase(name);
331 eventSourceMap_.erase(fd);
332
333 #ifdef USE_POLL
334 modifyFdStatus_ = true;
335 WakeUp();
336 #else
337 if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
338 HIVIEW_LOGW("fail to remove watched fd.");
339 }
340 #endif
341 #elif defined(_WIN32)
342 // not supported yet
343 #endif
344 return true;
345 }
346
347 #ifdef USE_POLL
ModifyFdStatus()348 void EventLoop::ModifyFdStatus()
349 {
350 std::lock_guard<std::mutex> lock(queueMutex_);
351 modifyFdStatus_ = false;
352 int index = 1;
353 for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
354 if (index > MAX_WATCHED_FDS - 1) {
355 break;
356 }
357
358 watchFds_[index].fd = it->first;
359 watchFds_[index].events = it->second->GetPollType();
360 index++;
361 watchedFdSize_ = index;
362 }
363 }
364
PollNextEvent(uint64_t timeout)365 void EventLoop::PollNextEvent(uint64_t timeout)
366 {
367 poll(watchFds_, watchedFdSize_, timeout);
368 isWaken_ = true;
369 if (modifyFdStatus_) {
370 ModifyFdStatus();
371 return;
372 }
373
374 if (watchFds_[0].revents & POLLIN) {
375 // new queued event arrived
376 int32_t val = 0;
377 read(watchFds_[0].fd, &val, sizeof(val));
378 return;
379 }
380
381 for (int i = 1; i < watchedFdSize_; i++) {
382 int32_t fd = watchFds_[i].fd;
383 std::lock_guard<std::mutex> lock(queueMutex_);
384 auto it = eventSourceMap_.find(fd);
385 if (it == eventSourceMap_.end()) {
386 continue;
387 }
388
389 int32_t pollType = it->second->GetPollType();
390 if (watchFds_[i].revents & pollType) {
391 it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
392 }
393 }
394 }
395 #endif
396
Run()397 void EventLoop::Run()
398 {
399 if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
400 HIVIEW_LOGW("Failed to optimize memory for current thread");
401 }
402
403 // set thread name
404 const int maxLength = 16;
405 std::string restrictedName = name_;
406 if (name_.length() >= maxLength) {
407 HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
408 restrictedName = name_.substr(0, maxLength - 1);
409 }
410 Thread::SetThreadDescription(restrictedName);
411
412 name_ = name_ + "@" + std::to_string(Thread::GetTid());
413
414 while (true) {
415 uint64_t leftTimeNanosecond = ProcessQueuedEvent();
416 uint64_t leftTimeMill = INT_MAX;
417 if (leftTimeNanosecond != INT_MAX) {
418 leftTimeMill = (leftTimeNanosecond / NANOSECOND_TO_MILLSECOND);
419 }
420 WaitNextEvent(leftTimeMill);
421 if (needQuit_) {
422 break;
423 }
424 }
425 }
426
ProcessQueuedEvent()427 uint64_t EventLoop::ProcessQueuedEvent()
428 {
429 if (pendingEvents_.empty()) {
430 return INT_MAX;
431 }
432
433 uint64_t leftTimeNanosecond = 0;
434 while (!pendingEvents_.empty()) {
435 uint64_t now = NanoSecondSinceSystemStart();
436 LoopEvent event;
437 if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
438 break;
439 }
440
441 ProcessEvent(event);
442
443 if (event.isRepeat && (event.interval > 0)) {
444 // force update time
445 now = NanoSecondSinceSystemStart();
446 ReInsertPeriodicEvent(now, event);
447 }
448
449 std::lock_guard<std::mutex> lock(queueMutex_);
450 currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
451 }
452 isWaken_ = false;
453 return leftTimeNanosecond;
454 }
455
FetchNextEvent(uint64_t now,uint64_t & leftTimeNanosecond,LoopEvent & out)456 bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
457 {
458 if (needQuit_) {
459 return false;
460 }
461
462 std::lock_guard<std::mutex> lock(queueMutex_);
463 if (pendingEvents_.empty()) {
464 return false;
465 }
466
467 size_t pendingSize = pendingEvents_.size();
468 const size_t warningPendingSize = 1000;
469 if ((pendingSize > warningPendingSize) && (pendingSize % warningPendingSize == 0)) {
470 HIVIEW_LOGW("%{public}s has %{public}zu pending events.", name_.c_str(), pendingSize);
471 }
472 pendingEvents_.ShrinkIfNeedLocked();
473
474 const LoopEvent &event = pendingEvents_.top();
475 if (event.targetTime > now) {
476 leftTimeNanosecond = event.targetTime - now;
477 nextWakeupTime_ = event.targetTime;
478 return false;
479 }
480
481 out = event;
482 pendingEvents_.pop();
483 currentProcessingEvent_.store(&out, std::memory_order_relaxed);
484 return true;
485 }
486
ProcessEvent(LoopEvent & event)487 void EventLoop::ProcessEvent(LoopEvent &event)
488 {
489 if (event.taskType == LOOP_EVENT_TASK) {
490 if (event.task != nullptr) {
491 event.task();
492 } else if ((event.handler != nullptr) && (event.event != nullptr)) {
493 event.handler->OnEventProxy(event.event);
494 } else {
495 HIVIEW_LOGW("Loop event task with null tasks.");
496 }
497 } else if (event.taskType == LOOP_PACKAGED_TASK) {
498 if (event.packagedTask != nullptr) {
499 event.packagedTask->operator()();
500 } else {
501 HIVIEW_LOGW("Loop packaged task with null tasks.");
502 }
503 } else {
504 HIVIEW_LOGW("unrecognized task type.");
505 }
506 }
507
ReInsertPeriodicEvent(uint64_t now,LoopEvent & event)508 void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
509 {
510 std::lock_guard<std::mutex> lock(queueMutex_);
511 currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
512 if (event.seq == 0) {
513 return;
514 }
515
516 event.enqueueTime = now;
517 event.targetTime = now + event.interval;
518
519 if (Audit::IsEnabled() && (event.event != nullptr) && (event.handler != nullptr)) {
520 event.event->ResetTimestamp();
521 auto digest = event.event->sender_ + Audit::DOMAIN_DELIMITER + event.handler->GetHandlerInfo() +
522 Audit::DOMAIN_DELIMITER + GetName() + Audit::DOMAIN_DELIMITER + event.event->GetEventInfo();
523 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event.event->createTime_, digest);
524 }
525
526 pendingEvents_.push(std::move(event));
527 ResetTimerIfNeedLocked();
528 }
529
WaitNextEvent(uint64_t leftTimeMill)530 void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
531 {
532 #if defined(__HIVIEW_OHOS__)
533 #ifdef USE_POLL
534 PollNextEvent(leftTimeMill);
535 #else
536 struct epoll_event eventItems[MAX_EVENT_SIZE];
537 int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
538 isWaken_ = true;
539 if (eventCount <= 0) {
540 // no event read from watched fd, process queued events
541 return;
542 }
543
544 for (int i = 0; i < eventCount; i++) {
545 int fd = eventItems[i].data.fd;
546 uint32_t events = eventItems[i].events;
547 if (fd == pendingEventQueueFd_.Get()) {
548 // new queued event arrived
549 eventfd_t val = 0;
550 read(fd, &val, sizeof(val));
551 return;
552 } else {
553 // process data source callbacks
554 auto it = eventSourceMap_.find(fd);
555 if (it != eventSourceMap_.end()) {
556 it->second->OnFileDescriptorEvent(fd, events);
557 }
558 }
559 }
560 #endif
561 #elif defined(_WIN32)
562 DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
563 DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
564 #endif
565 }
566
NanoSecondSinceSystemStart()567 uint64_t EventLoop::NanoSecondSinceSystemStart()
568 {
569 auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
570 return static_cast<uint64_t>(nanoNow.count());
571 }
572 } // namespace HiviewDFX
573 } // namespace OHOS
574