1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "event_loop.h"
16
17 #include <climits>
18 #include <functional>
19 #include <thread>
20
21 #if defined(__HIVIEW_OHOS__)
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/prctl.h>
26 #elif defined(_WIN32)
27 #include <processthreadsapi.h>
28 #include <sstream>
29 #include <Synchapi.h>
30 #include <tchar.h>
31 #include <windows.h>
32 #endif
33
34 #include "audit.h"
35 #include "file_util.h"
36 #include "logger.h"
37 #include "thread_util.h"
38 namespace OHOS {
39 namespace HiviewDFX {
40 namespace {
// Builds a future that is already fulfilled with `false`, so a caller can hand back a
// "rejected" result without queueing any work.
std::future<bool> GetFalseFuture()
{
    std::promise<bool> rejected;
    std::future<bool> outcome = rejected.get_future();
    rejected.set_value(false);
    return outcome;
}
47 }
48
// Log tag for this translation unit.
// NOTE(review): DEFINE_LOG_TAG is declared outside this file; presumably it supplies the tag used by the
// HIVIEW_LOG* macros below — confirm against logger.h.
DEFINE_LOG_TAG("HiView-EventLoop");
50
EventLoop(const std::string & name)51 EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
52 {}
53
~EventLoop()54 EventLoop::~EventLoop()
55 {
56 StopLoop();
57 }
58
InitEventQueueNotifier()59 bool EventLoop::InitEventQueueNotifier()
60 {
61 #if defined(__HIVIEW_OHOS__)
62 #if defined(USE_POLL)
63 for (int i = 0; i < 2; i++) { // 2:event queue fd size
64 if (eventQueueFd_[i] > 0) {
65 close(eventQueueFd_[i]);
66 eventQueueFd_[i] = -1;
67 }
68 }
69
70 if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
71 HIVIEW_LOGW("Failed to create event queue fd.");
72 return false;
73 }
74
75 watchFds_[0].fd = eventQueueFd_[0];
76 watchFds_[0].events = POLLIN;
77 watchedFdSize_ = 1;
78 #else
79 #if defined EPOLL_CLOEXEC
80 sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
81 #else
82 sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
83 #endif
84 pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
85 struct epoll_event eventItem;
86 eventItem.events = EPOLLIN | EPOLLET;
87 eventItem.data.fd = pendingEventQueueFd_.Get();
88 int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
89 if (result < 0) {
90 HIVIEW_LOGE("Fail to Create event poll queue.");
91 return false;
92 }
93 #endif
94 #elif defined(_WIN32)
95 watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
96 #endif
97 return true;
98 }
99
WakeUp()100 void EventLoop::WakeUp()
101 {
102 #if defined(__HIVIEW_OHOS__)
103 #ifdef USE_POLL
104 if (eventQueueFd_[1] > 0) {
105 int32_t count = 1;
106 write(eventQueueFd_[1], &count, sizeof(count));
107 }
108 #else
109 if (pendingEventQueueFd_.Get() > 0) {
110 eventfd_t count = 1;
111 write(pendingEventQueueFd_.Get(), &count, sizeof(count));
112 }
113 #endif
114 #elif defined(_WIN32)
115 SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
116 #endif
117 }
118
StartLoop(bool createNewThread)119 void EventLoop::StartLoop(bool createNewThread)
120 {
121 std::lock_guard<std::mutex> lock(queueMutex_);
122 if (IsRunning()) {
123 return;
124 }
125 if (!InitEventQueueNotifier()) {
126 return;
127 }
128
129 isRunning_ = true;
130 if (createNewThread) {
131 thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
132 return;
133 }
134 // handle loop in current thread cases
135 Run();
136 }
137
StopLoop()138 void EventLoop::StopLoop()
139 {
140 needQuit_ = true;
141 if (!IsRunning()) {
142 return;
143 }
144
145 {
146 std::lock_guard<std::mutex> lock(queueMutex_);
147 while (!pendingEvents_.empty()) {
148 pendingEvents_.pop();
149 }
150 isRunning_ = false;
151 }
152
153 WakeUp();
154 if (thread_ != nullptr && thread_->joinable()) {
155 thread_->join();
156 }
157 }
158
AddEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task task)159 uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
160 {
161 if (needQuit_) {
162 return 0;
163 }
164
165 uint64_t now = NanoSecondSinceSystemStart();
166 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
167 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
168 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
169 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
170 }
171
172 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
173 loopEvent.event = std::move(event);
174 loopEvent.handler = handler;
175 loopEvent.task = task;
176 std::lock_guard<std::mutex> lock(queueMutex_);
177 pendingEvents_.push(std::move(loopEvent));
178 WakeUp();
179 return now;
180 }
181
AddEventForResult(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event)182 std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
183 {
184 if (needQuit_) {
185 return GetFalseFuture();
186 }
187
188 if (handler == nullptr || event == nullptr) {
189 return GetFalseFuture();
190 }
191
192 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
193 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
194 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
195 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
196 }
197
198 auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
199 auto task = std::make_shared<std::packaged_task<bool()>>(bind);
200 auto result = task->get_future();
201 uint64_t now = NanoSecondSinceSystemStart();
202 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
203 loopEvent.taskType = LOOP_PACKAGED_TASK;
204 loopEvent.event = std::move(event);
205 loopEvent.handler = handler;
206 loopEvent.packagedTask = std::move(task);
207 std::lock_guard<std::mutex> lock(queueMutex_);
208 pendingEvents_.push(std::move(loopEvent));
209 WakeUp();
210 return result;
211 }
212
AddTimerEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task & task,uint64_t interval,bool repeat)213 uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task &task,
214 uint64_t interval, bool repeat)
215 {
216 if (needQuit_) {
217 return 0;
218 }
219
220 uint64_t now = NanoSecondSinceSystemStart();
221 uint64_t intervalMicro = interval * SECOND_TO_NANOSECOND;
222 if (now + intervalMicro < now) {
223 HIVIEW_LOGW("Add Timer Event fail. The interval is too large. please check.");
224 return -1;
225 }
226
227 if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
228 auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
229 GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
230 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
231 }
232
233 LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
234 loopEvent.isRepeat = repeat;
235 loopEvent.taskType = LOOP_EVENT_TASK;
236 loopEvent.interval = intervalMicro;
237 loopEvent.targetTime = now + intervalMicro;
238 loopEvent.event = std::move(event);
239 loopEvent.handler = handler;
240 loopEvent.task = task;
241 std::lock_guard<std::mutex> lock(queueMutex_);
242 pendingEvents_.push(std::move(loopEvent));
243 ResetTimerIfNeedLocked();
244 return now;
245 }
246
RemoveEvent(uint64_t seq)247 bool EventLoop::RemoveEvent(uint64_t seq)
248 {
249 std::lock_guard<std::mutex> lock(queueMutex_);
250 auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
251 if ((curEvent != nullptr) && (curEvent->seq == seq)) {
252 curEvent->seq = 0;
253 HIVIEW_LOGI("removing the current processing event.");
254 return false;
255 }
256 return pendingEvents_.remove(seq);
257 }
258
ResetTimerIfNeedLocked()259 void EventLoop::ResetTimerIfNeedLocked()
260 {
261 const LoopEvent &event = pendingEvents_.top();
262 if (nextWakeupTime_ == event.targetTime) {
263 return;
264 }
265 WakeUp();
266 }
267
AddFileDescriptorEventCallback(const std::string & name,std::shared_ptr<FileDescriptorEventCallback> source)268 bool EventLoop::AddFileDescriptorEventCallback(
269 const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
270 {
271 if (needQuit_) {
272 return false;
273 }
274
275 std::lock_guard<std::mutex> lock(queueMutex_);
276 #if defined(__HIVIEW_OHOS__)
277 if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
278 HIVIEW_LOGW("Watched fds exceed 64.");
279 return false;
280 }
281
282 if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
283 HIVIEW_LOGW("Exist fd callback with same name.");
284 return false;
285 }
286
287 int fd = source->GetPollFd();
288 if (fd <= 0) {
289 HIVIEW_LOGW("Invalid poll fd.");
290 return false;
291 }
292
293 #ifdef USE_POLL
294 eventSourceNameMap_[name] = fd;
295 eventSourceMap_[fd] = source;
296 modifyFdStatus_ = true;
297 WakeUp();
298 #else
299 struct epoll_event eventItem;
300 eventItem.events = source->GetPollType();
301 eventItem.data.fd = fd;
302 int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
303 if (result < 0) {
304 HIVIEW_LOGW("Fail to Add Fd callback.");
305 return false;
306 }
307
308 eventSourceNameMap_[name] = fd;
309 eventSourceMap_[fd] = source;
310 #endif
311 #elif defined(_WIN32)
312 // not supported yet
313 #endif
314 return true;
315 }
316
RemoveFileDescriptorEventCallback(const std::string & name)317 bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
318 {
319 std::lock_guard<std::mutex> lock(queueMutex_);
320 #if defined(__HIVIEW_OHOS__)
321 if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
322 HIVIEW_LOGW("fd callback name is not existed.");
323 return false;
324 }
325
326 int fd = eventSourceNameMap_[name];
327 eventSourceNameMap_.erase(name);
328 eventSourceMap_.erase(fd);
329
330 #ifdef USE_POLL
331 modifyFdStatus_ = true;
332 WakeUp();
333 #else
334 if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
335 HIVIEW_LOGW("fail to remove watched fd.");
336 }
337 #endif
338 #elif defined(_WIN32)
339 // not supported yet
340 #endif
341 return true;
342 }
343
344 #ifdef USE_POLL
ModifyFdStatus()345 void EventLoop::ModifyFdStatus()
346 {
347 std::lock_guard<std::mutex> lock(queueMutex_);
348 modifyFdStatus_ = false;
349 int index = 1;
350 for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
351 if (index > MAX_WATCHED_FDS - 1) {
352 break;
353 }
354
355 watchFds_[index].fd = it->first;
356 watchFds_[index].events = it->second->GetPollType();
357 index++;
358 watchedFdSize_ = index;
359 }
360 }
361
PollNextEvent(uint64_t timeout)362 void EventLoop::PollNextEvent(uint64_t timeout)
363 {
364 poll(watchFds_, watchedFdSize_, timeout);
365 isWaken_ = true;
366 if (modifyFdStatus_) {
367 ModifyFdStatus();
368 return;
369 }
370
371 if (watchFds_[0].revents & POLLIN) {
372 // new queued event arrived
373 int32_t val = 0;
374 read(watchFds_[0].fd, &val, sizeof(val));
375 return;
376 }
377
378 for (int i = 1; i < watchedFdSize_; i++) {
379 int32_t fd = watchFds_[i].fd;
380 std::lock_guard<std::mutex> lock(queueMutex_);
381 auto it = eventSourceMap_.find(fd);
382 if (it == eventSourceMap_.end()) {
383 continue;
384 }
385
386 int32_t pollType = it->second->GetPollType();
387 if (watchFds_[i].revents & pollType) {
388 it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
389 }
390 }
391 }
392 #endif
393
Run()394 void EventLoop::Run()
395 {
396 // set thread name
397 const int maxLength = 16;
398 std::string restrictedName = name_;
399 if (name_.length() >= maxLength) {
400 HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
401 restrictedName = name_.substr(0, maxLength - 1);
402 }
403 Thread::SetThreadDescription(restrictedName);
404
405 name_ = name_ + "@" + std::to_string(Thread::GetTid());
406
407 while (true) {
408 uint64_t leftTimeNanosecond = ProcessQueuedEvent();
409 uint64_t leftTimeMill = INT_MAX;
410 if (leftTimeNanosecond != INT_MAX) {
411 leftTimeMill = (leftTimeNanosecond / NANOSECOND_TO_MILLSECOND);
412 }
413 WaitNextEvent(leftTimeMill);
414 if (needQuit_) {
415 break;
416 }
417 }
418 }
419
ProcessQueuedEvent()420 uint64_t EventLoop::ProcessQueuedEvent()
421 {
422 if (pendingEvents_.empty()) {
423 return INT_MAX;
424 }
425
426 uint64_t leftTimeNanosecond = 0;
427 while (!pendingEvents_.empty()) {
428 uint64_t now = NanoSecondSinceSystemStart();
429 LoopEvent event;
430 if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
431 break;
432 }
433
434 ProcessEvent(event);
435
436 if (event.isRepeat && (event.interval > 0)) {
437 // force update time
438 now = NanoSecondSinceSystemStart();
439 ReInsertPeriodicEvent(now, event);
440 }
441
442 std::lock_guard<std::mutex> lock(queueMutex_);
443 currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
444 }
445 isWaken_ = false;
446 return leftTimeNanosecond;
447 }
448
FetchNextEvent(uint64_t now,uint64_t & leftTimeNanosecond,LoopEvent & out)449 bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
450 {
451 if (needQuit_) {
452 return false;
453 }
454
455 std::lock_guard<std::mutex> lock(queueMutex_);
456 if (pendingEvents_.empty()) {
457 return false;
458 }
459
460 const LoopEvent &event = pendingEvents_.top();
461 if (event.targetTime > now) {
462 leftTimeNanosecond = event.targetTime - now;
463 nextWakeupTime_ = event.targetTime;
464 return false;
465 }
466
467 out = event;
468 pendingEvents_.pop();
469 currentProcessingEvent_.store(&out, std::memory_order_relaxed);
470 return true;
471 }
472
ProcessEvent(LoopEvent & event)473 void EventLoop::ProcessEvent(LoopEvent &event)
474 {
475 if (event.taskType == LOOP_EVENT_TASK) {
476 if (event.task != nullptr) {
477 event.task();
478 } else if ((event.handler != nullptr) && (event.event != nullptr)) {
479 event.handler->OnEventProxy(event.event);
480 } else {
481 HIVIEW_LOGW("Loop event task with null tasks.");
482 }
483 } else if (event.taskType == LOOP_PACKAGED_TASK) {
484 if (event.packagedTask != nullptr) {
485 event.packagedTask->operator()();
486 } else {
487 HIVIEW_LOGW("Loop packaged task with null tasks.");
488 }
489 } else {
490 HIVIEW_LOGW("unrecognized task type.");
491 }
492 }
493
ReInsertPeriodicEvent(uint64_t now,LoopEvent & event)494 void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
495 {
496 std::lock_guard<std::mutex> lock(queueMutex_);
497 currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
498 if (event.seq == 0) {
499 return;
500 }
501
502 event.enqueueTime = now;
503 event.targetTime = now + event.interval;
504
505 if (Audit::IsEnabled() && (event.event != nullptr) && (event.handler != nullptr)) {
506 event.event->ResetTimestamp();
507 auto digest = event.event->sender_ + Audit::DOMAIN_DELIMITER + event.handler->GetHandlerInfo() +
508 Audit::DOMAIN_DELIMITER + GetName() + Audit::DOMAIN_DELIMITER + event.event->GetEventInfo();
509 Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event.event->createTime_, digest);
510 }
511
512 pendingEvents_.push(std::move(event));
513 ResetTimerIfNeedLocked();
514 }
515
WaitNextEvent(uint64_t leftTimeMill)516 void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
517 {
518 #if defined(__HIVIEW_OHOS__)
519 #ifdef USE_POLL
520 PollNextEvent(leftTimeMill);
521 #else
522 struct epoll_event eventItems[MAX_EVENT_SIZE];
523 int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
524 isWaken_ = true;
525 if (eventCount <= 0) {
526 // no event read from watched fd, process queued events
527 return;
528 }
529
530 for (int i = 0; i < eventCount; i++) {
531 int fd = eventItems[i].data.fd;
532 uint32_t events = eventItems[i].events;
533 if (fd == pendingEventQueueFd_.Get()) {
534 // new queued event arrived
535 eventfd_t val = 0;
536 read(fd, &val, sizeof(val));
537 return;
538 } else {
539 // process data source callbacks
540 auto it = eventSourceMap_.find(fd);
541 if (it != eventSourceMap_.end()) {
542 it->second->OnFileDescriptorEvent(fd, events);
543 }
544 }
545 }
546 #endif
547 #elif defined(_WIN32)
548 DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
549 DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
550 #endif
551 }
552
NanoSecondSinceSystemStart()553 uint64_t EventLoop::NanoSecondSinceSystemStart()
554 {
555 auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
556 return static_cast<uint64_t>(nanoNow.count());
557 }
558 } // namespace HiviewDFX
559 } // namespace OHOS
560