• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
#include "poller.h"

#include <climits>

#include "sched/execute_ctx.h"
#include "dfx/log/ffrt_log_api.h"
19 
20 #ifdef FFRT_IO_TASK_SCHEDULER
21 namespace ffrt {
Poller()22 Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) },
23     m_events(1024)
24 {
25     m_wakeData.cb = nullptr;
26     m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
27     epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
28     if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
29         std::terminate();
30     }
31 }
32 
~Poller()33 Poller::~Poller() noexcept
34 {
35     ::close(m_wakeData.fd);
36     ::close(m_epFd);
37     timerHandle_ = -1;
38     m_wakeDataMap.clear();
39     m_delCntMap.clear();
40     timerMap_.clear();
41     executedHandle_.clear();
42     flag_ = EpollStatus::TEARDOWN;
43 }
44 
AddFdEvent(uint32_t events,int fd,void * data,ffrt_poller_cb cb)45 int Poller::AddFdEvent(uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
46 {
47     auto wakeData = std::unique_ptr<WakeDataWithCb>(new (std::nothrow) WakeDataWithCb(fd, data, cb));
48     epoll_event ev = { .events = events, .data = {.ptr = static_cast<void*>(wakeData.get())} };
49     if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) != 0) {
50         FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
51         return -1;
52     }
53 
54     std::unique_lock lock(m_mapMutex);
55     m_wakeDataMap[fd].emplace_back(std::move(wakeData));
56     fdEmpty_.store(false);
57     return 0;
58 }
59 
DelFdEvent(int fd)60 int Poller::DelFdEvent(int fd) noexcept
61 {
62     if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
63         FFRT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
64         return -1;
65     }
66 
67     std::unique_lock lock(m_mapMutex);
68     m_delCntMap[fd]++;
69     WakeUp();
70     return 0;
71 }
72 
WakeUp()73 void Poller::WakeUp() noexcept
74 {
75     uint64_t one = 1;
76     ssize_t n = ::write(m_wakeData.fd, &one, sizeof one);
77 }
78 
PollOnce(int timeout)79 PollerRet Poller::PollOnce(int timeout) noexcept
80 {
81     int realTimeout = timeout;
82     int timerHandle = -1;
83     PollerRet ret = PollerRet::RET_NULL;
84 
85     timerMutex_.lock();
86     if (!timerMap_.empty()) {
87         auto cur = timerMap_.begin();
88         timerHandle = cur->second.handle;
89 
90         realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
91             cur->first - std::chrono::steady_clock::now()).count();
92         if (realTimeout <= 0) {
93             ExecuteTimerCb(cur);
94             return PollerRet::RET_TIMER;
95         }
96 
97         if (timeout != -1) {
98             timerHandle = -1;
99             realTimeout = timeout;
100         }
101 
102         flag_ = EpollStatus::WAIT;
103     }
104     timerMutex_.unlock();
105 
106     pollerCount_++;
107     int nfds = epoll_wait(m_epFd, m_events.data(), m_events.size(), realTimeout);
108     flag_ = EpollStatus::WAKE;
109     if (nfds < 0) {
110         FFRT_LOGE("epoll_wait error.");
111         return PollerRet::RET_NULL;
112     }
113 
114     if (nfds == 0) {
115         if (timerHandle != -1) {
116             timerMutex_.lock();
117             for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
118                 if (it->second.handle == timerHandle) {
119                     ExecuteTimerCb(it);
120                     return PollerRet::RET_TIMER;
121                 }
122             }
123             timerMutex_.unlock();
124         }
125         return PollerRet::RET_NULL;
126     }
127 
128     for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
129         struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(m_events[i].data.ptr);
130         int currFd = data->fd;
131         if (currFd == m_wakeData.fd) {
132             uint64_t one = 1;
133             ssize_t n = ::read(m_wakeData.fd, &one, sizeof one);
134             continue;
135         }
136 
137         if (data->cb == nullptr) {
138             continue;
139         }
140         data->cb(data->data, m_events[i].events, pollerCount_);
141     }
142 
143     ReleaseFdWakeData();
144     return PollerRet::RET_EPOLL;
145 }
146 
ReleaseFdWakeData()147 void Poller::ReleaseFdWakeData() noexcept
148 {
149     std::unique_lock lock(m_mapMutex);
150     for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
151         int delFd = delIter->first;
152         int delCnt = delIter->second;
153         auto& wakeDataList = m_wakeDataMap[delFd];
154         int diff = wakeDataList.size() - delCnt;
155         if (diff == 0) {
156             m_wakeDataMap.erase(delFd);
157             m_delCntMap.erase(delIter++);
158             continue;
159         } else if (diff == 1) {
160             for (int i = 0; i < delCnt - 1; i++) {
161                 wakeDataList.pop_front();
162             }
163             m_delCntMap[delFd] = 1;
164         } else {
165             FFRT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
166         }
167         delIter++;
168     }
169 
170     fdEmpty_.store(m_wakeDataMap.empty());
171 }
ExecuteTimerCb(std::multimap<time_point_t,TimerDataWithCb>::iterator & timer)172 void Poller::ExecuteTimerCb(std::multimap<time_point_t, TimerDataWithCb>::iterator& timer) noexcept
173 {
174     std::vector<TimerDataWithCb> timerData;
175     for (auto iter = timerMap_.begin(); iter != timerMap_.end();) {
176         if (iter->first <= timer->first) {
177             timerData.emplace_back(iter->second);
178             executedHandle_[iter->second.handle] = TimerStatus::EXECUTING;
179             iter = timerMap_.erase(iter);
180             continue;
181         }
182         break;
183     }
184     timerEmpty_.store(timerMap_.empty());
185 
186     timerMutex_.unlock();
187     for (const auto& data : timerData) {
188         data.cb(data.data);
189         executedHandle_[data.handle] = TimerStatus::EXECUTED;
190     }
191 }
192 
RegisterTimer(uint64_t timeout,void * data,void (* cb)(void *))193 int Poller::RegisterTimer(uint64_t timeout, void* data, void(*cb)(void*)) noexcept
194 {
195     if (cb == nullptr || flag_ == EpollStatus::TEARDOWN) {
196         return -1;
197     }
198 
199     std::lock_guard lock(timerMutex_);
200     time_point_t absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout);
201     bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);
202 
203     TimerDataWithCb timerMapValue(data, cb);
204     timerHandle_ += 1;
205     timerMapValue.handle = timerHandle_;
206     timerMap_.emplace(absoluteTime, timerMapValue);
207     timerEmpty_.store(false);
208 
209     if (wake) {
210         WakeUp();
211     }
212 
213     return timerHandle_;
214 }
215 
DeregisterTimer(int handle)216 void Poller::DeregisterTimer(int handle) noexcept
217 {
218     if (flag_ == EpollStatus::TEARDOWN) {
219         return;
220     }
221 
222     std::lock_guard lock(timerMutex_);
223     auto it = executedHandle_.find(handle);
224     if (it != executedHandle_.end()) {
225         while (it->second == TimerStatus::EXECUTING) {
226             std::this_thread::yield();
227         }
228         executedHandle_.erase(it);
229         return;
230     }
231 
232     bool wake = false;
233     for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
234         if (cur->second.handle == handle) {
235             if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
236                 wake = true;
237             }
238             timerMap_.erase(cur);
239             break;
240         }
241     }
242 
243     timerEmpty_.store(timerMap_.empty());
244 
245     if (wake) {
246         WakeUp();
247     }
248 }
249 
DetermineEmptyMap()250 bool Poller::DetermineEmptyMap() noexcept
251 {
252     return fdEmpty_ && timerEmpty_;
253 }
254 
GetTimerStatus(int handle)255 ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
256 {
257     if (flag_ == EpollStatus::TEARDOWN) {
258         return ffrt_timer_notfound;
259     }
260 
261     std::lock_guard lock(timerMutex_);
262     for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
263         if (cur->second.handle == handle) {
264             return ffrt_timer_not_executed;
265         }
266     }
267 
268     auto it = executedHandle_.find(handle);
269     if (it != executedHandle_.end()) {
270         while (it->second == TimerStatus::EXECUTING) {
271             std::this_thread::yield();
272         }
273         return ffrt_timer_executed;
274     }
275 
276     return ffrt_timer_notfound;
277 }
278 }
279 #endif