• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "event_loop.h"
16 
17 #include <climits>
18 #include <functional>
19 #include <thread>
20 
21 #if defined(__HIVIEW_OHOS__)
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/prctl.h>
26 #elif defined(_WIN32)
27 #include <processthreadsapi.h>
28 #include <sstream>
29 #include <Synchapi.h>
30 #include <tchar.h>
31 #include <windows.h>
32 #endif
33 
34 #include "audit.h"
35 #include "file_util.h"
36 #include "logger.h"
37 #include "thread_util.h"
38 namespace OHOS {
39 namespace HiviewDFX {
40 namespace {
GetFalseFuture()41 std::future<bool> GetFalseFuture()
42 {
43     std::promise<bool> tmpPromise;
44     tmpPromise.set_value(false);
45     return tmpPromise.get_future();
46 }
47 }
48 
49 DEFINE_LOG_TAG("HiView-EventLoop");
50 
EventLoop(const std::string & name)51 EventLoop::EventLoop(const std::string &name) : name_(name), nextWakeupTime_(0), currentProcessingEvent_(nullptr)
52 {}
53 
~EventLoop()54 EventLoop::~EventLoop()
55 {
56     StopLoop();
57 }
58 
InitEventQueueNotifier()59 bool EventLoop::InitEventQueueNotifier()
60 {
61 #if defined(__HIVIEW_OHOS__)
62 #if defined(USE_POLL)
63     for (int i = 0; i < 2; i++) { // 2:event queue fd size
64         if (eventQueueFd_[i] > 0) {
65             close(eventQueueFd_[i]);
66             eventQueueFd_[i] = -1;
67         }
68     }
69 
70     if (pipe2(eventQueueFd_, O_CLOEXEC) != 0) {
71         HIVIEW_LOGW("Failed to create event queue fd.");
72         return false;
73     }
74 
75     watchFds_[0].fd = eventQueueFd_[0];
76     watchFds_[0].events = POLLIN;
77     watchedFdSize_ = 1;
78 #else
79 #if defined EPOLL_CLOEXEC
80     sharedPollingFd_ = UniqueFd(epoll_create1(EPOLL_CLOEXEC));
81 #else
82     sharedPollingFd_ = UniqueFd(epoll_create(1024)); // listen 1024 sockets
83 #endif
84     pendingEventQueueFd_ = UniqueFd(eventfd(0, EFD_NONBLOCK));
85     struct epoll_event eventItem;
86     eventItem.events = EPOLLIN | EPOLLET;
87     eventItem.data.fd = pendingEventQueueFd_.Get();
88     int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, pendingEventQueueFd_.Get(), &eventItem);
89     if (result < 0) {
90         HIVIEW_LOGE("Fail to Create event poll queue.");
91         return false;
92     }
93 #endif
94 #elif defined(_WIN32)
95     watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX] = CreateEventA(NULL, FALSE, FALSE, NULL);
96 #endif
97     return true;
98 }
99 
WakeUp()100 void EventLoop::WakeUp()
101 {
102 #if defined(__HIVIEW_OHOS__)
103 #ifdef USE_POLL
104     if (eventQueueFd_[1] > 0) {
105         int32_t count = 1;
106         write(eventQueueFd_[1], &count, sizeof(count));
107     }
108 #else
109     if (pendingEventQueueFd_.Get() > 0) {
110         eventfd_t count = 1;
111         write(pendingEventQueueFd_.Get(), &count, sizeof(count));
112     }
113 #endif
114 #elif defined(_WIN32)
115     SetEvent(watchHandleList_[LOOP_WAKEUP_HANDLE_INDEX]);
116 #endif
117 }
118 
StartLoop(bool createNewThread)119 void EventLoop::StartLoop(bool createNewThread)
120 {
121     std::lock_guard<std::mutex> lock(queueMutex_);
122     if (IsRunning()) {
123         return;
124     }
125     if (!InitEventQueueNotifier()) {
126         return;
127     }
128 
129     isRunning_ = true;
130     if (createNewThread) {
131         thread_ = std::make_unique<std::thread>(&EventLoop::Run, this);
132         return;
133     }
134     // handle loop in current thread cases
135     Run();
136 }
137 
StopLoop()138 void EventLoop::StopLoop()
139 {
140     needQuit_ = true;
141     if (!IsRunning()) {
142         return;
143     }
144 
145     {
146         std::lock_guard<std::mutex> lock(queueMutex_);
147         while (!pendingEvents_.empty()) {
148             pendingEvents_.pop();
149         }
150         isRunning_ = false;
151     }
152 
153     WakeUp();
154     if (thread_ != nullptr && thread_->joinable()) {
155         thread_->join();
156     }
157 }
158 
AddEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task task)159 uint64_t EventLoop::AddEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task task)
160 {
161     if (needQuit_) {
162         return 0;
163     }
164 
165     uint64_t now = NanoSecondSinceSystemStart();
166     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
167         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
168                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
169         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
170     }
171 
172     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
173     loopEvent.event = std::move(event);
174     loopEvent.handler = handler;
175     loopEvent.task = task;
176     std::lock_guard<std::mutex> lock(queueMutex_);
177     pendingEvents_.push(std::move(loopEvent));
178     WakeUp();
179     return now;
180 }
181 
AddEventForResult(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event)182 std::future<bool> EventLoop::AddEventForResult(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event)
183 {
184     if (needQuit_) {
185         return GetFalseFuture();
186     }
187 
188     if (handler == nullptr || event == nullptr) {
189         return GetFalseFuture();
190     }
191 
192     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
193         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
194                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
195         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
196     }
197 
198     auto bind = std::bind(&EventHandler::OnEventProxy, handler.get(), event);
199     auto task = std::make_shared<std::packaged_task<bool()>>(bind);
200     auto result = task->get_future();
201     uint64_t now = NanoSecondSinceSystemStart();
202     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
203     loopEvent.taskType = LOOP_PACKAGED_TASK;
204     loopEvent.event = std::move(event);
205     loopEvent.handler = handler;
206     loopEvent.packagedTask = std::move(task);
207     std::lock_guard<std::mutex> lock(queueMutex_);
208     pendingEvents_.push(std::move(loopEvent));
209     WakeUp();
210     return result;
211 }
212 
AddTimerEvent(std::shared_ptr<EventHandler> handler,std::shared_ptr<Event> event,const Task & task,uint64_t interval,bool repeat)213 uint64_t EventLoop::AddTimerEvent(std::shared_ptr<EventHandler> handler, std::shared_ptr<Event> event, const Task &task,
214     uint64_t interval, bool repeat)
215 {
216     if (needQuit_) {
217         return 0;
218     }
219 
220     uint64_t now = NanoSecondSinceSystemStart();
221     uint64_t intervalMicro = interval * SECOND_TO_NANOSECOND;
222     if (now + intervalMicro < now) {
223         HIVIEW_LOGW("Add Timer Event fail. The interval is too large. please check.");
224         return -1;
225     }
226 
227     if (Audit::IsEnabled() && (event != nullptr) && (handler != nullptr) && (!(event->isPipeline_))) {
228         auto digest = event->sender_ + Audit::DOMAIN_DELIMITER + handler->GetHandlerInfo() + Audit::DOMAIN_DELIMITER +
229                       GetName() + Audit::DOMAIN_DELIMITER + event->GetEventInfo();
230         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event->createTime_, digest);
231     }
232 
233     LoopEvent loopEvent = LoopEvent::CreateLoopEvent(now);
234     loopEvent.isRepeat = repeat;
235     loopEvent.taskType = LOOP_EVENT_TASK;
236     loopEvent.interval = intervalMicro;
237     loopEvent.targetTime = now + intervalMicro;
238     loopEvent.event = std::move(event);
239     loopEvent.handler = handler;
240     loopEvent.task = task;
241     std::lock_guard<std::mutex> lock(queueMutex_);
242     pendingEvents_.push(std::move(loopEvent));
243     ResetTimerIfNeedLocked();
244     return now;
245 }
246 
RemoveEvent(uint64_t seq)247 bool EventLoop::RemoveEvent(uint64_t seq)
248 {
249     std::lock_guard<std::mutex> lock(queueMutex_);
250     auto curEvent = currentProcessingEvent_.load(std::memory_order_relaxed);
251     if ((curEvent != nullptr) && (curEvent->seq == seq)) {
252         curEvent->seq = 0;
253         HIVIEW_LOGI("removing the current processing event.");
254         return false;
255     }
256     return pendingEvents_.remove(seq);
257 }
258 
ResetTimerIfNeedLocked()259 void EventLoop::ResetTimerIfNeedLocked()
260 {
261     const LoopEvent &event = pendingEvents_.top();
262     if (nextWakeupTime_ == event.targetTime) {
263         return;
264     }
265     WakeUp();
266 }
267 
AddFileDescriptorEventCallback(const std::string & name,std::shared_ptr<FileDescriptorEventCallback> source)268 bool EventLoop::AddFileDescriptorEventCallback(
269     const std::string &name, std::shared_ptr<FileDescriptorEventCallback> source)
270 {
271     if (needQuit_) {
272         return false;
273     }
274 
275     std::lock_guard<std::mutex> lock(queueMutex_);
276 #if defined(__HIVIEW_OHOS__)
277     if (eventSourceNameMap_.size() >= (MAX_WATCHED_FDS - 1)) {
278         HIVIEW_LOGW("Watched fds exceed 64.");
279         return false;
280     }
281 
282     if (eventSourceNameMap_.find(name) != eventSourceNameMap_.end()) {
283         HIVIEW_LOGW("Exist fd callback with same name.");
284         return false;
285     }
286 
287     int fd = source->GetPollFd();
288     if (fd <= 0) {
289         HIVIEW_LOGW("Invalid poll fd.");
290         return false;
291     }
292 
293 #ifdef USE_POLL
294     eventSourceNameMap_[name] = fd;
295     eventSourceMap_[fd] = source;
296     modifyFdStatus_ = true;
297     WakeUp();
298 #else
299     struct epoll_event eventItem;
300     eventItem.events = source->GetPollType();
301     eventItem.data.fd = fd;
302     int result = epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_ADD, fd, &eventItem);
303     if (result < 0) {
304         HIVIEW_LOGW("Fail to Add Fd callback.");
305         return false;
306     }
307 
308     eventSourceNameMap_[name] = fd;
309     eventSourceMap_[fd] = source;
310 #endif
311 #elif defined(_WIN32)
312     // not supported yet
313 #endif
314     return true;
315 }
316 
RemoveFileDescriptorEventCallback(const std::string & name)317 bool EventLoop::RemoveFileDescriptorEventCallback(const std::string &name)
318 {
319     std::lock_guard<std::mutex> lock(queueMutex_);
320 #if defined(__HIVIEW_OHOS__)
321     if (eventSourceNameMap_.find(name) == eventSourceNameMap_.end()) {
322         HIVIEW_LOGW("fd callback name is not existed.");
323         return false;
324     }
325 
326     int fd = eventSourceNameMap_[name];
327     eventSourceNameMap_.erase(name);
328     eventSourceMap_.erase(fd);
329 
330 #ifdef USE_POLL
331     modifyFdStatus_ = true;
332     WakeUp();
333 #else
334     if (epoll_ctl(sharedPollingFd_.Get(), EPOLL_CTL_DEL, fd, nullptr) == -1) {
335         HIVIEW_LOGW("fail to remove watched fd.");
336     }
337 #endif
338 #elif defined(_WIN32)
339     // not supported yet
340 #endif
341     return true;
342 }
343 
344 #ifdef USE_POLL
ModifyFdStatus()345 void EventLoop::ModifyFdStatus()
346 {
347     std::lock_guard<std::mutex> lock(queueMutex_);
348     modifyFdStatus_ = false;
349     int index = 1;
350     for (auto it = eventSourceMap_.begin(); it != eventSourceMap_.end(); it++) {
351         if (index > MAX_WATCHED_FDS - 1) {
352             break;
353         }
354 
355         watchFds_[index].fd = it->first;
356         watchFds_[index].events = it->second->GetPollType();
357         index++;
358         watchedFdSize_ = index;
359     }
360 }
361 
PollNextEvent(uint64_t timeout)362 void EventLoop::PollNextEvent(uint64_t timeout)
363 {
364     poll(watchFds_, watchedFdSize_, timeout);
365     isWaken_ = true;
366     if (modifyFdStatus_) {
367         ModifyFdStatus();
368         return;
369     }
370 
371     if (watchFds_[0].revents & POLLIN) {
372         // new queued event arrived
373         int32_t val = 0;
374         read(watchFds_[0].fd, &val, sizeof(val));
375         return;
376     }
377 
378     for (int i = 1; i < watchedFdSize_; i++) {
379         int32_t fd = watchFds_[i].fd;
380         std::lock_guard<std::mutex> lock(queueMutex_);
381         auto it = eventSourceMap_.find(fd);
382         if (it == eventSourceMap_.end()) {
383             continue;
384         }
385 
386         int32_t pollType = it->second->GetPollType();
387         if (watchFds_[i].revents & pollType) {
388             it->second->OnFileDescriptorEvent(fd, watchFds_[i].revents);
389         }
390     }
391 }
392 #endif
393 
Run()394 void EventLoop::Run()
395 {
396     // set thread name
397     const int maxLength = 16;
398     std::string restrictedName = name_;
399     if (name_.length() >= maxLength) {
400         HIVIEW_LOGW("%{public}s is too long for thread, please change to a shorter one.", name_.c_str());
401         restrictedName = name_.substr(0, maxLength - 1);
402     }
403     Thread::SetThreadDescription(restrictedName);
404 
405     name_ = name_ + "@" + std::to_string(Thread::GetTid());
406 
407     while (true) {
408         uint64_t leftTimeNanosecond = ProcessQueuedEvent();
409         uint64_t leftTimeMill = INT_MAX;
410         if (leftTimeNanosecond != INT_MAX) {
411             leftTimeMill = (leftTimeNanosecond / NANOSECOND_TO_MILLSECOND);
412         }
413         WaitNextEvent(leftTimeMill);
414         if (needQuit_) {
415             break;
416         }
417     }
418 }
419 
ProcessQueuedEvent()420 uint64_t EventLoop::ProcessQueuedEvent()
421 {
422     if (pendingEvents_.empty()) {
423         return INT_MAX;
424     }
425 
426     uint64_t leftTimeNanosecond = 0;
427     while (!pendingEvents_.empty()) {
428         uint64_t now = NanoSecondSinceSystemStart();
429         LoopEvent event;
430         if (!FetchNextEvent(now, leftTimeNanosecond, event)) {
431             break;
432         }
433 
434         ProcessEvent(event);
435 
436         if (event.isRepeat && (event.interval > 0)) {
437             // force update time
438             now = NanoSecondSinceSystemStart();
439             ReInsertPeriodicEvent(now, event);
440         }
441 
442         std::lock_guard<std::mutex> lock(queueMutex_);
443         currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
444     }
445     isWaken_ = false;
446     return leftTimeNanosecond;
447 }
448 
FetchNextEvent(uint64_t now,uint64_t & leftTimeNanosecond,LoopEvent & out)449 bool EventLoop::FetchNextEvent(uint64_t now, uint64_t& leftTimeNanosecond, LoopEvent& out)
450 {
451     if (needQuit_) {
452         return false;
453     }
454 
455     std::lock_guard<std::mutex> lock(queueMutex_);
456     if (pendingEvents_.empty()) {
457         return false;
458     }
459 
460     const LoopEvent &event = pendingEvents_.top();
461     if (event.targetTime > now) {
462         leftTimeNanosecond = event.targetTime - now;
463         nextWakeupTime_ = event.targetTime;
464         return false;
465     }
466 
467     out = event;
468     pendingEvents_.pop();
469     currentProcessingEvent_.store(&out, std::memory_order_relaxed);
470     return true;
471 }
472 
ProcessEvent(LoopEvent & event)473 void EventLoop::ProcessEvent(LoopEvent &event)
474 {
475     if (event.taskType == LOOP_EVENT_TASK) {
476         if (event.task != nullptr) {
477             event.task();
478         } else if ((event.handler != nullptr) && (event.event != nullptr)) {
479             event.handler->OnEventProxy(event.event);
480         } else {
481             HIVIEW_LOGW("Loop event task with null tasks.");
482         }
483     } else if (event.taskType == LOOP_PACKAGED_TASK) {
484         if (event.packagedTask != nullptr) {
485             event.packagedTask->operator()();
486         } else {
487             HIVIEW_LOGW("Loop packaged task with null tasks.");
488         }
489     } else {
490         HIVIEW_LOGW("unrecognized task type.");
491     }
492 }
493 
ReInsertPeriodicEvent(uint64_t now,LoopEvent & event)494 void EventLoop::ReInsertPeriodicEvent(uint64_t now, LoopEvent &event)
495 {
496     std::lock_guard<std::mutex> lock(queueMutex_);
497     currentProcessingEvent_.store(nullptr, std::memory_order_relaxed);
498     if (event.seq == 0) {
499         return;
500     }
501 
502     event.enqueueTime = now;
503     event.targetTime = now + event.interval;
504 
505     if (Audit::IsEnabled() && (event.event != nullptr) && (event.handler != nullptr)) {
506         event.event->ResetTimestamp();
507         auto digest = event.event->sender_ + Audit::DOMAIN_DELIMITER + event.handler->GetHandlerInfo() +
508                       Audit::DOMAIN_DELIMITER + GetName() + Audit::DOMAIN_DELIMITER + event.event->GetEventInfo();
509         Audit::WriteAuditEvent(Audit::StatsEvent::QUEUE_EVENT_IN, event.event->createTime_, digest);
510     }
511 
512     pendingEvents_.push(std::move(event));
513     ResetTimerIfNeedLocked();
514 }
515 
WaitNextEvent(uint64_t leftTimeMill)516 void EventLoop::WaitNextEvent(uint64_t leftTimeMill)
517 {
518 #if defined(__HIVIEW_OHOS__)
519 #ifdef USE_POLL
520     PollNextEvent(leftTimeMill);
521 #else
522     struct epoll_event eventItems[MAX_EVENT_SIZE];
523     int eventCount = epoll_wait(sharedPollingFd_.Get(), eventItems, MAX_EVENT_SIZE, leftTimeMill);
524     isWaken_ = true;
525     if (eventCount <= 0) {
526         // no event read from watched fd, process queued events
527         return;
528     }
529 
530     for (int i = 0; i < eventCount; i++) {
531         int fd = eventItems[i].data.fd;
532         uint32_t events = eventItems[i].events;
533         if (fd == pendingEventQueueFd_.Get()) {
534             // new queued event arrived
535             eventfd_t val = 0;
536             read(fd, &val, sizeof(val));
537             return;
538         } else {
539             // process data source callbacks
540             auto it = eventSourceMap_.find(fd);
541             if (it != eventSourceMap_.end()) {
542                 it->second->OnFileDescriptorEvent(fd, events);
543             }
544         }
545     }
546 #endif
547 #elif defined(_WIN32)
548     DWORD dWaitTime = (leftTimeMill >= INFINITE) ? INFINITE : static_cast<DWORD>(leftTimeMill);
549     DWORD result = WaitForMultipleObjects(MAX_HANDLE_ARRAY_SIZE, watchHandleList_, TRUE, dWaitTime);
550 #endif
551 }
552 
NanoSecondSinceSystemStart()553 uint64_t EventLoop::NanoSecondSinceSystemStart()
554 {
555     auto nanoNow = std::chrono::steady_clock::now().time_since_epoch();
556     return static_cast<uint64_t>(nanoNow.count());
557 }
558 }  // namespace HiviewDFX
559 }  // namespace OHOS
560