• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "poller.h"
16 #include "sched/execute_ctx.h"
17 #include "tm/scpu_task.h"
18 #include "dfx/log/ffrt_log_api.h"
19 
20 namespace ffrt {
Poller()21 Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) }
22 {
23     m_wakeData.cb = nullptr;
24     m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
25     epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
26     if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
27         FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, m_wakeData.fd, errno);
28         std::terminate();
29     }
30 }
31 
~Poller()32 Poller::~Poller() noexcept
33 {
34     ::close(m_wakeData.fd);
35     ::close(m_epFd);
36     timerHandle_ = -1;
37     m_wakeDataMap.clear();
38     m_delCntMap.clear();
39     timerMap_.clear();
40     executedHandle_.clear();
41     flag_ = EpollStatus::TEARDOWN;
42     m_waitTaskMap.clear();
43     m_cachedTaskEvents.clear();
44 }
45 
Instance()46 PollerProxy& PollerProxy::Instance()
47 {
48     static PollerProxy pollerInstance;
49     return pollerInstance;
50 }
51 
AddFdEvent(int op,uint32_t events,int fd,void * data,ffrt_poller_cb cb)52 int Poller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
53 {
54     auto wakeData = std::make_unique<WakeDataWithCb>(fd, data, cb, ExecuteCtx::Cur()->task);
55     if (ExecuteCtx::Cur()->task) {
56         ExecuteCtx::Cur()->task->pollerEnable = true;
57     }
58     void* ptr = static_cast<void*>(wakeData.get());
59     if (ptr == nullptr || wakeData == nullptr) {
60         FFRT_LOGE("Construct WakeDataWithCb instance failed! or wakeData is nullptr");
61         return -1;
62     }
63     wakeData->monitorEvents = events;
64 
65     epoll_event ev = { .events = events, .data = { .ptr = ptr } };
66     std::unique_lock lock(m_mapMutex);
67     if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
68         FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
69         return -1;
70     }
71 
72     if (op == EPOLL_CTL_ADD) {
73         m_wakeDataMap[fd].emplace_back(std::move(wakeData));
74         fdEmpty_.store(false);
75     } else if (op == EPOLL_CTL_MOD) {
76         auto iter = m_wakeDataMap.find(fd);
77         FFRT_COND_RETURN_ERROR(iter == m_wakeDataMap.end(), -1, "fd %d does not exist in wakeDataMap", fd);
78         if (iter->second.size() != 1) {
79             FFRT_LOGE("epoll_ctl mod fd wakedata num invalid");
80             return -1;
81         }
82         iter->second.pop_back();
83         iter->second.emplace_back(std::move(wakeData));
84     }
85     return 0;
86 }
87 
CacheMaskFdAndEpollDel(int fd,CPUEUTask * task)88 void Poller::CacheMaskFdAndEpollDel(int fd, CPUEUTask *task) noexcept
89 {
90     auto maskWakeDataWithCb = m_maskWakeDataWithCbMap.find(task);
91     if (maskWakeDataWithCb != m_maskWakeDataWithCbMap.end()) {
92         if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
93             FFRT_LOGE("fd[%d] ffrt epoll ctl del fail errorno=%d", fd, errno);
94         }
95         CacheDelFd(fd, task);
96     }
97 }
98 
ClearMaskWakeDataWithCbCache(CPUEUTask * task)99 int Poller::ClearMaskWakeDataWithCbCache(CPUEUTask *task) noexcept
100 {
101     auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
102     if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
103         WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
104         for (auto iter = wakeDataList.begin(); iter != wakeDataList.end(); ++iter) {
105             WakeDataWithCb* ptr = iter->get();
106             m_delFdCacheMap.erase(ptr->fd);
107         }
108         m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
109     }
110     return 0;
111 }
112 
ClearMaskWakeDataWithCbCacheWithFd(CPUEUTask * task,int fd)113 int Poller::ClearMaskWakeDataWithCbCacheWithFd(CPUEUTask *task, int fd) noexcept
114 {
115     auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
116     if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
117         WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
118         auto pred = [fd](auto& value) { return value->fd == fd; };
119         wakeDataList.remove_if(pred);
120         if (wakeDataList.size() == 0) {
121             m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
122         }
123     }
124     return 0;
125 }
126 
ClearDelFdCache(int fd)127 int Poller::ClearDelFdCache(int fd) noexcept
128 {
129     auto fdDelCacheIter = m_delFdCacheMap.find(fd);
130     if (fdDelCacheIter != m_delFdCacheMap.end()) {
131         CPUEUTask *task = fdDelCacheIter->second;
132         ClearMaskWakeDataWithCbCacheWithFd(task, fd);
133         m_delFdCacheMap.erase(fdDelCacheIter);
134     }
135     return 0;
136 }
137 
DelFdEvent(int fd)138 int Poller::DelFdEvent(int fd) noexcept
139 {
140     std::unique_lock lock(m_mapMutex);
141     ClearDelFdCache(fd);
142     auto wakeDataIter = m_wakeDataMap.find(fd);
143     if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
144         FFRT_LOGW("fd[%d] has not been added to epoll, ignore", fd);
145         return -1;
146     }
147     auto delCntIter = m_delCntMap.find(fd);
148     if (delCntIter != m_delCntMap.end()) {
149         int diff = static_cast<int>(wakeDataIter->second.size()) - delCntIter->second;
150         if (diff == 0) {
151             FFRT_LOGW("fd:%d, addCnt:%d, delCnt:%d has not been added to epoll, ignore", fd,
152                 wakeDataIter->second.size(), delCntIter->second);
153             return -1;
154         }
155     }
156 
157     if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
158         FFRT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
159         return -1;
160     }
161 
162     m_delCntMap[fd]++;
163     WakeUp();
164     return 0;
165 }
166 
ClearCachedEvents(CPUEUTask * task)167 void Poller::ClearCachedEvents(CPUEUTask* task) noexcept
168 {
169     std::unique_lock lock(m_mapMutex);
170     auto iter = m_cachedTaskEvents.find(task);
171     if (iter == m_cachedTaskEvents.end()) {
172         return;
173     }
174     m_cachedTaskEvents.erase(iter);
175     ClearMaskWakeDataWithCbCache(task);
176 }
177 
FetchCachedEventAndDoUnmask(EventVec & cachedEventsVec,struct epoll_event * eventsVec)178 int Poller::FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
179 {
180     std::unordered_map<int, int> seenFd;
181     int fdCnt = 0;
182     for (size_t i = 0; i < cachedEventsVec.size(); i++) {
183         auto eventInfo = cachedEventsVec[i];
184         int currFd = eventInfo.data.fd;
185         // check if seen
186         auto iter = seenFd.find(currFd);
187         if (iter == seenFd.end()) {
188             // if not seen, copy cached events and record idx
189             eventsVec[fdCnt].data.fd = currFd;
190             eventsVec[fdCnt].events = eventInfo.events;
191             seenFd[currFd] = fdCnt;
192             fdCnt++;
193         } else {
194             // if seen, update event to newest
195             eventsVec[iter->second].events |= eventInfo.events;
196             FFRT_LOGD("fd[%d] has mutilple cached events", currFd);
197             continue;
198         }
199 
200         // Unmask to origin events
201         auto wakeDataIter = m_wakeDataMap.find(currFd);
202         if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
203             FFRT_LOGD("fd[%d] may be deleted", currFd);
204             continue;
205         }
206 
207         auto& wakeData = wakeDataIter->second.back();
208         epoll_event ev = { .events = wakeData->monitorEvents, .data = { .ptr = static_cast<void*>(wakeData.get()) } };
209         auto fdDelCacheIter = m_delFdCacheMap.find(currFd);
210         if (fdDelCacheIter != m_delFdCacheMap.end()) {
211             ClearDelFdCache(currFd);
212             if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, currFd, &ev) != 0) {
213                 FFRT_LOGE("fd[%d] epoll ctl add fail, errorno=%d", currFd, errno);
214                 continue;
215             }
216         } else {
217             if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &ev) != 0) {
218                 FFRT_LOGE("fd[%d] epoll ctl mod fail, errorno=%d", currFd, errno);
219                 continue;
220             }
221         }
222     }
223     return fdCnt;
224 }
225 
FetchCachedEventAndDoUnmask(CPUEUTask * task,struct epoll_event * eventsVec)226 int Poller::FetchCachedEventAndDoUnmask(CPUEUTask* task, struct epoll_event* eventsVec) noexcept
227 {
228     // should used in lock
229     auto syncTaskIter = m_cachedTaskEvents.find(task);
230     if (syncTaskIter == m_cachedTaskEvents.end() || syncTaskIter->second.size() == 0) {
231         return 0;
232     }
233 
234     int nfds = FetchCachedEventAndDoUnmask(syncTaskIter->second, eventsVec);
235     m_cachedTaskEvents.erase(syncTaskIter);
236     ClearMaskWakeDataWithCbCache(task);
237     return nfds;
238 }
239 
WaitFdEvent(struct epoll_event * eventsVec,int maxevents,int timeout)240 int Poller::WaitFdEvent(struct epoll_event* eventsVec, int maxevents, int timeout) noexcept
241 {
242     FFRT_COND_DO_ERR((eventsVec == nullptr), return -1, "eventsVec cannot be null");
243 
244     auto task = ExecuteCtx::Cur()->task;
245     if (!task) {
246         FFRT_LOGE("nonworker shall not call this fun.");
247         return -1;
248     }
249 
250     FFRT_COND_DO_ERR((maxevents < EPOLL_EVENT_SIZE), return -1, "maxEvents:%d cannot be less than 1024", maxevents);
251 
252     int nfds = 0;
253     if (ThreadWaitMode(task)) {
254         std::unique_lock<std::mutex> lck(task->mutex_);
255         m_mapMutex.lock();
256         int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
257         if (cachedNfds > 0) {
258             m_mapMutex.unlock();
259             FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
260                 task->label.c_str(), task->gid, cachedNfds);
261             return cachedNfds;
262         }
263 
264         if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
265             FFRT_LOGE("task has waited before");
266             m_mapMutex.unlock();
267             return 0;
268         }
269         if (FFRT_UNLIKELY(LegacyMode(task))) {
270             task->blockType = BlockType::BLOCK_THREAD;
271         }
272         auto currTime = std::chrono::steady_clock::now();
273         m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
274         if (timeout > -1) {
275             FFRT_LOGD("poller meet timeout={%d}", timeout);
276             m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
277         }
278         m_mapMutex.unlock();
279         reinterpret_cast<SCPUEUTask*>(task)->waitCond_.wait(lck);
280         FFRT_LOGD("task[%s] id[%d] has [%d] events", task->label.c_str(), task->gid, nfds);
281         return nfds;
282     }
283 
284     CoWait([&](CPUEUTask *task)->bool {
285         m_mapMutex.lock();
286         int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
287         if (cachedNfds > 0) {
288             m_mapMutex.unlock();
289             FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
290                 task->label.c_str(), task->gid, cachedNfds);
291             nfds = cachedNfds;
292             return false;
293         }
294 
295         if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
296             FFRT_LOGE("task has waited before");
297             m_mapMutex.unlock();
298             return false;
299         }
300         auto currTime = std::chrono::steady_clock::now();
301         m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
302         if (timeout > -1) {
303             FFRT_LOGD("poller meet timeout={%d}", timeout);
304             m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
305         }
306         m_mapMutex.unlock();
307         // The ownership of the task belongs to m_waitTaskMap, and the task cannot be accessed any more.
308         return true;
309     });
310     FFRT_LOGD("task[%s] id[%d] has [%d] events", task->label.c_str(), task->gid, nfds);
311     return nfds;
312 }
313 
WakeUp()314 void Poller::WakeUp() noexcept
315 {
316     uint64_t one = 1;
317     (void)::write(m_wakeData.fd, &one, sizeof one);
318 }
319 
ProcessWaitedFds(int nfds,std::unordered_map<CPUEUTask *,EventVec> & syncTaskEvents,std::array<epoll_event,EPOLL_EVENT_SIZE> & waitedEvents)320 void Poller::ProcessWaitedFds(int nfds, std::unordered_map<CPUEUTask*, EventVec>& syncTaskEvents,
321                               std::array<epoll_event, EPOLL_EVENT_SIZE>& waitedEvents) noexcept
322 {
323     for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
324         struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(waitedEvents[i].data.ptr);
325         int currFd = data->fd;
326         if (currFd == m_wakeData.fd) {
327             uint64_t one = 1;
328             (void)::read(m_wakeData.fd, &one, sizeof one);
329             continue;
330         }
331 
332         if (data->cb != nullptr) {
333             data->cb(data->data, waitedEvents[i].events);
334             continue;
335         }
336 
337         if (data->task != nullptr) {
338             epoll_event ev = { .events = waitedEvents[i].events, .data = {.fd = currFd} };
339             syncTaskEvents[data->task].push_back(ev);
340             if (waitedEvents[i].events & (EPOLLHUP | EPOLLERR)) {
341                 std::unique_lock lock(m_mapMutex);
342                 CacheMaskFdAndEpollDel(currFd, data->task);
343             }
344         }
345     }
346 }
347 
348 namespace {
WakeTask(CPUEUTask * task)349 void WakeTask(CPUEUTask* task)
350 {
351     if (ThreadNotifyMode(task)) {
352         std::unique_lock<std::mutex> lck(task->mutex_);
353         if (BlockThread(task)) {
354             task->blockType = BlockType::BLOCK_COROUTINE;
355         }
356         reinterpret_cast<SCPUEUTask*>(task)->waitCond_.notify_one();
357     } else {
358         CoRoutineFactory::CoWakeFunc(task, CoWakeType::NO_TIMEOUT_WAKE);
359     }
360 }
361 
CopyEventsToConsumer(EventVec & cachedEventsVec,struct epoll_event * eventsVec)362 int CopyEventsToConsumer(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
363 {
364     int nfds = cachedEventsVec.size();
365     for (int i = 0; i < nfds; i++) {
366         eventsVec[i].events = cachedEventsVec[i].events;
367         eventsVec[i].data.fd = cachedEventsVec[i].data.fd;
368     }
369     return nfds;
370 }
371 
CopyEventsInfoToConsumer(SyncData & taskInfo,EventVec & cachedEventsVec)372 void CopyEventsInfoToConsumer(SyncData& taskInfo, EventVec& cachedEventsVec)
373 {
374     epoll_event* eventsPtr = (epoll_event*)taskInfo.eventsPtr;
375     int* nfdsPtr = taskInfo.nfdsPtr;
376     if (eventsPtr == nullptr || nfdsPtr == nullptr) {
377         FFRT_LOGE("usr ptr is nullptr");
378         return;
379     }
380     *nfdsPtr = CopyEventsToConsumer(cachedEventsVec, eventsPtr);
381 }
382 } // namespace
383 
CacheEventsAndDoMask(CPUEUTask * task,EventVec & eventVec)384 void Poller::CacheEventsAndDoMask(CPUEUTask* task, EventVec& eventVec) noexcept
385 {
386     auto& syncTaskEvents = m_cachedTaskEvents[task];
387     for (size_t i = 0; i < eventVec.size(); i++) {
388         int currFd = eventVec[i].data.fd;
389         auto wakeDataIter = m_wakeDataMap.find(currFd);
390         if (wakeDataIter == m_wakeDataMap.end() ||
391             wakeDataIter->second.size() == 0 ||
392             wakeDataIter->second.back()->task != task) {
393             FFRT_LOGD("fd[%d] may be deleted", currFd);
394             continue;
395         }
396 
397         auto delIter = m_delCntMap.find(currFd);
398         if (delIter != m_delCntMap.end() && wakeDataIter->second.size() == static_cast<size_t>(delIter->second)) {
399             FFRT_LOGD("fd[%d] may be deleted", currFd);
400             continue;
401         }
402 
403         struct epoll_event maskEv;
404         maskEv.events = 0;
405         auto& wakeData = wakeDataIter->second.back();
406         std::unique_ptr<struct WakeDataWithCb> maskWakeData = std::make_unique<WakeDataWithCb>(currFd,
407             wakeData->data, wakeData->cb, wakeData->task);
408         void* ptr = static_cast<void*>(maskWakeData.get());
409         if (ptr == nullptr || maskWakeData == nullptr) {
410             FFRT_LOGE("CacheEventsAndDoMask Construct WakeDataWithCb instance failed! or wakeData is nullptr");
411             continue;
412         }
413         maskWakeData->monitorEvents = 0;
414         CacheMaskWakeData(task, maskWakeData);
415         maskEv.data = {.ptr = ptr};
416         if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
417             // ENOENT indicate fd is not in epfd, may be deleted
418             FFRT_LOGW("epoll_ctl mod fd error: efd=%d, fd=%d, errorno=%d", m_epFd, currFd, errno);
419         }
420         FFRT_LOGD("fd[%d] event has no consumer, so cache it", currFd);
421         syncTaskEvents.push_back(eventVec[i]);
422     }
423 }
424 
WakeSyncTask(std::unordered_map<CPUEUTask *,EventVec> & syncTaskEvents)425 void Poller::WakeSyncTask(std::unordered_map<CPUEUTask*, EventVec>& syncTaskEvents) noexcept
426 {
427     if (syncTaskEvents.empty()) {
428         return;
429     }
430 
431     std::unordered_set<int> timerHandlesToRemove;
432     std::unordered_set<CPUEUTask*> tasksToWake;
433     m_mapMutex.lock();
434     for (auto& taskEventPair : syncTaskEvents) {
435         CPUEUTask* currTask = taskEventPair.first;
436         auto iter = m_waitTaskMap.find(currTask);
437         if (iter == m_waitTaskMap.end()) {
438             CacheEventsAndDoMask(currTask, taskEventPair.second);
439             continue;
440         }
441         CopyEventsInfoToConsumer(iter->second, taskEventPair.second);
442         auto timerHandle = iter->second.timerHandle;
443         if (timerHandle > -1) {
444             timerHandlesToRemove.insert(timerHandle);
445         }
446         tasksToWake.insert(currTask);
447         m_waitTaskMap.erase(iter);
448     }
449 
450     m_mapMutex.unlock();
451     if (timerHandlesToRemove.size() > 0) {
452         std::lock_guard lock(timerMutex_);
453         for (auto it = timerMap_.begin(); it != timerMap_.end();) {
454             if (timerHandlesToRemove.find(it->second.handle) != timerHandlesToRemove.end()) {
455                 it = timerMap_.erase(it);
456             } else {
457                 ++it;
458             }
459         }
460         timerEmpty_.store(timerMap_.empty());
461     }
462 
463     for (auto task : tasksToWake) {
464         WakeTask(task);
465     }
466 }
467 
GetTaskWaitTime(CPUEUTask * task)468 uint64_t Poller::GetTaskWaitTime(CPUEUTask* task) noexcept
469 {
470     std::unique_lock lock(m_mapMutex);
471     auto iter = m_waitTaskMap.find(task);
472     if (iter == m_waitTaskMap.end()) {
473         return 0;
474     }
475 
476     return std::chrono::duration_cast<std::chrono::seconds>(
477         iter->second.waitTP.time_since_epoch()).count();
478 }
479 
PollOnce(int timeout)480 PollerRet Poller::PollOnce(int timeout) noexcept
481 {
482     int realTimeout = timeout;
483     int timerHandle = -1;
484 
485     timerMutex_.lock();
486     if (!timerMap_.empty()) {
487         auto cur = timerMap_.begin();
488         timerHandle = cur->second.handle;
489         TimePoint now = std::chrono::steady_clock::now();
490         realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
491             cur->first - now).count();
492         if (realTimeout <= 0) {
493             ExecuteTimerCb(now);
494             return PollerRet::RET_TIMER;
495         }
496 
497         if (timeout != -1 && realTimeout > timeout) {
498             timerHandle = -1;
499             realTimeout = timeout;
500         }
501 
502         flag_ = EpollStatus::WAIT;
503     }
504     timerMutex_.unlock();
505 
506     pollerCount_++;
507     std::array<epoll_event, EPOLL_EVENT_SIZE> waitedEvents;
508     int nfds = epoll_wait(m_epFd, waitedEvents.data(), waitedEvents.size(), realTimeout);
509     flag_ = EpollStatus::WAKE;
510     if (nfds < 0) {
511         if (errno != EINTR) {
512             FFRT_LOGE("epoll_wait error, errorno= %d.", errno);
513         }
514         return PollerRet::RET_NULL;
515     }
516     if (nfds == 0) {
517         if (timerHandle != -1) {
518             timerMutex_.lock();
519             for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
520                 if (it->second.handle == timerHandle) {
521                     ExecuteTimerCb(it->first);
522                     return PollerRet::RET_TIMER;
523                 }
524             }
525             timerMutex_.unlock();
526         }
527         return PollerRet::RET_NULL;
528     }
529 
530     std::unordered_map<CPUEUTask*, EventVec> syncTaskEvents;
531     ProcessWaitedFds(nfds, syncTaskEvents, waitedEvents);
532     WakeSyncTask(syncTaskEvents);
533     ReleaseFdWakeData();
534     return PollerRet::RET_EPOLL;
535 }
536 
ReleaseFdWakeData()537 void Poller::ReleaseFdWakeData() noexcept
538 {
539     std::unique_lock lock(m_mapMutex);
540     for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
541         int delFd = delIter->first;
542         unsigned int delCnt = static_cast<unsigned int>(delIter->second);
543         auto& wakeDataList = m_wakeDataMap[delFd];
544         int diff = wakeDataList.size() - delCnt;
545         if (diff == 0) {
546             m_wakeDataMap.erase(delFd);
547             m_delCntMap.erase(delIter++);
548             continue;
549         } else if (diff == 1) {
550             for (unsigned int i = 0; i < delCnt - 1; i++) {
551                 wakeDataList.pop_front();
552             }
553             m_delCntMap[delFd] = 1;
554         } else {
555             FFRT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
556         }
557         delIter++;
558     }
559 
560     fdEmpty_.store(m_wakeDataMap.empty());
561 }
562 
ProcessTimerDataCb(CPUEUTask * task)563 void Poller::ProcessTimerDataCb(CPUEUTask* task) noexcept
564 {
565     m_mapMutex.lock();
566     auto iter = m_waitTaskMap.find(task);
567     if (iter != m_waitTaskMap.end()) {
568         WakeTask(task);
569         m_waitTaskMap.erase(iter);
570     }
571     m_mapMutex.unlock();
572 }
573 
ExecuteTimerCb(TimePoint timer)574 void Poller::ExecuteTimerCb(TimePoint timer) noexcept
575 {
576     while (!timerMap_.empty()) {
577         auto iter = timerMap_.begin();
578         if (iter->first > timer) {
579             break;
580         }
581 
582         TimerDataWithCb data = iter->second;
583         if (data.cb != nullptr) {
584             executedHandle_[data.handle] = TimerStatus::EXECUTING;
585         }
586 
587         timerMap_.erase(iter);
588         timerEmpty_.store(timerMap_.empty());
589 
590         if (data.cb != nullptr) {
591             timerMutex_.unlock();
592             data.cb(data.data);
593             timerMutex_.lock();
594             executedHandle_[data.handle] = TimerStatus::EXECUTED;
595         } else if (data.task != nullptr) {
596             timerMutex_.unlock();
597             ProcessTimerDataCb(data.task);
598             timerMutex_.lock();
599         }
600 
601         if (data.repeat && (executedHandle_.find(data.handle) != executedHandle_.end())) {
602             executedHandle_.erase(data.handle);
603             RegisterTimerImpl(data);
604         }
605     }
606     timerMutex_.unlock();
607 }
608 
RegisterTimerImpl(const TimerDataWithCb & data)609 void Poller::RegisterTimerImpl(const TimerDataWithCb& data) noexcept
610 {
611     if (flag_ == EpollStatus::TEARDOWN) {
612         return;
613     }
614 
615     TimePoint absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(data.timeout);
616     bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);
617 
618     timerMap_.emplace(absoluteTime, data);
619     timerEmpty_.store(false);
620 
621     if (wake) {
622         WakeUp();
623     }
624 }
625 
RegisterTimer(uint64_t timeout,void * data,ffrt_timer_cb cb,bool repeat)626 int Poller::RegisterTimer(uint64_t timeout, void* data, ffrt_timer_cb cb, bool repeat) noexcept
627 {
628     if (flag_ == EpollStatus::TEARDOWN) {
629         return -1;
630     }
631 
632     std::lock_guard lock(timerMutex_);
633     timerHandle_ += 1;
634 
635     TimerDataWithCb timerMapValue(data, cb, ExecuteCtx::Cur()->task, repeat, timeout);
636     timerMapValue.handle = timerHandle_;
637     RegisterTimerImpl(timerMapValue);
638 
639     return timerHandle_;
640 }
641 
UnregisterTimer(int handle)642 int Poller::UnregisterTimer(int handle) noexcept
643 {
644     if (flag_ == EpollStatus::TEARDOWN) {
645         return -1;
646     }
647 
648     std::lock_guard lock(timerMutex_);
649     auto it = executedHandle_.find(handle);
650     if (it != executedHandle_.end()) {
651         while (it->second == TimerStatus::EXECUTING) {
652             timerMutex_.unlock();
653             std::this_thread::yield();
654             timerMutex_.lock();
655             it = executedHandle_.find(handle);
656             if (it == executedHandle_.end()) {
657                 break;
658             }
659         }
660 
661         if (it != executedHandle_.end()) {
662             executedHandle_.erase(it);
663             return 0;
664         }
665     }
666 
667     bool wake = false;
668     int ret = -1;
669     for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
670         if (cur->second.handle == handle) {
671             if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
672                 wake = true;
673             }
674             timerMap_.erase(cur);
675             ret = 0;
676             break;
677         }
678     }
679 
680     timerEmpty_.store(timerMap_.empty());
681 
682     if (wake) {
683         WakeUp();
684     }
685     return ret;
686 }
687 
DetermineEmptyMap()688 bool Poller::DetermineEmptyMap() noexcept
689 {
690     return fdEmpty_ && timerEmpty_;
691 }
692 
DeterminePollerReady()693 bool Poller::DeterminePollerReady() noexcept
694 {
695     return IsFdExist() || IsTimerReady();
696 }
697 
IsFdExist()698 bool Poller::IsFdExist() noexcept
699 {
700     return !fdEmpty_;
701 }
702 
IsTimerReady()703 bool Poller::IsTimerReady() noexcept
704 {
705     TimePoint now = std::chrono::steady_clock::now();
706     std::lock_guard lock(timerMutex_);
707     if (timerMap_.empty()) {
708         return false;
709     }
710 
711     if (now >= timerMap_.begin()->first) {
712         return true;
713     }
714     return false;
715 }
716 
GetTimerStatus(int handle)717 ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
718 {
719     if (flag_ == EpollStatus::TEARDOWN) {
720         return ffrt_timer_notfound;
721     }
722 
723     std::lock_guard lock(timerMutex_);
724     for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
725         if (cur->second.handle == handle) {
726             return ffrt_timer_not_executed;
727         }
728     }
729 
730     auto it = executedHandle_.find(handle);
731     if (it != executedHandle_.end()) {
732         while (it->second == TimerStatus::EXECUTING) {
733             timerMutex_.unlock();
734             std::this_thread::yield();
735             timerMutex_.lock();
736             it = executedHandle_.find(handle);
737             if (it == executedHandle_.end()) {
738                 break;
739             }
740         }
741         return ffrt_timer_executed;
742     }
743 
744     return ffrt_timer_notfound;
745 }
746 
GetPollCount()747 uint64_t Poller::GetPollCount() noexcept
748 {
749     return pollerCount_;
750 }
751 }