1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "epoll_event_poller.h"
16
17 #include <algorithm>
18 #include <cerrno>
19 #include <csignal>
20 #include <cstring>
21 #include <fcntl.h>
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <unistd.h>
26
27 #include "logging.h"
28
EpollEventPoller(int timeOut)29 EpollEventPoller::EpollEventPoller(int timeOut) : timeOut_(timeOut), epollFd_(INVALID_FD), eventFd_(INVALID_FD) {}
30
~EpollEventPoller()31 EpollEventPoller::~EpollEventPoller()
32 {
33 if (state_ == STARTED) {
34 HILOG_INFO(LOG_CORE, "need Stop in destructor!");
35 Stop();
36 }
37 if (state_ == INITIED) {
38 HILOG_INFO(LOG_CORE, "need Finalize in dtor!");
39 Finalize();
40 }
41 }
42
AddFileDescriptor(int fd,const OnReadableCallback & onReadable,const OnWritableCallback & onWritable)43 bool EpollEventPoller::AddFileDescriptor(int fd,
44 const OnReadableCallback& onReadable,
45 const OnWritableCallback& onWritable)
46 {
47 auto ctx = std::make_shared<EventContext>();
48 CHECK_NOTNULL(ctx, false, "create EventContext FAILED!");
49 ctx->fd = fd;
50 ctx->onReadable = onReadable;
51 ctx->onWritable = onWritable;
52
53 std::unique_lock<std::mutex> lock(mutex_);
54 CHECK_TRUE(AddContextLocked(ctx), false, "add context for %d failed!", fd);
55 return Notify();
56 }
57
RemoveFileDescriptor(int fd)58 bool EpollEventPoller::RemoveFileDescriptor(int fd)
59 {
60 std::unique_lock<std::mutex> lock(mutex_);
61 auto it = context_.find(fd);
62 CHECK_TRUE(it != context_.end(), false, "fd %d not found in poll set!", fd);
63
64 auto ctx = it->second;
65 CHECK_NOTNULL(ctx, false, "ctx null!");
66 CHECK_TRUE(RemoveContextLocked(ctx), false, "remove context for %d failed!", fd);
67 return Notify();
68 }
69
AddContextLocked(const EventContextPtr & ctx)70 bool EpollEventPoller::AddContextLocked(const EventContextPtr& ctx)
71 {
72 context_[ctx->fd] = ctx;
73 return UpdateEvent(EPOLL_CTL_ADD, ctx);
74 }
75
RemoveContextLocked(const EventContextPtr & ctx)76 bool EpollEventPoller::RemoveContextLocked(const EventContextPtr& ctx)
77 {
78 context_.erase(ctx->fd);
79 CHECK_TRUE(UpdateEvent(EPOLL_CTL_DEL, ctx), false, "update fd %d ctx FAILED!", ctx->fd);
80 return true;
81 }
82
EpollOpName(int op)83 static std::string EpollOpName(int op)
84 {
85 if (op == EPOLL_CTL_ADD) {
86 return "ADD";
87 }
88 if (op == EPOLL_CTL_DEL) {
89 return "DEL";
90 }
91 if (op == EPOLL_CTL_MOD) {
92 return "MOD";
93 }
94 return "";
95 }
96
UpdateEvent(int op,const EventContextPtr & ctx)97 bool EpollEventPoller::UpdateEvent(int op, const EventContextPtr& ctx)
98 {
99 struct epoll_event event = {};
100 if (ctx->onReadable) {
101 event.events |= EPOLLIN;
102 }
103 if (ctx->onWritable) {
104 event.events |= EPOLLOUT;
105 }
106 event.data.ptr = ctx.get();
107
108 std::string name = EpollOpName(op).c_str();
109 HILOG_DEBUG(LOG_CORE, "poll set %s %d %x start!", name.c_str(), ctx->fd, event.events);
110 int retval = epoll_ctl(epollFd_, op, ctx->fd, &event);
111 CHECK_TRUE(retval == 0, false, "epoll_ctl %s failed, %d", name.c_str(), errno);
112 HILOG_DEBUG(LOG_CORE, "poll set %s %d %x done!", name.c_str(), ctx->fd, event.events);
113 return true;
114 }
115
Run()116 void EpollEventPoller::Run()
117 {
118 pthread_setname_np(pthread_self(), "EventPoller");
119 std::vector<struct epoll_event> eventVec;
120 while (running_) {
121 {
122 std::unique_lock<std::mutex> lock(mutex_);
123 eventVec.resize(context_.size());
124 }
125 int retval = TEMP_FAILURE_RETRY(epoll_wait(epollFd_, eventVec.data(), eventVec.size(), timeOut_));
126 CHECK_TRUE(retval >= 0, NO_RETVAL, "epoll_wait failed, %d!", errno);
127 if (retval == 0) {
128 HILOG_INFO(LOG_CORE, "epoll_wait %dms timeout!", timeOut_);
129 continue;
130 }
131 for (int i = 0; i < retval; i++) {
132 auto ctx = reinterpret_cast<EventContext*>(eventVec[i].data.ptr);
133 if (ctx != nullptr) {
134 HandleEvent(eventVec[i].events, *ctx);
135 }
136 }
137 }
138 }
139
HandleEvent(uint32_t events,const EventContext & ctx)140 void EpollEventPoller::HandleEvent(uint32_t events, const EventContext& ctx)
141 {
142 if (events & EPOLLIN) {
143 if (ctx.onReadable) {
144 ctx.onReadable();
145 }
146 } else if (events & EPOLLOUT) {
147 if (ctx.onWritable) {
148 ctx.onWritable();
149 }
150 }
151 }
152
OnNotify()153 void EpollEventPoller::OnNotify()
154 {
155 uint64_t value = 0;
156 CHECK_TRUE(read(eventFd_, &value, sizeof(value)) == sizeof(value), NO_RETVAL, "read eventfd FAILED!");
157 HILOG_DEBUG(LOG_CORE, "OnNotify %llu done!", static_cast<unsigned long long>(value));
158 }
159
Notify(uint64_t value)160 bool EpollEventPoller::Notify(uint64_t value)
161 {
162 auto nbytes = write(eventFd_, &value, sizeof(value));
163 CHECK_TRUE(static_cast<size_t>(nbytes) == sizeof(value), false, "write eventfd FAILED!");
164 HILOG_DEBUG(LOG_CORE, "Notify %llu done!", static_cast<unsigned long long>(value));
165 return true;
166 }
167
Init()168 bool EpollEventPoller::Init()
169 {
170 HILOG_INFO(LOG_CORE, "Init %d", state_.load());
171 CHECK_TRUE(state_ == INITIAL, false, "Init FAIL %d", state_.load());
172
173 int epollFd = epoll_create1(EPOLL_CLOEXEC);
174 CHECK_TRUE(epollFd >= 0, false, "epoll_create failed, %d!", errno);
175 fileDescriptors_.push_back(epollFd);
176
177 int eventFd = eventfd(0, O_CLOEXEC | O_NONBLOCK);
178 CHECK_TRUE(eventFd >= 0, false, "create event fd failed, %d", errno);
179 fileDescriptors_.push_back(eventFd);
180
181 auto eventFdCtx = std::make_shared<EventContext>();
182 CHECK_NOTNULL(eventFdCtx, false, "create EventContext failed!");
183 eventFdCtx->fd = eventFd;
184 eventFdCtx->onReadable = std::bind(&EpollEventPoller::OnNotify, this);
185
186 std::unique_lock<std::mutex> lock(mutex_);
187 epollFd_ = epollFd;
188 eventFd_ = eventFd;
189 AddContextLocked(eventFdCtx);
190 HILOG_INFO(LOG_CORE, "EpollEventPoller::Init %d done!", state_.load());
191 state_ = INITIED;
192 return true;
193 }
194
Finalize()195 bool EpollEventPoller::Finalize()
196 {
197 if (state_ == STARTED) {
198 HILOG_INFO(LOG_CORE, "need Stop in Finalize!");
199 Stop();
200 }
201
202 HILOG_INFO(LOG_CORE, "Finalize %d", state_.load());
203 CHECK_TRUE(state_ == INITIED, false, "Finalize FAIL %d", state_.load());
204
205 std::unique_lock<std::mutex> lock(mutex_);
206 std::vector<EventContextPtr> contextVec;
207 for (auto& ctxPair : context_) {
208 contextVec.push_back(ctxPair.second);
209 }
210 for (auto ctxPtr : contextVec) {
211 HILOG_DEBUG(LOG_CORE, "remove context for %d", ctxPtr->fd);
212 RemoveContextLocked(ctxPtr);
213 }
214
215 for (int fd : fileDescriptors_) {
216 close(fd);
217 }
218 fileDescriptors_.clear();
219
220 epollFd_ = INVALID_FD;
221 eventFd_ = INVALID_FD;
222 state_ = INITIAL;
223 return true;
224 }
225
Start()226 bool EpollEventPoller::Start()
227 {
228 HILOG_INFO(LOG_CORE, "%s %d", __func__, state_.load());
229 CHECK_TRUE(state_ == INITIED, false, "Start FAIL %d", state_.load());
230
231 running_ = true;
232 pollThread_ = std::thread(&EpollEventPoller::Run, this);
233 state_ = STARTED;
234 return true;
235 }
236
Stop()237 bool EpollEventPoller::Stop()
238 {
239 CHECK_TRUE(state_ == STARTED, false, "Stop FAIL %d", state_.load());
240
241 running_ = false;
242 Notify();
243 if (pollThread_.joinable()) {
244 pollThread_.join();
245 }
246 state_ = INITIED;
247 return true;
248 }
249