1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_demultiplexer.h"
17 #include "event_reactor.h"
18 #include "event_handler.h"
19 #include "common_timer_errors.h"
20 #include "utils_log.h"
21
22 namespace OHOS {
23 namespace Utils {
24
// Seed value for maxEvents_, i.e. the event-buffer size the demultiplexer starts from.
static constexpr int EPOLL_MAX_EVENS_INIT = 8;
// Factor maxEvents_ is multiplied by when a single poll fills the whole event buffer.
static constexpr int HALF_OF_MAX_EVENT = 2;
// Sentinel stored in epollFd_ once the epoll descriptor has been closed.
static constexpr int EPOLL_INVALID_FD = -1;
// errno values inspected after epoll_wait fails (numerically EINTR, EBADF and EINVAL on Linux).
static constexpr int INTERRUPTED_SYS_CALL = 4;
static constexpr int EPOLL_ERROR_BADF = 9;
static constexpr int EPOLL_ERROR_EINVAL = 22;
// Upper bound on the number of epoll_ctl failure records kept in epollCtlErrQueue_.
static constexpr int QUEUE_MAX_SIZE = 10;
32
EventDemultiplexer()33 EventDemultiplexer::EventDemultiplexer()
34 : epollFd_(epoll_create1(EPOLL_CLOEXEC)), maxEvents_(EPOLL_MAX_EVENS_INIT), mutex_(), eventHandlers_(),
35 epollCtlErrQueue_()
36 {
37 }
38
~EventDemultiplexer()39 EventDemultiplexer::~EventDemultiplexer()
40 {
41 CleanUp();
42 }
43
StartUp()44 uint32_t EventDemultiplexer::StartUp()
45 {
46 if (epollFd_ < 0) {
47 epollFd_ = epoll_create1(EPOLL_CLOEXEC);
48 if (epollFd_ < 0) {
49 return TIMER_ERR_BADF;
50 }
51 }
52 return TIMER_ERR_OK;
53 }
54
CleanUp()55 void EventDemultiplexer::CleanUp()
56 {
57 if (epollFd_ != EPOLL_INVALID_FD) {
58 close(epollFd_);
59 epollFd_ = EPOLL_INVALID_FD;
60 }
61 }
62
UpdateEventHandler(EventHandler * handler)63 uint32_t EventDemultiplexer::UpdateEventHandler(EventHandler* handler)
64 {
65 if (handler == nullptr) {
66 return TIMER_ERR_INVALID_VALUE;
67 }
68
69 std::lock_guard<std::recursive_mutex> lock(mutex_);
70 auto itor = eventHandlers_.find(handler->GetHandle());
71 if (itor == eventHandlers_.end()) {
72 eventHandlers_.insert(std::make_pair(handler->GetHandle(), handler->shared_from_this()));
73 return Update(EPOLL_CTL_ADD, handler);
74 }
75
76 if (handler->Events() == EventReactor::NONE_EVENT) {
77 eventHandlers_.erase(itor);
78 return Update(EPOLL_CTL_DEL, handler);
79 }
80
81 if (handler != itor->second.get()) {
82 return TIMER_ERR_DEAL_FAILED;
83 }
84 return Update(EPOLL_CTL_MOD, handler);
85 }
86
Update(int operation,EventHandler * handler)87 uint32_t EventDemultiplexer::Update(int operation, EventHandler* handler)
88 {
89 struct epoll_event event;
90 bzero(&event, sizeof(event));
91 event.events = Reactor2Epoll(handler->Events());
92 event.data.fd = handler->GetHandle();
93
94 if (epoll_ctl(epollFd_, operation, handler->GetHandle(), &event) != 0) {
95 UTILS_LOGE("epoll_ctl %{public}d operation %{public}d on handle %{public}d failed, errno %{public}d",
96 epollFd_, operation, handler->GetHandle(), errno);
97
98 if (epollCtlErrQueue_.size() < QUEUE_MAX_SIZE) {
99 epollCtlErrQueue_.push(std::make_tuple(operation, errno, handler->GetHandle()));
100 }
101 return TIMER_ERR_DEAL_FAILED;
102 }
103 return TIMER_ERR_OK;
104 }
105
Polling(int timeout,std::vector<epoll_event> & epollEvents)106 int EventDemultiplexer::Polling(int timeout /* ms */, std::vector<epoll_event> &epollEvents)
107 {
108 std::vector<std::shared_ptr<EventHandler>> taskQue;
109 std::vector<uint32_t> eventQue;
110
111 int nfds = epoll_wait(epollFd_, &epollEvents[0], static_cast<int>(epollEvents.size()), timeout);
112 if (nfds == 0) {
113 return nfds;
114 }
115 if (nfds == -1) {
116 if (errno != INTERRUPTED_SYS_CALL) {
117 UTILS_LOGE("epoll_wait failed, errno %{public}d, epollFd_: %{public}d, pollEvents.size: %{public}zu",
118 errno, epollFd_, epollEvents.size());
119 }
120 if (errno == EPOLL_ERROR_BADF || errno == EPOLL_ERROR_EINVAL) {
121 UTILS_LOGE("epoll_wait critical error, thread exit");
122 return EPOLL_CRITICAL_ERROR;
123 }
124 return nfds;
125 }
126
127 {
128 std::lock_guard<std::recursive_mutex> lock(mutex_);
129 for (int idx = 0; idx < nfds; ++idx) {
130 int targetFd = epollEvents[idx].data.fd;
131 uint32_t events = epollEvents[idx].events;
132
133 auto itor = eventHandlers_.find(targetFd);
134 if (itor != eventHandlers_.end()) {
135 taskQue.emplace_back(itor->second);
136 eventQue.emplace_back(events);
137 } else {
138 UTILS_LOGE("fd not found in eventHandlers_, fd=%{public}d, events=%{public}d", targetFd, events);
139
140 while (!epollCtlErrQueue_.empty()) {
141 auto [operation, errnoNum, fd] = epollCtlErrQueue_.front();
142 UTILS_LOGE("epoll_ctl: %{public}d, errno: %{public}d, handle: %{public}d", operation, errnoNum, fd);
143 epollCtlErrQueue_.pop();
144 }
145 }
146 }
147 }
148
149 for (size_t idx = 0u; idx < taskQue.size() && idx < eventQue.size(); idx++) {
150 taskQue[idx]->HandleEvents(eventQue[idx]);
151 }
152
153 if (nfds == maxEvents_) {
154 maxEvents_ *= HALF_OF_MAX_EVENT;
155 epollEvents.resize(maxEvents_);
156 }
157 return nfds;
158 }
159
Epoll2Reactor(uint32_t epollEvents)160 uint32_t EventDemultiplexer::Epoll2Reactor(uint32_t epollEvents)
161 {
162 if (epollEvents & (EPOLLIN | EPOLLPRI | EPOLLRDHUP)) {
163 return EventReactor::READ_EVENT;
164 }
165
166 return EventReactor::NONE_EVENT;
167 }
168
Reactor2Epoll(uint32_t reactorEvent)169 uint32_t EventDemultiplexer::Reactor2Epoll(uint32_t reactorEvent)
170 {
171 switch (reactorEvent) {
172 case EventReactor::NONE_EVENT:
173 return TIMER_ERR_OK;
174 case EventReactor::READ_EVENT:
175 return EPOLLIN | EPOLLPRI;
176 default:
177 UTILS_LOGD("invalid event %{public}u.", reactorEvent);
178 return TIMER_ERR_DEAL_FAILED;
179 }
180 }
181
182 }
183 }
184