1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "poller.h"
16
17 #include "sched/execute_ctx.h"
18 #include "dfx/log/ffrt_log_api.h"
19
20 #ifdef FFRT_IO_TASK_SCHEDULER
21 namespace ffrt {
Poller()22 Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) },
23 m_events(1024)
24 {
25 m_wakeData.cb = nullptr;
26 m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
27 epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
28 if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
29 std::terminate();
30 }
31 }
32
~Poller()33 Poller::~Poller() noexcept
34 {
35 ::close(m_wakeData.fd);
36 ::close(m_epFd);
37 timerHandle_ = -1;
38 m_wakeDataMap.clear();
39 m_delCntMap.clear();
40 timerMap_.clear();
41 executedHandle_.clear();
42 flag_ = EpollStatus::TEARDOWN;
43 }
44
AddFdEvent(uint32_t events,int fd,void * data,ffrt_poller_cb cb)45 int Poller::AddFdEvent(uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
46 {
47 auto wakeData = std::unique_ptr<WakeDataWithCb>(new (std::nothrow) WakeDataWithCb(fd, data, cb));
48 epoll_event ev = { .events = events, .data = {.ptr = static_cast<void*>(wakeData.get())} };
49 if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) != 0) {
50 FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
51 return -1;
52 }
53
54 std::unique_lock lock(m_mapMutex);
55 m_wakeDataMap[fd].emplace_back(std::move(wakeData));
56 fdEmpty_.store(false);
57 return 0;
58 }
59
DelFdEvent(int fd)60 int Poller::DelFdEvent(int fd) noexcept
61 {
62 if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
63 FFRT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
64 return -1;
65 }
66
67 std::unique_lock lock(m_mapMutex);
68 m_delCntMap[fd]++;
69 WakeUp();
70 return 0;
71 }
72
WakeUp()73 void Poller::WakeUp() noexcept
74 {
75 uint64_t one = 1;
76 ssize_t n = ::write(m_wakeData.fd, &one, sizeof one);
77 }
78
PollOnce(int timeout)79 PollerRet Poller::PollOnce(int timeout) noexcept
80 {
81 int realTimeout = timeout;
82 int timerHandle = -1;
83 PollerRet ret = PollerRet::RET_NULL;
84
85 timerMutex_.lock();
86 if (!timerMap_.empty()) {
87 auto cur = timerMap_.begin();
88 timerHandle = cur->second.handle;
89
90 realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
91 cur->first - std::chrono::steady_clock::now()).count();
92 if (realTimeout <= 0) {
93 ExecuteTimerCb(cur);
94 return PollerRet::RET_TIMER;
95 }
96
97 if (timeout != -1) {
98 timerHandle = -1;
99 realTimeout = timeout;
100 }
101
102 flag_ = EpollStatus::WAIT;
103 }
104 timerMutex_.unlock();
105
106 pollerCount_++;
107 int nfds = epoll_wait(m_epFd, m_events.data(), m_events.size(), realTimeout);
108 flag_ = EpollStatus::WAKE;
109 if (nfds < 0) {
110 FFRT_LOGE("epoll_wait error.");
111 return PollerRet::RET_NULL;
112 }
113
114 if (nfds == 0) {
115 if (timerHandle != -1) {
116 timerMutex_.lock();
117 for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
118 if (it->second.handle == timerHandle) {
119 ExecuteTimerCb(it);
120 return PollerRet::RET_TIMER;
121 }
122 }
123 timerMutex_.unlock();
124 }
125 return PollerRet::RET_NULL;
126 }
127
128 for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
129 struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(m_events[i].data.ptr);
130 int currFd = data->fd;
131 if (currFd == m_wakeData.fd) {
132 uint64_t one = 1;
133 ssize_t n = ::read(m_wakeData.fd, &one, sizeof one);
134 continue;
135 }
136
137 if (data->cb == nullptr) {
138 continue;
139 }
140 data->cb(data->data, m_events[i].events, pollerCount_);
141 }
142
143 ReleaseFdWakeData();
144 return PollerRet::RET_EPOLL;
145 }
146
ReleaseFdWakeData()147 void Poller::ReleaseFdWakeData() noexcept
148 {
149 std::unique_lock lock(m_mapMutex);
150 for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
151 int delFd = delIter->first;
152 int delCnt = delIter->second;
153 auto& wakeDataList = m_wakeDataMap[delFd];
154 int diff = wakeDataList.size() - delCnt;
155 if (diff == 0) {
156 m_wakeDataMap.erase(delFd);
157 m_delCntMap.erase(delIter++);
158 continue;
159 } else if (diff == 1) {
160 for (int i = 0; i < delCnt - 1; i++) {
161 wakeDataList.pop_front();
162 }
163 m_delCntMap[delFd] = 1;
164 } else {
165 FFRT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
166 }
167 delIter++;
168 }
169
170 fdEmpty_.store(m_wakeDataMap.empty());
171 }
ExecuteTimerCb(std::multimap<time_point_t,TimerDataWithCb>::iterator & timer)172 void Poller::ExecuteTimerCb(std::multimap<time_point_t, TimerDataWithCb>::iterator& timer) noexcept
173 {
174 std::vector<TimerDataWithCb> timerData;
175 for (auto iter = timerMap_.begin(); iter != timerMap_.end();) {
176 if (iter->first <= timer->first) {
177 timerData.emplace_back(iter->second);
178 executedHandle_[iter->second.handle] = TimerStatus::EXECUTING;
179 iter = timerMap_.erase(iter);
180 continue;
181 }
182 break;
183 }
184 timerEmpty_.store(timerMap_.empty());
185
186 timerMutex_.unlock();
187 for (const auto& data : timerData) {
188 data.cb(data.data);
189 executedHandle_[data.handle] = TimerStatus::EXECUTED;
190 }
191 }
192
RegisterTimer(uint64_t timeout,void * data,void (* cb)(void *))193 int Poller::RegisterTimer(uint64_t timeout, void* data, void(*cb)(void*)) noexcept
194 {
195 if (cb == nullptr || flag_ == EpollStatus::TEARDOWN) {
196 return -1;
197 }
198
199 std::lock_guard lock(timerMutex_);
200 time_point_t absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout);
201 bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);
202
203 TimerDataWithCb timerMapValue(data, cb);
204 timerHandle_ += 1;
205 timerMapValue.handle = timerHandle_;
206 timerMap_.emplace(absoluteTime, timerMapValue);
207 timerEmpty_.store(false);
208
209 if (wake) {
210 WakeUp();
211 }
212
213 return timerHandle_;
214 }
215
DeregisterTimer(int handle)216 void Poller::DeregisterTimer(int handle) noexcept
217 {
218 if (flag_ == EpollStatus::TEARDOWN) {
219 return;
220 }
221
222 std::lock_guard lock(timerMutex_);
223 auto it = executedHandle_.find(handle);
224 if (it != executedHandle_.end()) {
225 while (it->second == TimerStatus::EXECUTING) {
226 std::this_thread::yield();
227 }
228 executedHandle_.erase(it);
229 return;
230 }
231
232 bool wake = false;
233 for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
234 if (cur->second.handle == handle) {
235 if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
236 wake = true;
237 }
238 timerMap_.erase(cur);
239 break;
240 }
241 }
242
243 timerEmpty_.store(timerMap_.empty());
244
245 if (wake) {
246 WakeUp();
247 }
248 }
249
DetermineEmptyMap()250 bool Poller::DetermineEmptyMap() noexcept
251 {
252 return fdEmpty_ && timerEmpty_;
253 }
254
GetTimerStatus(int handle)255 ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
256 {
257 if (flag_ == EpollStatus::TEARDOWN) {
258 return ffrt_timer_notfound;
259 }
260
261 std::lock_guard lock(timerMutex_);
262 for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
263 if (cur->second.handle == handle) {
264 return ffrt_timer_not_executed;
265 }
266 }
267
268 auto it = executedHandle_.find(handle);
269 if (it != executedHandle_.end()) {
270 while (it->second == TimerStatus::EXECUTING) {
271 std::this_thread::yield();
272 }
273 return ffrt_timer_executed;
274 }
275
276 return ffrt_timer_notfound;
277 }
278 }
279 #endif