/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "poller.h"
#include "sched/execute_ctx.h"
#include "tm/scpu_task.h"
#include "dfx/log/ffrt_log_api.h"

namespace ffrt {
Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) }
{
    m_wakeData.cb = nullptr;
    m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
    if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
        FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errno=%d", m_epFd, m_wakeData.fd, errno);
        std::terminate();
    }
}

Poller::~Poller() noexcept
{
    ::close(m_wakeData.fd);
    ::close(m_epFd);
    timerHandle_ = -1;
    m_wakeDataMap.clear();
    m_delCntMap.clear();
    timerMap_.clear();
    executedHandle_.clear();
    flag_ = EpollStatus::TEARDOWN;
    m_waitTaskMap.clear();
    m_cachedTaskEvents.clear();
}

PollerProxy& PollerProxy::Instance()
{
    static PollerProxy pollerInstance;
    return pollerInstance;
}

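// Registers (EPOLL_CTL_ADD) or rearms (EPOLL_CTL_MOD) an fd on the epoll instance and records
// the owning WakeDataWithCb in m_wakeDataMap so later events can be dispatched to it.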
int Poller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
{
    auto wakeData = std::make_unique<WakeDataWithCb>(fd, data, cb, ExecuteCtx::Cur()->task);
    if (ExecuteCtx::Cur()->task) {
        ExecuteCtx::Cur()->task->pollerEnable = true;
    }
    void* ptr = static_cast<void*>(wakeData.get());
    if (ptr == nullptr || wakeData == nullptr) {
        FFRT_LOGE("Construct WakeDataWithCb instance failed! or wakeData is nullptr");
        return -1;
    }
    wakeData->monitorEvents = events;

    epoll_event ev = { .events = events, .data = { .ptr = ptr } };
    std::unique_lock lock(m_mapMutex);
    if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
        FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errno=%d", m_epFd, fd, errno);
        return -1;
    }

    if (op == EPOLL_CTL_ADD) {
        m_wakeDataMap[fd].emplace_back(std::move(wakeData));
        fdEmpty_.store(false);
    } else if (op == EPOLL_CTL_MOD) {
        auto iter = m_wakeDataMap.find(fd);
        FFRT_COND_RETURN_ERROR(iter == m_wakeDataMap.end(), -1, "fd %d does not exist in wakeDataMap", fd);
        if (iter->second.size() != 1) {
            FFRT_LOGE("epoll_ctl mod fd wakedata num invalid");
            return -1;
        }
        iter->second.pop_back();
        iter->second.emplace_back(std::move(wakeData));
    }
    return 0;
}

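// If the task already has masked wake data cached, removes the fd from epoll and records the
// deletion in the del-fd cache so the fd can be re-added once its cached events are consumed.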
void Poller::CacheMaskFdAndEpollDel(int fd, CPUEUTask *task) noexcept
{
    auto maskWakeDataWithCb = m_maskWakeDataWithCbMap.find(task);
    if (maskWakeDataWithCb != m_maskWakeDataWithCbMap.end()) {
        if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
            FFRT_LOGE("fd[%d] ffrt epoll ctl del fail errno=%d", fd, errno);
        }
        CacheDelFd(fd, task);
    }
}

int Poller::ClearMaskWakeDataWithCbCache(CPUEUTask *task) noexcept
{
    auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
    if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
        WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
        for (auto iter = wakeDataList.begin(); iter != wakeDataList.end(); ++iter) {
            WakeDataWithCb* ptr = iter->get();
            m_delFdCacheMap.erase(ptr->fd);
        }
        m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
    }
    return 0;
}

int Poller::ClearMaskWakeDataWithCbCacheWithFd(CPUEUTask *task, int fd) noexcept
{
    auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
    if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
        WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
        auto pred = [fd](auto& value) { return value->fd == fd; };
        wakeDataList.remove_if(pred);
        if (wakeDataList.size() == 0) {
            m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
        }
    }
    return 0;
}

int Poller::ClearDelFdCache(int fd) noexcept
{
    auto fdDelCacheIter = m_delFdCacheMap.find(fd);
    if (fdDelCacheIter != m_delFdCacheMap.end()) {
        CPUEUTask *task = fdDelCacheIter->second;
        ClearMaskWakeDataWithCbCacheWithFd(task, fd);
        m_delFdCacheMap.erase(fdDelCacheIter);
    }
    return 0;
}

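// Removes an fd from the epoll instance. The wake data itself is only marked for deletion via
// m_delCntMap and reclaimed later in ReleaseFdWakeData, after the poll loop has been woken up.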
int Poller::DelFdEvent(int fd) noexcept
{
    std::unique_lock lock(m_mapMutex);
    ClearDelFdCache(fd);
    auto wakeDataIter = m_wakeDataMap.find(fd);
    if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
        FFRT_LOGW("fd[%d] has not been added to epoll, ignore", fd);
        return -1;
    }
    auto delCntIter = m_delCntMap.find(fd);
    if (delCntIter != m_delCntMap.end()) {
        int diff = static_cast<int>(wakeDataIter->second.size()) - delCntIter->second;
        if (diff == 0) {
            FFRT_LOGW("fd:%d, addCnt:%d, delCnt:%d has not been added to epoll, ignore", fd,
                wakeDataIter->second.size(), delCntIter->second);
            return -1;
        }
    }

    if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
        FFRT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errno=%d", m_epFd, fd, errno);
        return -1;
    }

    m_delCntMap[fd]++;
    WakeUp();
    return 0;
}

void Poller::ClearCachedEvents(CPUEUTask* task) noexcept
{
    std::unique_lock lock(m_mapMutex);
    auto iter = m_cachedTaskEvents.find(task);
    if (iter == m_cachedTaskEvents.end()) {
        return;
    }
    m_cachedTaskEvents.erase(iter);
    ClearMaskWakeDataWithCbCache(task);
}

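// Copies cached events into the caller-supplied array, merging duplicate fds, and rearms each fd
// in epoll with its originally monitored events (ADD if it was cached-deleted, MOD otherwise).
// Returns the number of distinct fds written out.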
int Poller::FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
{
    std::unordered_map<int, int> seenFd;
    int fdCnt = 0;
    for (size_t i = 0; i < cachedEventsVec.size(); i++) {
        auto eventInfo = cachedEventsVec[i];
        int currFd = eventInfo.data.fd;
        // check if seen
        auto iter = seenFd.find(currFd);
        if (iter == seenFd.end()) {
            // if not seen, copy cached events and record idx
            eventsVec[fdCnt].data.fd = currFd;
            eventsVec[fdCnt].events = eventInfo.events;
            seenFd[currFd] = fdCnt;
            fdCnt++;
        } else {
            // if seen, update event to newest
            eventsVec[iter->second].events |= eventInfo.events;
            FFRT_LOGD("fd[%d] has multiple cached events", currFd);
            continue;
        }

        // Unmask to origin events
        auto wakeDataIter = m_wakeDataMap.find(currFd);
        if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        auto& wakeData = wakeDataIter->second.back();
        epoll_event ev = { .events = wakeData->monitorEvents, .data = { .ptr = static_cast<void*>(wakeData.get()) } };
        auto fdDelCacheIter = m_delFdCacheMap.find(currFd);
        if (fdDelCacheIter != m_delFdCacheMap.end()) {
            ClearDelFdCache(currFd);
            if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, currFd, &ev) != 0) {
                FFRT_LOGE("fd[%d] epoll ctl add fail, errno=%d", currFd, errno);
                continue;
            }
        } else {
            if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &ev) != 0) {
                FFRT_LOGE("fd[%d] epoll ctl mod fail, errno=%d", currFd, errno);
                continue;
            }
        }
    }
    return fdCnt;
}

int Poller::FetchCachedEventAndDoUnmask(CPUEUTask* task, struct epoll_event* eventsVec) noexcept
{
    // caller must hold m_mapMutex
    auto syncTaskIter = m_cachedTaskEvents.find(task);
    if (syncTaskIter == m_cachedTaskEvents.end() || syncTaskIter->second.size() == 0) {
        return 0;
    }

    int nfds = FetchCachedEventAndDoUnmask(syncTaskIter->second, eventsVec);
    m_cachedTaskEvents.erase(syncTaskIter);
    ClearMaskWakeDataWithCbCache(task);
    return nfds;
}

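// Blocks the current FFRT task until at least one of its registered fds fires or the optional
// timeout expires. Cached events are returned immediately; otherwise the task is parked in
// m_waitTaskMap, either on its condition variable (thread wait mode) or via CoWait (coroutine mode).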
int Poller::WaitFdEvent(struct epoll_event* eventsVec, int maxevents, int timeout) noexcept
{
    FFRT_COND_DO_ERR((eventsVec == nullptr), return -1, "eventsVec cannot be null");

    auto task = ExecuteCtx::Cur()->task;
    if (!task) {
        FFRT_LOGE("nonworker shall not call this function.");
        return -1;
    }

    FFRT_COND_DO_ERR((maxevents < EPOLL_EVENT_SIZE), return -1, "maxEvents:%d cannot be less than 1024", maxevents);

    int nfds = 0;
    if (ThreadWaitMode(task)) {
        std::unique_lock<std::mutex> lck(task->mutex_);
        m_mapMutex.lock();
        int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
        if (cachedNfds > 0) {
            m_mapMutex.unlock();
            FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
                task->label.c_str(), task->gid, cachedNfds);
            return cachedNfds;
        }

        if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
            FFRT_LOGE("task has waited before");
            m_mapMutex.unlock();
            return 0;
        }
        if (FFRT_UNLIKELY(LegacyMode(task))) {
            task->blockType = BlockType::BLOCK_THREAD;
        }
        auto currTime = std::chrono::steady_clock::now();
        m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
        if (timeout > -1) {
            FFRT_LOGD("poller meet timeout={%d}", timeout);
            m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
        }
        m_mapMutex.unlock();
        reinterpret_cast<SCPUEUTask*>(task)->waitCond_.wait(lck);
        FFRT_LOGD("task[%s] id[%d] has [%d] events", task->label.c_str(), task->gid, nfds);
        return nfds;
    }

    CoWait([&](CPUEUTask *task)->bool {
        m_mapMutex.lock();
        int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
        if (cachedNfds > 0) {
            m_mapMutex.unlock();
            FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
                task->label.c_str(), task->gid, cachedNfds);
            nfds = cachedNfds;
            return false;
        }

        if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
            FFRT_LOGE("task has waited before");
            m_mapMutex.unlock();
            return false;
        }
        auto currTime = std::chrono::steady_clock::now();
        m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
        if (timeout > -1) {
            FFRT_LOGD("poller meet timeout={%d}", timeout);
            m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
        }
        m_mapMutex.unlock();
        // The ownership of the task belongs to m_waitTaskMap, and the task cannot be accessed any more.
        return true;
    });
    FFRT_LOGD("task[%s] id[%d] has [%d] events", task->label.c_str(), task->gid, nfds);
    return nfds;
}

void Poller::WakeUp() noexcept
{
    uint64_t one = 1;
    (void)::write(m_wakeData.fd, &one, sizeof one);
}

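// Dispatches the events returned by epoll_wait: drains the internal eventfd, invokes registered
// callbacks directly, and groups task-owned events into syncTaskEvents for WakeSyncTask. On
// EPOLLHUP/EPOLLERR the fd is masked out of epoll until the owning task consumes the event.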
void Poller::ProcessWaitedFds(int nfds, std::unordered_map<CPUEUTask*, EventVec>& syncTaskEvents,
    std::array<epoll_event, EPOLL_EVENT_SIZE>& waitedEvents) noexcept
{
    for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
        struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(waitedEvents[i].data.ptr);
        int currFd = data->fd;
        if (currFd == m_wakeData.fd) {
            uint64_t one = 1;
            (void)::read(m_wakeData.fd, &one, sizeof one);
            continue;
        }

        if (data->cb != nullptr) {
            data->cb(data->data, waitedEvents[i].events);
            continue;
        }

        if (data->task != nullptr) {
            epoll_event ev = { .events = waitedEvents[i].events, .data = {.fd = currFd} };
            syncTaskEvents[data->task].push_back(ev);
            if (waitedEvents[i].events & (EPOLLHUP | EPOLLERR)) {
                std::unique_lock lock(m_mapMutex);
                CacheMaskFdAndEpollDel(currFd, data->task);
            }
        }
    }
}

namespace {
void WakeTask(CPUEUTask* task)
{
    if (ThreadNotifyMode(task)) {
        std::unique_lock<std::mutex> lck(task->mutex_);
        if (BlockThread(task)) {
            task->blockType = BlockType::BLOCK_COROUTINE;
        }
        reinterpret_cast<SCPUEUTask*>(task)->waitCond_.notify_one();
    } else {
        CoRoutineFactory::CoWakeFunc(task, CoWakeType::NO_TIMEOUT_WAKE);
    }
}

int CopyEventsToConsumer(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
{
    int nfds = cachedEventsVec.size();
    for (int i = 0; i < nfds; i++) {
        eventsVec[i].events = cachedEventsVec[i].events;
        eventsVec[i].data.fd = cachedEventsVec[i].data.fd;
    }
    return nfds;
}

void CopyEventsInfoToConsumer(SyncData& taskInfo, EventVec& cachedEventsVec)
{
    epoll_event* eventsPtr = (epoll_event*)taskInfo.eventsPtr;
    int* nfdsPtr = taskInfo.nfdsPtr;
    if (eventsPtr == nullptr || nfdsPtr == nullptr) {
        FFRT_LOGE("usr ptr is nullptr");
        return;
    }
    *nfdsPtr = CopyEventsToConsumer(cachedEventsVec, eventsPtr);
}
} // namespace

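// Caches events that arrived while no consumer was waiting and masks the fd in epoll
// (monitorEvents = 0) so it stops firing until the owning task fetches the cached events.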
void Poller::CacheEventsAndDoMask(CPUEUTask* task, EventVec& eventVec) noexcept
{
    auto& syncTaskEvents = m_cachedTaskEvents[task];
    for (size_t i = 0; i < eventVec.size(); i++) {
        int currFd = eventVec[i].data.fd;
        auto wakeDataIter = m_wakeDataMap.find(currFd);
        if (wakeDataIter == m_wakeDataMap.end() ||
            wakeDataIter->second.size() == 0 ||
            wakeDataIter->second.back()->task != task) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        auto delIter = m_delCntMap.find(currFd);
        if (delIter != m_delCntMap.end() && wakeDataIter->second.size() == static_cast<size_t>(delIter->second)) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        struct epoll_event maskEv;
        maskEv.events = 0;
        auto& wakeData = wakeDataIter->second.back();
        std::unique_ptr<struct WakeDataWithCb> maskWakeData = std::make_unique<WakeDataWithCb>(currFd,
            wakeData->data, wakeData->cb, wakeData->task);
        void* ptr = static_cast<void*>(maskWakeData.get());
        if (ptr == nullptr || maskWakeData == nullptr) {
            FFRT_LOGE("CacheEventsAndDoMask Construct WakeDataWithCb instance failed! or wakeData is nullptr");
            continue;
        }
        maskWakeData->monitorEvents = 0;
        CacheMaskWakeData(task, maskWakeData);
        maskEv.data = {.ptr = ptr};
        if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
            // ENOENT indicates the fd is not in epfd; it may have been deleted
            FFRT_LOGW("epoll_ctl mod fd error: efd=%d, fd=%d, errno=%d", m_epFd, currFd, errno);
        }
        FFRT_LOGD("fd[%d] event has no consumer, so cache it", currFd);
        syncTaskEvents.push_back(eventVec[i]);
    }
}

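// For each task with newly arrived events: if the task is parked in m_waitTaskMap, copy the
// events into its output buffer, drop any wait timer it registered, and wake it; otherwise
// cache and mask the events until the task calls WaitFdEvent.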
void Poller::WakeSyncTask(std::unordered_map<CPUEUTask*, EventVec>& syncTaskEvents) noexcept
{
    if (syncTaskEvents.empty()) {
        return;
    }

    std::unordered_set<int> timerHandlesToRemove;
    std::unordered_set<CPUEUTask*> tasksToWake;
    m_mapMutex.lock();
    for (auto& taskEventPair : syncTaskEvents) {
        CPUEUTask* currTask = taskEventPair.first;
        auto iter = m_waitTaskMap.find(currTask);
        if (iter == m_waitTaskMap.end()) {
            CacheEventsAndDoMask(currTask, taskEventPair.second);
            continue;
        }
        CopyEventsInfoToConsumer(iter->second, taskEventPair.second);
        auto timerHandle = iter->second.timerHandle;
        if (timerHandle > -1) {
            timerHandlesToRemove.insert(timerHandle);
        }
        tasksToWake.insert(currTask);
        m_waitTaskMap.erase(iter);
    }

    m_mapMutex.unlock();
    if (timerHandlesToRemove.size() > 0) {
        std::lock_guard lock(timerMutex_);
        for (auto it = timerMap_.begin(); it != timerMap_.end();) {
            if (timerHandlesToRemove.find(it->second.handle) != timerHandlesToRemove.end()) {
                it = timerMap_.erase(it);
            } else {
                ++it;
            }
        }
        timerEmpty_.store(timerMap_.empty());
    }

    for (auto task : tasksToWake) {
        WakeTask(task);
    }
}

uint64_t Poller::GetTaskWaitTime(CPUEUTask* task) noexcept
{
    std::unique_lock lock(m_mapMutex);
    auto iter = m_waitTaskMap.find(task);
    if (iter == m_waitTaskMap.end()) {
        return 0;
    }

    return std::chrono::duration_cast<std::chrono::seconds>(
        iter->second.waitTP.time_since_epoch()).count();
}

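// Runs one iteration of the poll loop: computes the wait timeout from the nearest timer, fires
// expired timers, then epoll_waits and hands the resulting fds to ProcessWaitedFds/WakeSyncTask.
// Returns which kind of work (timer, epoll events, or none) was handled.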
PollerRet Poller::PollOnce(int timeout) noexcept
{
    int realTimeout = timeout;
    int timerHandle = -1;

    timerMutex_.lock();
    if (!timerMap_.empty()) {
        auto cur = timerMap_.begin();
        timerHandle = cur->second.handle;
        TimePoint now = std::chrono::steady_clock::now();
        realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
            cur->first - now).count();
        if (realTimeout <= 0) {
            ExecuteTimerCb(now);
            return PollerRet::RET_TIMER;
        }

        if (timeout != -1 && realTimeout > timeout) {
            timerHandle = -1;
            realTimeout = timeout;
        }

        flag_ = EpollStatus::WAIT;
    }
    timerMutex_.unlock();

    pollerCount_++;
    std::array<epoll_event, EPOLL_EVENT_SIZE> waitedEvents;
    int nfds = epoll_wait(m_epFd, waitedEvents.data(), waitedEvents.size(), realTimeout);
    flag_ = EpollStatus::WAKE;
    if (nfds < 0) {
        if (errno != EINTR) {
            FFRT_LOGE("epoll_wait error, errno=%d.", errno);
        }
        return PollerRet::RET_NULL;
    }
    if (nfds == 0) {
        if (timerHandle != -1) {
            timerMutex_.lock();
            for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
                if (it->second.handle == timerHandle) {
                    ExecuteTimerCb(it->first);
                    return PollerRet::RET_TIMER;
                }
            }
            timerMutex_.unlock();
        }
        return PollerRet::RET_NULL;
    }

    std::unordered_map<CPUEUTask*, EventVec> syncTaskEvents;
    ProcessWaitedFds(nfds, syncTaskEvents, waitedEvents);
    WakeSyncTask(syncTaskEvents);
    ReleaseFdWakeData();
    return PollerRet::RET_EPOLL;
}

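// Reclaims wake data for fds that DelFdEvent marked for deletion, keeping the most recent
// registration when a deleted fd was re-added before this cleanup ran.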
void Poller::ReleaseFdWakeData() noexcept
{
    std::unique_lock lock(m_mapMutex);
    for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
        int delFd = delIter->first;
        unsigned int delCnt = static_cast<unsigned int>(delIter->second);
        auto& wakeDataList = m_wakeDataMap[delFd];
        int diff = wakeDataList.size() - delCnt;
        if (diff == 0) {
            m_wakeDataMap.erase(delFd);
            m_delCntMap.erase(delIter++);
            continue;
        } else if (diff == 1) {
            for (unsigned int i = 0; i < delCnt - 1; i++) {
                wakeDataList.pop_front();
            }
            m_delCntMap[delFd] = 1;
        } else {
            FFRT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
        }
        delIter++;
    }

    fdEmpty_.store(m_wakeDataMap.empty());
}

void Poller::ProcessTimerDataCb(CPUEUTask* task) noexcept
{
    m_mapMutex.lock();
    auto iter = m_waitTaskMap.find(task);
    if (iter != m_waitTaskMap.end()) {
        WakeTask(task);
        m_waitTaskMap.erase(iter);
    }
    m_mapMutex.unlock();
}

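// Pops every timer whose deadline is at or before `timer` and runs it: user callbacks are invoked
// with timerMutex_ released (tracked in executedHandle_), WaitFdEvent timeouts wake the parked
// task, and repeating timers are re-registered. Expects timerMutex_ to be held on entry and
// releases it before returning.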
void Poller::ExecuteTimerCb(TimePoint timer) noexcept
{
    while (!timerMap_.empty()) {
        auto iter = timerMap_.begin();
        if (iter->first > timer) {
            break;
        }

        TimerDataWithCb data = iter->second;
        if (data.cb != nullptr) {
            executedHandle_[data.handle] = TimerStatus::EXECUTING;
        }

        timerMap_.erase(iter);
        timerEmpty_.store(timerMap_.empty());

        if (data.cb != nullptr) {
            timerMutex_.unlock();
            data.cb(data.data);
            timerMutex_.lock();
            executedHandle_[data.handle] = TimerStatus::EXECUTED;
        } else if (data.task != nullptr) {
            timerMutex_.unlock();
            ProcessTimerDataCb(data.task);
            timerMutex_.lock();
        }

        if (data.repeat && (executedHandle_.find(data.handle) != executedHandle_.end())) {
            executedHandle_.erase(data.handle);
            RegisterTimerImpl(data);
        }
    }
    timerMutex_.unlock();
}

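// Inserts the timer into the deadline-ordered timerMap_ and wakes the poller when the new timer
// becomes the earliest pending deadline, so a blocked epoll_wait can recompute its timeout.
// Caller holds timerMutex_.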
void Poller::RegisterTimerImpl(const TimerDataWithCb& data) noexcept
{
    if (flag_ == EpollStatus::TEARDOWN) {
        return;
    }

    TimePoint absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(data.timeout);
    bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);

    timerMap_.emplace(absoluteTime, data);
    timerEmpty_.store(false);

    if (wake) {
        WakeUp();
    }
}

int Poller::RegisterTimer(uint64_t timeout, void* data, ffrt_timer_cb cb, bool repeat) noexcept
{
    if (flag_ == EpollStatus::TEARDOWN) {
        return -1;
    }

    std::lock_guard lock(timerMutex_);
    timerHandle_ += 1;

    TimerDataWithCb timerMapValue(data, cb, ExecuteCtx::Cur()->task, repeat, timeout);
    timerMapValue.handle = timerHandle_;
    RegisterTimerImpl(timerMapValue);

    return timerHandle_;
}

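// Cancels a timer by handle. If its callback is currently running, spins (yielding and
// re-acquiring timerMutex_) until it finishes before reporting success; otherwise removes the
// pending entry from timerMap_ and wakes the poller if it was the next deadline.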
int Poller::UnregisterTimer(int handle) noexcept
{
    if (flag_ == EpollStatus::TEARDOWN) {
        return -1;
    }

    std::lock_guard lock(timerMutex_);
    auto it = executedHandle_.find(handle);
    if (it != executedHandle_.end()) {
        while (it->second == TimerStatus::EXECUTING) {
            timerMutex_.unlock();
            std::this_thread::yield();
            timerMutex_.lock();
            it = executedHandle_.find(handle);
            if (it == executedHandle_.end()) {
                break;
            }
        }

        if (it != executedHandle_.end()) {
            executedHandle_.erase(it);
            return 0;
        }
    }

    bool wake = false;
    int ret = -1;
    for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
        if (cur->second.handle == handle) {
            if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
                wake = true;
            }
            timerMap_.erase(cur);
            ret = 0;
            break;
        }
    }

    timerEmpty_.store(timerMap_.empty());

    if (wake) {
        WakeUp();
    }
    return ret;
}

bool Poller::DetermineEmptyMap() noexcept
{
    return fdEmpty_ && timerEmpty_;
}

bool Poller::DeterminePollerReady() noexcept
{
    return IsFdExist() || IsTimerReady();
}

bool Poller::IsFdExist() noexcept
{
    return !fdEmpty_;
}

bool Poller::IsTimerReady() noexcept
{
    TimePoint now = std::chrono::steady_clock::now();
    std::lock_guard lock(timerMutex_);
    if (timerMap_.empty()) {
        return false;
    }

    if (now >= timerMap_.begin()->first) {
        return true;
    }
    return false;
}

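// Reports whether a timer handle is still pending, has already executed (waiting first if the
// callback is mid-flight), or is unknown to the poller.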
ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
{
    if (flag_ == EpollStatus::TEARDOWN) {
        return ffrt_timer_notfound;
    }

    std::lock_guard lock(timerMutex_);
    for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
        if (cur->second.handle == handle) {
            return ffrt_timer_not_executed;
        }
    }

    auto it = executedHandle_.find(handle);
    if (it != executedHandle_.end()) {
        while (it->second == TimerStatus::EXECUTING) {
            timerMutex_.unlock();
            std::this_thread::yield();
            timerMutex_.lock();
            it = executedHandle_.find(handle);
            if (it == executedHandle_.end()) {
                break;
            }
        }
        return ffrt_timer_executed;
    }

    return ffrt_timer_notfound;
}

uint64_t Poller::GetPollCount() noexcept
{
    return pollerCount_;
}
} // namespace ffrt