1 /*
2 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "event_loop.h"
16
17 #include <climits>
18 #include <functional>
19 #include <thread>
20
21 #if defined(__HIVIEW_OHOS__)
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/prctl.h>
26 #elif defined(_WIN32)
27 #include <processthreadsapi.h>
28 #include <sstream>
29 #include <Synchapi.h>
30 #include <tchar.h>
31 #include <windows.h>
32 #endif
33
34 #include "audit.h"
35 #include "file_util.h"
36 #include "logger.h"
37 #include "memory_util.h"
38 #include "thread_util.h"
39 namespace OHOS {
40 namespace HiviewDFX {
41 namespace {
// Produces a future that is already fulfilled with `false`; the loop hands it
// back whenever a request for a result has to be rejected up front.
std::future<bool> GetFalseFuture()
{
    std::promise<bool> rejected;
    std::future<bool> outcome = rejected.get_future();
    rejected.set_value(false);
    return outcome;
}
48 }
49
50 DEFINE_LOG_TAG("HiView-EventLoop");
51
EventLoop(const std::string & name)52 EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
53 {}
54
~EventLoop()55 EventLoop::~EventLoop()
56 {
57 StopLoop();
58 }
59
InitEventQueueNotifier()60 bool EventLoop::InitEventQueueNotifier()
61 {
62 #if defined(__HIVIEW_OHOS__)
63 #if defined(USE_POLL)
64 for (int i = 0; i < 2; i++) { // 2:event queue fd size
65 if (eventQueueFd_[i] > 0) {
66 close(eventQueueFd_[i]);
67 eventQueueFd_[i] = -1;
68 }
69 }
70
71 if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
72 HIVIEW_LOGW("Failed to create event queue fd.");
73 return false;
74 }
75
76 watchFds_[0].fd = eventQueueFd_[0];
77 watchFds_[0].events = POLLIN;
78 watchedFdSize_ = 1;
79 #else
80 #if defined EPOLL_CLOEXEC
81 sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
82 #else
83 sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
84 #endif
85 pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
86 struct epoll_event eventItem;
87 eventItem.events = EPOLLIN | EPOLLET;
88 eventItem.data.fd = pendingEventQueueFd_.Get();
89 int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
90 if (result < 0) {
91 HIVIEW_LOGE("Fail to Create event poll queue.");
92 return false;
93 }
94 #endif
95 #elif defined(_WIN32)
96 watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
97 #endif
98 return true;
99 }
100
WakeUp()101 void EventLoop::WakeUp()
102 {
103 #if defined(__HIVIEW_OHOS__)
104 #ifdef USE_POLL
105 if (eventQueueFd_[1] > 0) {
106 int32_t count = 1;
107 write(eventQueueFd_[1], &count, sizeof(count));
108 }
109 #else
110 if (pendingEventQueueFd_.Get() > 0) {
111 eventfd_t count = 1;
112 write(pendingEventQueueFd_.Get(), &count, sizeof(count));
113 }
114 #endif
115 #elif defined(_WIN32)
116 SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
117 #endif
118 }
119
StartLoop(bool createNewThread)120 void EventLoop::StartLoop(bool createNewThread)
121 {
122 std::lock_guard<std::mutex> lock(queueMutex_);
123 if (IsRunning()) {
124 return;
125 }
126 if (!InitEventQueueNotifier()) {
127 return;
128 }
129
130 isRunning_ = true;
131 if (createNewThread) {
132 thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
133 return;
134 }
135 // handle loop in current thread cases
136 Run();
137 }
138
StopLoop()139 void EventLoop::StopLoop()
140 {
141 needQuit_ = true;
142 if (!IsRunning()) {
143 return;
144 }
145
146 {
147 std::lock_guard<std::mutex> lock(queueMutex_);
148 while (!pendingEvents_.empty()) {
149 pendingEvents_.pop();
150 }
151 isRunning_ = false;
152 }
153
154 WakeUp();
155 if (thread_ != nullptr && thread_->joinable()) {
156 thread_->join();
157 }
158 }
159
AddEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task task)160 uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
161 {
162 if (needQuit_) {
163 return 0;
164 }
165
166 uint64_t now = NanoSecondSinceSystemStart();
167 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
168 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
169 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
170 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
171 }
172
173 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
174 loopEvent.event = std::move(event);
175 loopEvent.handler = handler;
176 loopEvent.task = task;
177 std::lock_guard<std::mutex> lock(queueMutex_);
178 pendingEvents_.push(std::move(loopEvent));
179 WakeUp();
180 return now;
181 }
182
AddEventForResult(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event)183 std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
184 {
185 if (needQuit_) {
186 return GetFalseFuture();
187 }
188
189 if (handler == nullptr || event == nullptr) {
190 return GetFalseFuture();
191 }
192
193 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
194 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
195 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
196 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
197 }
198
199 auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
200 auto task = std::make_shared<std::packaged_task<bool()>>(bind);
201 auto result = task->get_future();
202 uint64_t now = NanoSecondSinceSystemStart();
203 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
204 loopEvent.taskType = LOOP_PACKAGED_TASK;
205 loopEvent.event = std::move(event);
206 loopEvent.handler = handler;
207 loopEvent.packagedTask = std::move(task);
208 std::lock_guard<std::mutex> lock(queueMutex_);
209 pendingEvents_.push(std::move(loopEvent));
210 WakeUp();
211 return result;
212 }
213
AddTimerEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task & task,uint64_t interval,bool repeat)214 uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task &task,
215 uint64_t interval, bool repeat)
216 {
217 if (needQuit_) {
218 return 0;
219 }
220
221 uint64_t now = NanoSecondSinceSystemStart();
222 uint64_t intervalMicro = interval * SECOND_TO_NANOSECOND;
223 if (now + intervalMicro < now) {
224 HIVIEW_LOGW("Add Timer Event fail. The interval is too large. please check.");
225 return -1;
226 }
227
228 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
229 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
230 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
231 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
232 }
233
234 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
235 loopEvent.isRepeat = repeat;
236 loopEvent.taskType = LOOP_EVENT_TASK;
237 loopEvent.interval = intervalMicro;
238 loopEvent.targetTime = now + intervalMicro;
239 loopEvent.event = std::move(event);
240 loopEvent.handler = handler;
241 loopEvent.task = task;
242 std::lock_guard<std::mutex> lock(queueMutex_);
243 pendingEvents_.push(std::move(loopEvent));
244 ResetTimerIfNeedLocked();
245 return now;
246 }
247
RemoveEvent(uint64_t seq)248 bool EventLoop::RemoveEvent(uint64_t seq)
249 {
250 std::lock_guard<std::mutex> lock(queueMutex_);
251 auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
252 if ((curEvent != nullptr) && (curEvent->seq == seq)) {
253 curEvent->seq = 0;
254 HIVIEW_LOGI("removing the current processing event.");
255 return false;
256 }
257 return pendingEvents_.remove(seq);
258 }
259
ResetTimerIfNeedLocked()260 void EventLoop::ResetTimerIfNeedLocked()
261 {
262 const LoopEvent &event = pendingEvents_.top();
263 if (nextWakeupTime_ == event.targetTime) {
264 return;
265 }
266 WakeUp();
267 }
268
AddFileDescriptorEventCallback(const std::string & name,std::shared_ptr<FileDescriptorEventCallback> source)269 bool EventLoop::AddFileDescriptorEventCallback(
270 const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
271 {
272 if (needQuit_) {
273 return false;
274 }
275
276 std::lock_guard<std::mutex> lock(queueMutex_);
277 #if defined(__HIVIEW_OHOS__)
278 if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
279 HIVIEW_LOGW("Watched fds exceed 64.");
280 return false;
281 }
282
283 if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
284 HIVIEW_LOGW("Exist fd callback with same name.");
285 return false;
286 }
287
288 int fd = source->GetPollFd();
289 if (fd <= 0) {
290 HIVIEW_LOGW("Invalid poll fd.");
291 return false;
292 }
293
294 #ifdef USE_POLL
295 eventSourceNameMap_[name] = fd;
296 eventSourceMap_[fd] = source;
297 modifyFdStatus_ = true;
298 WakeUp();
299 #else
300 struct epoll_event eventItem;
301 eventItem.events = source->GetPollType();
302 eventItem.data.fd = fd;
303 int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
304 if (result < 0) {
305 HIVIEW_LOGW("Fail to Add Fd callback.");
306 return false;
307 }
308
309 eventSourceNameMap_[name] = fd;
310 eventSourceMap_[fd] = source;
311 #endif
312 #elif defined(_WIN32)
313 // not supported yet
314 #endif
315 return true;
316 }
317
RemoveFileDescriptorEventCallback(const std::string & name)318 bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
319 {
320 std::lock_guard<std::mutex> lock(queueMutex_);
321 #if defined(__HIVIEW_OHOS__)
322 if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
323 HIVIEW_LOGW("fd callback name is not existed.");
324 return false;
325 }
326
327 int fd = eventSourceNameMap_[name];
328 eventSourceNameMap_.erase(name);
329 eventSourceMap_.erase(fd);
330
331 #ifdef USE_POLL
332 modifyFdStatus_ = true;
333 WakeUp();
334 #else
335 if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
336 HIVIEW_LOGW("fail to remove watched fd.");
337 }
338 #endif
339 #elif defined(_WIN32)
340 // not supported yet
341 #endif
342 return true;
343 }
344
345 #ifdef USE_POLL
ModifyFdStatus()346 void EventLoop::ModifyFdStatus()
347 {
348 std::lock_guard<std::mutex> lock(queueMutex_);
349 modifyFdStatus_ = false;
350 int index = 1;
351 for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
352 if (index > MAX_WATCHED_FDS - 1) {
353 break;
354 }
355
356 watchFds_[index].fd = it->first;
357 watchFds_[index].events = it->second->GetPollType();
358 index++;
359 watchedFdSize_ = index;
360 }
361 }
362
PollNextEvent(uint64_t timeout)363 void EventLoop::PollNextEvent(uint64_t timeout)
364 {
365 poll(watchFds_, watchedFdSize_, timeout);
366 isWaken_ = true;
367 if (modifyFdStatus_) {
368 ModifyFdStatus();
369 return;
370 }
371
372 if (watchFds_[0].revents & POLLIN) {
373 // new queued event arrived
374 int32_t val = 0;
375 read(watchFds_[0].fd, &val, sizeof(val));
376 return;
377 }
378
379 for (int i = 1; i < watchedFdSize_; i++) {
380 int32_t fd = watchFds_[i].fd;
381 std::lock_guard<std::mutex> lock(queueMutex_);
382 auto it = eventSourceMap_.find(fd);
383 if (it == eventSourceMap_.end()) {
384 continue;
385 }
386
387 int32_t pollType = it->second->GetPollType();
388 if (watchFds_[i].revents & pollType) {
389 it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
390 }
391 }
392 }
393 #endif
394
Run()395 void EventLoop::Run()
396 {
397 if (MemoryUtil::DisableThreadCache() != 0 || MemoryUtil::DisableDelayFree() != 0) {
398 HIVIEW_LOGW("Failed to optimize memory for current thread");
399 }
400
401 // set thread name
402 const int maxLength = 16;
403 std::string restrictedName = name_;
404 if (name_.length() >= maxLength) {
405 HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
406 restrictedName = name_.substr(0, maxLength - 1);
407 }
408 Thread::SetThreadDescription(restrictedName);
409
410 name_ = name_ + "@" + std::to_string(Thread::GetTid());
411
412 while (true) {
413 uint64_t leftTimeNanosecond = ProcessQueuedEvent();
414 uint64_t leftTimeMill = INT_MAX;
415 if (leftTimeNanosecond != INT_MAX) {
416 leftTimeMill = (leftTimeNanosecond / NANOSECOND_TO_MILLSECOND);
417 }
418 WaitNextEvent(leftTimeMill);
419 if (needQuit_) {
420 break;
421 }
422 }
423 }
424
ProcessQueuedEvent()425 uint64_t EventLoop::ProcessQueuedEvent()
426 {
427 if (pendingEvents_.empty()) {
428 return INT_MAX;
429 }
430
431 uint64_t leftTimeNanosecond = 0;
432 while (!pendingEvents_.empty()) {
433 uint64_t now = NanoSecondSinceSystemStart();
434 LoopEvent event;
435 if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
436 break;
437 }
438
439 ProcessEvent(event);
440
441 if (event.isRepeat && (event.interval > 0)) {
442 // force update time
443 now = NanoSecondSinceSystemStart();
444 ReInsertPeriodicEvent(now, event);
445 }
446
447 std::lock_guard<std::mutex> lock(queueMutex_);
448 currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
449 }
450 isWaken_ = false;
451 return leftTimeNanosecond;
452 }
453
FetchNextEvent(uint64_t now,uint64_t & leftTimeNanosecond,LoopEvent & out)454 bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
455 {
456 if (needQuit_) {
457 return false;
458 }
459
460 std::lock_guard<std::mutex> lock(queueMutex_);
461 if (pendingEvents_.empty()) {
462 return false;
463 }
464
465 size_t pendingSize = pendingEvents_.size();
466 const size_t warningPendingSize = 1000;
467 if ((pendingSize > warningPendingSize) && (pendingSize % warningPendingSize == 0)) {
468 HIVIEW_LOGW("%{public}s has %{public}zu pending events.", name_.c_str(), pendingSize);
469 }
470 pendingEvents_.ShrinkIfNeedLocked();
471
472 const LoopEvent &event = pendingEvents_.top();
473 if (event.targetTime > now) {
474 leftTimeNanosecond = event.targetTime - now;
475 nextWakeupTime_ = event.targetTime;
476 return false;
477 }
478
479 out = event;
480 pendingEvents_.pop();
481 currentProcessingEvent_.store(&out, std::memory_order_relaxed);
482 return true;
483 }
484
ProcessEvent(LoopEvent & event)485 void EventLoop::ProcessEvent(LoopEvent &event)
486 {
487 if (event.taskType == LOOP_EVENT_TASK) {
488 if (event.task != nullptr) {
489 event.task();
490 } else if ((event.handler != nullptr) && (event.event != nullptr)) {
491 event.handler->OnEventProxy(event.event);
492 } else {
493 HIVIEW_LOGW("Loop event task with null tasks.");
494 }
495 } else if (event.taskType == LOOP_PACKAGED_TASK) {
496 if (event.packagedTask != nullptr) {
497 event.packagedTask->operator()();
498 } else {
499 HIVIEW_LOGW("Loop packaged task with null tasks.");
500 }
501 } else {
502 HIVIEW_LOGW("unrecognized task type.");
503 }
504 }
505
ReInsertPeriodicEvent(uint64_t now,LoopEvent & event)506 void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
507 {
508 std::lock_guard<std::mutex> lock(queueMutex_);
509 currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
510 if (event.seq == 0) {
511 return;
512 }
513
514 event.enqueueTime = now;
515 event.targetTime = now + event.interval;
516
517 if (Audit::IsEnabled() && (event.event != nullptr) && (event.handler != nullptr)) {
518 event.event->ResetTimestamp();
519 auto digest = event.event->sender_ + Audit::DOMAIN_DELIMITER + event.handler->GetHandlerInfo() +
520 Audit::DOMAIN_DELIMITER + GetName() + Audit::DOMAIN_DELIMITER + event.event->GetEventInfo();
521 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event.event->createTime_, digest);
522 }
523
524 pendingEvents_.push(std::move(event));
525 ResetTimerIfNeedLocked();
526 }
527
WaitNextEvent(uint64_t leftTimeMill)528 void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
529 {
530 #if defined(__HIVIEW_OHOS__)
531 #ifdef USE_POLL
532 PollNextEvent(leftTimeMill);
533 #else
534 struct epoll_event eventItems[MAX_EVENT_SIZE];
535 int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
536 isWaken_ = true;
537 if (eventCount <= 0) {
538 // no event read from watched fd, process queued events
539 return;
540 }
541
542 for (int i = 0; i < eventCount; i++) {
543 int fd = eventItems[i].data.fd;
544 uint32_t events = eventItems[i].events;
545 if (fd == pendingEventQueueFd_.Get()) {
546 // new queued event arrived
547 eventfd_t val = 0;
548 read(fd, &val, sizeof(val));
549 return;
550 } else {
551 // process data source callbacks
552 auto it = eventSourceMap_.find(fd);
553 if (it != eventSourceMap_.end()) {
554 it->second->OnFileDescriptorEvent(fd, events);
555 }
556 }
557 }
558 #endif
559 #elif defined(_WIN32)
560 DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
561 DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
562 #endif
563 }
564
NanoSecondSinceSystemStart()565 uint64_t EventLoop::NanoSecondSinceSystemStart()
566 {
567 auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
568 return static_cast<uint64_t>(nanoNow.count());
569 }
570 } // namespace HiviewDFX
571 } // namespace OHOS
572