1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "epoll_event_poller.h"
16
17 #include <algorithm>
18 #include <cerrno>
19 #include <csignal>
20 #include <cstring>
21 #include <fcntl.h>
22 #include <pthread.h>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <unistd.h>
26
27 #include "logging.h"
28
EpollEventPoller(int timeOut)29 EpollEventPoller::EpollEventPoller(int timeOut) : timeOut_(timeOut), epollFd_(INVALID_FD), eventFd_(INVALID_FD) {}
30
~EpollEventPoller()31 EpollEventPoller::~EpollEventPoller()
32 {
33 if (state_ == STARTED) {
34 PROFILER_LOG_INFO(LOG_CORE, "need Stop in destructor!");
35 Stop();
36 }
37 if (state_ == INITIED) {
38 PROFILER_LOG_INFO(LOG_CORE, "need Finalize in dtor!");
39 Finalize();
40 }
41 }
42
AddFileDescriptor(int fd,const OnReadableCallback & onReadable,const OnWritableCallback & onWritable)43 bool EpollEventPoller::AddFileDescriptor(int fd,
44 const OnReadableCallback& onReadable,
45 const OnWritableCallback& onWritable)
46 {
47 auto ctx = std::make_shared<EventContext>();
48 CHECK_NOTNULL(ctx, false, "create EventContext FAILED!");
49 ctx->fd = fd;
50 ctx->onReadable = onReadable;
51 ctx->onWritable = onWritable;
52
53 std::unique_lock<std::mutex> lock(mutex_);
54 CHECK_TRUE(AddContextLocked(ctx), false, "add context for %d failed!", fd);
55 return Notify();
56 }
57
RemoveFileDescriptor(int fd)58 bool EpollEventPoller::RemoveFileDescriptor(int fd)
59 {
60 std::unique_lock<std::mutex> lock(mutex_);
61 auto it = context_.find(fd);
62 CHECK_TRUE(it != context_.end(), false, "fd %d not found in poll set!", fd);
63
64 auto ctx = it->second;
65 CHECK_NOTNULL(ctx, false, "ctx null!");
66 CHECK_TRUE(UpdateEvent(EPOLL_CTL_DEL, ctx), false, "update fd %d ctx FAILED!", ctx->fd);
67 return Notify();
68 }
69
AddContextLocked(const EventContextPtr & ctx)70 bool EpollEventPoller::AddContextLocked(const EventContextPtr& ctx)
71 {
72 context_[ctx->fd] = ctx;
73 return UpdateEvent(EPOLL_CTL_ADD, ctx);
74 }
75
// Maps an epoll_ctl() operation code to a short name for log messages.
// Unknown codes yield an empty string.
static std::string EpollOpName(int op)
{
    switch (op) {
        case EPOLL_CTL_ADD:
            return "ADD";
        case EPOLL_CTL_DEL:
            return "DEL";
        case EPOLL_CTL_MOD:
            return "MOD";
        default:
            return "";
    }
}
89
UpdateEvent(int op,const EventContextPtr & ctx)90 bool EpollEventPoller::UpdateEvent(int op, const EventContextPtr& ctx)
91 {
92 struct epoll_event event = {};
93 if (ctx->onReadable) {
94 event.events |= EPOLLIN;
95 }
96 if (ctx->onWritable) {
97 event.events |= EPOLLOUT;
98 }
99 event.data.ptr = ctx.get();
100
101 std::string name = EpollOpName(op).c_str();
102 PROFILER_LOG_DEBUG(LOG_CORE, "poll set %s %d %x start!", name.c_str(), ctx->fd, event.events);
103 int retval = epoll_ctl(epollFd_, op, ctx->fd, &event);
104 CHECK_TRUE(retval == 0, false, "epoll_ctl %s failed, %d", name.c_str(), errno);
105 PROFILER_LOG_DEBUG(LOG_CORE, "poll set %s %d %x done!", name.c_str(), ctx->fd, event.events);
106 return true;
107 }
108
Run()109 void EpollEventPoller::Run()
110 {
111 pthread_setname_np(pthread_self(), "EventPoller");
112 std::vector<struct epoll_event> eventVec;
113 while (running_) {
114 {
115 std::unique_lock<std::mutex> lock(mutex_);
116 eventVec.resize(context_.size());
117 }
118 int retval = TEMP_FAILURE_RETRY(epoll_wait(epollFd_, eventVec.data(), eventVec.size(), timeOut_));
119 CHECK_TRUE(retval >= 0, NO_RETVAL, "epoll_wait failed, %d!", errno);
120 if (retval == 0) {
121 PROFILER_LOG_INFO(LOG_CORE, "epoll_wait %dms timeout!", timeOut_);
122 continue;
123 }
124 for (int i = 0; i < retval; i++) {
125 auto ctx = reinterpret_cast<EventContext*>(eventVec[i].data.ptr);
126 if (ctx != nullptr && running_.load()) {
127 HandleEvent(eventVec[i].events, *ctx);
128 }
129 }
130 }
131 }
132
HandleEvent(uint32_t events,const EventContext & ctx)133 void EpollEventPoller::HandleEvent(uint32_t events, const EventContext& ctx)
134 {
135 if (events & EPOLLIN) {
136 if (ctx.onReadable) {
137 ctx.onReadable();
138 }
139 } else if (events & EPOLLOUT) {
140 if (ctx.onWritable) {
141 ctx.onWritable();
142 }
143 }
144 }
145
OnNotify()146 void EpollEventPoller::OnNotify()
147 {
148 uint64_t value = 0;
149 CHECK_TRUE(read(eventFd_, &value, sizeof(value)) == sizeof(value), NO_RETVAL, "read eventfd FAILED!");
150 PROFILER_LOG_DEBUG(LOG_CORE, "OnNotify %llu done!", static_cast<unsigned long long>(value));
151 }
152
Notify(uint64_t value)153 bool EpollEventPoller::Notify(uint64_t value)
154 {
155 auto nbytes = write(eventFd_, &value, sizeof(value));
156 CHECK_TRUE(static_cast<size_t>(nbytes) == sizeof(value), false, "write eventfd FAILED!");
157 PROFILER_LOG_DEBUG(LOG_CORE, "Notify %llu done!", static_cast<unsigned long long>(value));
158 return true;
159 }
160
Init()161 bool EpollEventPoller::Init()
162 {
163 PROFILER_LOG_INFO(LOG_CORE, "Init %d", state_.load());
164 CHECK_TRUE(state_ == INITIAL, false, "Init FAIL %d", state_.load());
165
166 int epollFd = epoll_create1(EPOLL_CLOEXEC);
167 CHECK_TRUE(epollFd >= 0, false, "epoll_create failed, %d!", errno);
168 fileDescriptors_.push_back(epollFd);
169
170 int eventFd = eventfd(0, O_CLOEXEC | O_NONBLOCK);
171 CHECK_TRUE(eventFd >= 0, false, "create event fd failed, %d", errno);
172 fileDescriptors_.push_back(eventFd);
173
174 auto eventFdCtx = std::make_shared<EventContext>();
175 CHECK_NOTNULL(eventFdCtx, false, "create EventContext failed!");
176 eventFdCtx->fd = eventFd;
177 eventFdCtx->onReadable = ([this] { this->OnNotify(); });
178
179 std::unique_lock<std::mutex> lock(mutex_);
180 epollFd_ = epollFd;
181 eventFd_ = eventFd;
182 AddContextLocked(eventFdCtx);
183 PROFILER_LOG_INFO(LOG_CORE, "EpollEventPoller::Init %d done!", state_.load());
184 state_ = INITIED;
185 return true;
186 }
187
Finalize()188 bool EpollEventPoller::Finalize()
189 {
190 if (state_ == STARTED) {
191 PROFILER_LOG_INFO(LOG_CORE, "need Stop in Finalize!");
192 Stop();
193 }
194
195 PROFILER_LOG_INFO(LOG_CORE, "Finalize %d", state_.load());
196 CHECK_TRUE(state_ == INITIED, false, "Finalize FAIL %d", state_.load());
197
198 std::unique_lock<std::mutex> lock(mutex_);
199 std::vector<EventContextPtr> contextVec;
200 for (auto& ctxPair : context_) {
201 contextVec.push_back(ctxPair.second);
202 }
203 for (auto ctxPtr : contextVec) {
204 PROFILER_LOG_DEBUG(LOG_CORE, "remove context for %d", ctxPtr->fd);
205 UpdateEvent(EPOLL_CTL_DEL, ctxPtr);
206 }
207
208 for (int fd : fileDescriptors_) {
209 close(fd);
210 }
211 fileDescriptors_.clear();
212 context_.clear();
213 epollFd_ = INVALID_FD;
214 eventFd_ = INVALID_FD;
215 state_ = INITIAL;
216 return true;
217 }
218
Start()219 bool EpollEventPoller::Start()
220 {
221 PROFILER_LOG_INFO(LOG_CORE, "%s %d", __func__, state_.load());
222 if (state_ == STARTED) {
223 PROFILER_LOG_INFO(LOG_CORE, "epoll thread has started!");
224 return true;
225 }
226 std::unique_lock<std::mutex> lock(mutex_);
227 CHECK_TRUE(state_ == INITIED, false, "Start FAIL %d", state_.load());
228 running_ = true;
229 pollThread_ = std::thread([this] { this->Run(); });
230 state_ = STARTED;
231 return true;
232 }
233
Stop()234 bool EpollEventPoller::Stop()
235 {
236 std::unique_lock<std::mutex> lock(stopPollerMutex_);
237 CHECK_TRUE(state_ == STARTED, false, "Stop FAIL %d", state_.load());
238 running_ = false;
239 Notify();
240 if (pollThread_.joinable()) {
241 pollThread_.join();
242 }
243 state_ = INITIED;
244 return true;
245 }
246