• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "poller.h"

#include <limits>

#include "sched/execute_ctx.h"
#include "tm/scpu_task.h"
#include "dfx/log/ffrt_log_api.h"
19 
20 namespace ffrt {
Poller()21 Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) }
22 {
23     m_wakeData.cb = nullptr;
24     m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
25     epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
26     if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
27         FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, m_wakeData.fd, errno);
28         std::terminate();
29     }
30 }
31 
~Poller()32 Poller::~Poller() noexcept
33 {
34     ::close(m_wakeData.fd);
35     ::close(m_epFd);
36     timerHandle_ = -1;
37     m_wakeDataMap.clear();
38     m_delCntMap.clear();
39     timerMap_.clear();
40     executedHandle_.clear();
41     flag_ = EpollStatus::TEARDOWN;
42     m_waitTaskMap.clear();
43     m_cachedTaskEvents.clear();
44 }
45 
Instance()46 PollerProxy& PollerProxy::Instance()
47 {
48     static PollerProxy pollerInstance;
49     return pollerInstance;
50 }
51 
AddFdEvent(int op,uint32_t events,int fd,void * data,ffrt_poller_cb cb)52 int Poller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
53 {
54     auto wakeData = std::make_unique<WakeDataWithCb>(fd, data, cb, ExecuteCtx::Cur()->task);
55     void* ptr = static_cast<void*>(wakeData.get());
56     if (ptr == nullptr || wakeData == nullptr) {
57         FFRT_LOGE("Construct WakeDataWithCb instance failed! or wakeData is nullptr");
58         return -1;
59     }
60     wakeData->monitorEvents = events;
61 
62     epoll_event ev = { .events = events, .data = { .ptr = ptr } };
63     std::unique_lock lock(m_mapMutex);
64     if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
65         FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
66         return -1;
67     }
68 
69     if (op == EPOLL_CTL_ADD) {
70         m_wakeDataMap[fd].emplace_back(std::move(wakeData));
71         fdEmpty_.store(false);
72     } else if (op == EPOLL_CTL_MOD) {
73         auto iter = m_wakeDataMap.find(fd);
74         FFRT_COND_RETURN_ERROR(iter == m_wakeDataMap.end(), -1, "fd %d does not exist in wakeDataMap", fd);
75         if (iter->second.size() != 1) {
76             FFRT_LOGE("epoll_ctl mod fd wakedata num invalid");
77             return -1;
78         }
79         iter->second.pop_back();
80         iter->second.emplace_back(std::move(wakeData));
81     }
82     return 0;
83 }
84 
CacheMaskFdAndEpollDel(int fd,CPUEUTask * task)85 void Poller::CacheMaskFdAndEpollDel(int fd, CPUEUTask* task) noexcept
86 {
87     auto maskWakeDataWithCb = m_maskWakeDataWithCbMap.find(task);
88     if (maskWakeDataWithCb != m_maskWakeDataWithCbMap.end()) {
89         if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
90             FFRT_LOGE("fd[%d] ffrt epoll ctl del fail errorno=%d", fd, errno);
91         }
92         CacheDelFd(fd, task);
93     }
94 }
95 
ClearMaskWakeDataWithCbCache(CPUEUTask * task)96 int Poller::ClearMaskWakeDataWithCbCache(CPUEUTask* task) noexcept
97 {
98     auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
99     if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
100         WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
101         for (auto iter = wakeDataList.begin(); iter != wakeDataList.end(); ++iter) {
102             WakeDataWithCb* ptr = iter->get();
103             m_delFdCacheMap.erase(ptr->fd);
104         }
105         m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
106     }
107     return 0;
108 }
109 
ClearMaskWakeDataWithCbCache(CPUEUTask * task,int fd)110 int Poller::ClearMaskWakeDataWithCbCache(CPUEUTask* task, int fd) noexcept
111 {
112     auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
113     if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
114         WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
115         auto pred = [fd](auto& value) { return value->fd == fd; };
116         wakeDataList.remove_if(pred);
117         if (wakeDataList.size() == 0) {
118             m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
119         }
120     }
121     return 0;
122 }
123 
ClearDelFdCache(int fd)124 int Poller::ClearDelFdCache(int fd) noexcept
125 {
126     auto fdDelCacheIter = m_delFdCacheMap.find(fd);
127     if (fdDelCacheIter != m_delFdCacheMap.end()) {
128         CPUEUTask* task = fdDelCacheIter->second;
129         ClearMaskWakeDataWithCbCache(task, fd);
130         m_delFdCacheMap.erase(fdDelCacheIter);
131     }
132     return 0;
133 }
134 
DelFdEvent(int fd)135 int Poller::DelFdEvent(int fd) noexcept
136 {
137     std::unique_lock lock(m_mapMutex);
138     ClearDelFdCache(fd);
139     auto wakeDataIter = m_wakeDataMap.find(fd);
140     if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
141         FFRT_LOGW("fd[%d] has not been added to epoll, ignore", fd);
142         return -1;
143     }
144     auto delCntIter = m_delCntMap.find(fd);
145     if (delCntIter != m_delCntMap.end()) {
146         int diff = wakeDataIter->second.size() - delCntIter->second;
147         if (diff == 0) {
148             FFRT_LOGW("fd:%d, addCnt:%d, delCnt:%d has not been added to epoll, ignore", fd,
149                 wakeDataIter->second.size(), delCntIter->second);
150             return -1;
151         }
152     }
153 
154     if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
155         FFRT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
156         return -1;
157     }
158 
159     for (auto it = m_cachedTaskEvents.begin(); it != m_cachedTaskEvents.end();) {
160         auto& events = it->second;
161         events.erase(std::remove_if(events.begin(), events.end(),
162             [fd](const epoll_event& event) {
163                 return event.data.fd == fd;
164             }), events.end());
165 
166         if (events.empty()) {
167             it = m_cachedTaskEvents.erase(it);
168         } else {
169             ++it;
170         }
171     }
172 
173     m_delCntMap[fd]++;
174     WakeUp();
175     return 0;
176 }
177 
ClearCachedEvents(CPUEUTask * task)178 void Poller::ClearCachedEvents(CPUEUTask* task) noexcept
179 {
180     std::unique_lock lock(m_mapMutex);
181     auto iter = m_cachedTaskEvents.find(task);
182     if (iter == m_cachedTaskEvents.end()) {
183         return;
184     }
185     m_cachedTaskEvents.erase(iter);
186     ClearMaskWakeDataWithCbCache(task);
187 }
188 
FetchCachedEventAndDoUnmask(EventVec & cachedEventsVec,struct epoll_event * eventsVec)189 int Poller::FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
190 {
191     std::unordered_map<int, int> seenFd;
192     int fdCnt = 0;
193     for (size_t i = 0; i < cachedEventsVec.size(); i++) {
194         auto eventInfo = cachedEventsVec[i];
195         int currFd = eventInfo.data.fd;
196         // check if seen
197         auto iter = seenFd.find(currFd);
198         if (iter == seenFd.end()) {
199             // if not seen, copy cached events and record idx
200             eventsVec[fdCnt].data.fd = currFd;
201             eventsVec[fdCnt].events = eventInfo.events;
202             seenFd[currFd] = fdCnt;
203             fdCnt++;
204         } else {
205             // if seen, update event to newest
206             eventsVec[iter->second].events |= eventInfo.events;
207             FFRT_LOGD("fd[%d] has mutilple cached events", currFd);
208             continue;
209         }
210 
211         // Unmask to origin events
212         auto wakeDataIter = m_wakeDataMap.find(currFd);
213         if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
214             FFRT_LOGD("fd[%d] may be deleted", currFd);
215             continue;
216         }
217 
218         auto& wakeData = wakeDataIter->second.back();
219         epoll_event ev = { .events = wakeData->monitorEvents, .data = { .ptr = static_cast<void*>(wakeData.get()) } };
220         auto fdDelCacheIter = m_delFdCacheMap.find(currFd);
221         if (fdDelCacheIter != m_delFdCacheMap.end()) {
222             ClearDelFdCache(currFd);
223             if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, currFd, &ev) != 0) {
224                 FFRT_LOGE("fd[%d] epoll ctl add fail, errorno=%d", currFd, errno);
225                 continue;
226             }
227         } else {
228             if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &ev) != 0) {
229                 FFRT_LOGE("fd[%d] epoll ctl mod fail, errorno=%d", currFd, errno);
230                 continue;
231             }
232         }
233     }
234     return fdCnt;
235 }
236 
FetchCachedEventAndDoUnmask(CPUEUTask * task,struct epoll_event * eventsVec)237 int Poller::FetchCachedEventAndDoUnmask(CPUEUTask* task, struct epoll_event* eventsVec) noexcept
238 {
239     // should used in lock
240     auto syncTaskIter = m_cachedTaskEvents.find(task);
241     if (syncTaskIter == m_cachedTaskEvents.end() || syncTaskIter->second.size() == 0) {
242         return 0;
243     }
244 
245     int nfds = FetchCachedEventAndDoUnmask(syncTaskIter->second, eventsVec);
246     m_cachedTaskEvents.erase(syncTaskIter);
247     ClearMaskWakeDataWithCbCache(task);
248     return nfds;
249 }
250 
WaitFdEvent(struct epoll_event * eventsVec,int maxevents,int timeout)251 int Poller::WaitFdEvent(struct epoll_event* eventsVec, int maxevents, int timeout) noexcept
252 {
253     FFRT_COND_DO_ERR((eventsVec == nullptr), return -1, "eventsVec cannot be null");
254 
255     auto task = ExecuteCtx::Cur()->task;
256     if (!task) {
257         FFRT_LOGE("nonworker shall not call this fun.");
258         return -1;
259     }
260 
261     FFRT_COND_DO_ERR((maxevents < EPOLL_EVENT_SIZE), return -1, "maxEvents:%d cannot be less than 1024", maxevents);
262 
263     int nfds = 0;
264     if (ThreadWaitMode(task)) {
265         std::unique_lock<std::mutex> lck(task->mutex_);
266         m_mapMutex.lock();
267         int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
268         if (cachedNfds > 0) {
269             m_mapMutex.unlock();
270             FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
271                 task->label.c_str(), task->gid, cachedNfds);
272             return cachedNfds;
273         }
274 
275         if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
276             FFRT_LOGE("task has waited before");
277             m_mapMutex.unlock();
278             return 0;
279         }
280         if (FFRT_UNLIKELY(LegacyMode(task))) {
281             task->blockType = BlockType::BLOCK_THREAD;
282         }
283         auto currTime = std::chrono::steady_clock::now();
284         m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
285         if (timeout > -1) {
286             FFRT_LOGD("poller meet timeout={%d}", timeout);
287             m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
288         }
289         m_mapMutex.unlock();
290         reinterpret_cast<SCPUEUTask*>(task)->waitCond_.wait(lck);
291         FFRT_LOGD("task[%s] id[%d] has [%d] events", task->label.c_str(), task->gid, nfds);
292         return nfds;
293     }
294 
295     CoWait([&](CPUEUTask *task)->bool {
296         m_mapMutex.lock();
297         int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
298         if (cachedNfds > 0) {
299             m_mapMutex.unlock();
300             FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
301                 task->label.c_str(), task->gid, cachedNfds);
302             nfds = cachedNfds;
303             return false;
304         }
305 
306         if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
307             FFRT_LOGE("task has waited before");
308             m_mapMutex.unlock();
309             return false;
310         }
311         auto currTime = std::chrono::steady_clock::now();
312         m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
313         if (timeout > -1) {
314             FFRT_LOGD("poller meet timeout={%d}", timeout);
315             m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
316         }
317         m_mapMutex.unlock();
318         // The ownership of the task belongs to m_waitTaskMap, and the task cannot be accessed any more.
319         return true;
320     });
321     FFRT_LOGD("task[%s] id[%d] has [%d] events", task->label.c_str(), task->gid, nfds);
322     return nfds;
323 }
324 
WakeUp()325 void Poller::WakeUp() noexcept
326 {
327     uint64_t one = 1;
328     (void)::write(m_wakeData.fd, &one, sizeof one);
329 }
330 
ProcessWaitedFds(int nfds,std::unordered_map<CPUEUTask *,EventVec> & syncTaskEvents,std::array<epoll_event,EPOLL_EVENT_SIZE> & waitedEvents)331 void Poller::ProcessWaitedFds(int nfds, std::unordered_map<CPUEUTask*, EventVec>& syncTaskEvents,
332                               std::array<epoll_event, EPOLL_EVENT_SIZE>& waitedEvents) noexcept
333 {
334     for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
335         struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(waitedEvents[i].data.ptr);
336         int currFd = data->fd;
337         if (currFd == m_wakeData.fd) {
338             uint64_t one = 1;
339             (void)::read(m_wakeData.fd, &one, sizeof one);
340             continue;
341         }
342 
343         if (data->cb != nullptr) {
344             data->cb(data->data, waitedEvents[i].events);
345             continue;
346         }
347 
348         if (data->task != nullptr) {
349             epoll_event ev = { .events = waitedEvents[i].events, .data = {.fd = currFd} };
350             syncTaskEvents[data->task].push_back(ev);
351             if (waitedEvents[i].events & (EPOLLHUP | EPOLLERR)) {
352                 std::unique_lock lock(m_mapMutex);
353                 // 检查当前task是否做过掩码监听业务,如果是,取消fd的监听并且标记fd为FFRT主动取消监听
354                 CacheMaskFdAndEpollDel(currFd, data->task);
355             }
356         }
357     }
358 }
359 
360 namespace {
WakeTask(CPUEUTask * task)361 void WakeTask(CPUEUTask* task)
362 {
363     if (ThreadNotifyMode(task)) {
364         std::unique_lock<std::mutex> lck(task->mutex_);
365         if (BlockThread(task)) {
366             task->blockType = BlockType::BLOCK_COROUTINE;
367         }
368         reinterpret_cast<SCPUEUTask*>(task)->waitCond_.notify_one();
369     } else {
370         CoRoutineFactory::CoWakeFunc(task, false);
371     }
372 }
373 
CopyEventsToConsumer(EventVec & cachedEventsVec,struct epoll_event * eventsVec)374 int CopyEventsToConsumer(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
375 {
376     int nfds = cachedEventsVec.size();
377     for (int i = 0; i < nfds; i++) {
378         eventsVec[i].events = cachedEventsVec[i].events;
379         eventsVec[i].data.fd = cachedEventsVec[i].data.fd;
380     }
381     return nfds;
382 }
383 
CopyEventsInfoToConsumer(SyncData & taskInfo,EventVec & cachedEventsVec)384 void CopyEventsInfoToConsumer(SyncData& taskInfo, EventVec& cachedEventsVec)
385 {
386     epoll_event* eventsPtr = (epoll_event*)taskInfo.eventsPtr;
387     int* nfdsPtr = taskInfo.nfdsPtr;
388     if (eventsPtr == nullptr || nfdsPtr == nullptr) {
389         FFRT_LOGE("usr ptr is nullptr");
390         return;
391     }
392     *nfdsPtr = CopyEventsToConsumer(cachedEventsVec, eventsPtr);
393 }
394 } // namespace
395 
CacheEventsAndDoMask(CPUEUTask * task,EventVec & eventVec)396 void Poller::CacheEventsAndDoMask(CPUEUTask* task, EventVec& eventVec) noexcept
397 {
398     auto& syncTaskEvents = m_cachedTaskEvents[task];
399     for (size_t i = 0; i < eventVec.size(); i++) {
400         int currFd = eventVec[i].data.fd;
401         auto wakeDataIter = m_wakeDataMap.find(currFd);
402         if (wakeDataIter == m_wakeDataMap.end() ||
403             wakeDataIter->second.size() == 0 ||
404             wakeDataIter->second.back()->task != task) {
405             FFRT_LOGD("fd[%d] may be deleted", currFd);
406             continue;
407         }
408 
409         auto delIter = m_delCntMap.find(currFd);
410         if (delIter != m_delCntMap.end() && wakeDataIter->second.size() == static_cast<size_t>(delIter->second)) {
411             FFRT_LOGD("fd[%d] may be deleted", currFd);
412             continue;
413         }
414 
415         struct epoll_event maskEv;
416         maskEv.events = 0;
417         auto& wakeData = wakeDataIter->second.back();
418         std::unique_ptr<struct WakeDataWithCb> maskWakeData = std::make_unique<WakeDataWithCb>(currFd,
419             wakeData->data, wakeData->cb, wakeData->task);
420         void* ptr = static_cast<void*>(maskWakeData.get());
421         if (ptr == nullptr || maskWakeData == nullptr) {
422             FFRT_LOGE("CacheEventsAndDoMask Construct WakeDataWithCb instance failed! or wakeData is nullptr");
423             continue;
424         }
425         maskWakeData->monitorEvents = 0;
426         CacheMaskWakeData(task, maskWakeData);
427 
428         maskEv.data = {.ptr = ptr};
429         if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
430             // ENOENT indicate fd is not in epfd, may be deleted
431             FFRT_LOGW("epoll_ctl mod fd error: efd=%d, fd=%d, errorno=%d", m_epFd, currFd, errno);
432         }
433         FFRT_LOGD("fd[%d] event has no consumer, so cache it", currFd);
434         syncTaskEvents.push_back(eventVec[i]);
435     }
436 }
437 
WakeSyncTask(std::unordered_map<CPUEUTask *,EventVec> & syncTaskEvents)438 void Poller::WakeSyncTask(std::unordered_map<CPUEUTask*, EventVec>& syncTaskEvents) noexcept
439 {
440     if (syncTaskEvents.empty()) {
441         return;
442     }
443 
444     std::unordered_set<int> timerHandlesToRemove;
445     std::unordered_set<CPUEUTask*> tasksToWake;
446     m_mapMutex.lock();
447     for (auto& taskEventPair : syncTaskEvents) {
448         CPUEUTask* currTask = taskEventPair.first;
449         auto iter = m_waitTaskMap.find(currTask);
450         if (iter == m_waitTaskMap.end()) {
451             CacheEventsAndDoMask(currTask, taskEventPair.second);
452             continue;
453         }
454         CopyEventsInfoToConsumer(iter->second, taskEventPair.second);
455         auto timerHandle = iter->second.timerHandle;
456         if (timerHandle > -1) {
457             timerHandlesToRemove.insert(timerHandle);
458         }
459         tasksToWake.insert(currTask);
460         m_waitTaskMap.erase(iter);
461     }
462     m_mapMutex.unlock();
463     if (timerHandlesToRemove.size() > 0) {
464         std::lock_guard lock(timerMutex_);
465         for (auto it = timerMap_.begin(); it != timerMap_.end();) {
466             if (timerHandlesToRemove.find(it->second.handle) != timerHandlesToRemove.end()) {
467                 it = timerMap_.erase(it);
468             } else {
469                 ++it;
470             }
471         }
472         timerEmpty_.store(timerMap_.empty());
473     }
474 
475     for (auto task : tasksToWake) {
476         WakeTask(task);
477     }
478 }
479 
GetTaskWaitTime(CPUEUTask * task)480 uint64_t Poller::GetTaskWaitTime(CPUEUTask* task) noexcept
481 {
482     std::unique_lock lock(m_mapMutex);
483     auto iter = m_waitTaskMap.find(task);
484     if (iter == m_waitTaskMap.end()) {
485         return 0;
486     }
487 
488     return std::chrono::duration_cast<std::chrono::seconds>(
489         iter->second.waitTP.time_since_epoch()).count();
490 }
491 
PollOnce(int timeout)492 PollerRet Poller::PollOnce(int timeout) noexcept
493 {
494     int realTimeout = timeout;
495     int timerHandle = -1;
496 
497     timerMutex_.lock();
498     if (!timerMap_.empty()) {
499         auto cur = timerMap_.begin();
500         timerHandle = cur->second.handle;
501         TimePoint now = std::chrono::steady_clock::now();
502         realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
503             cur->first - now).count();
504         if (realTimeout <= 0) {
505             ExecuteTimerCb(now);
506             return PollerRet::RET_TIMER;
507         }
508 
509         if (timeout != -1 && realTimeout > timeout) {
510             timerHandle = -1;
511             realTimeout = timeout;
512         }
513 
514         flag_ = EpollStatus::WAIT;
515     }
516     timerMutex_.unlock();
517 
518     pollerCount_++;
519     std::array<epoll_event, EPOLL_EVENT_SIZE> waitedEvents;
520     int nfds = epoll_wait(m_epFd, waitedEvents.data(), waitedEvents.size(), realTimeout);
521     flag_ = EpollStatus::WAKE;
522     if (nfds < 0) {
523         if (errno != EINTR) {
524             FFRT_LOGE("epoll_wait error, errorno= %d.", errno);
525         }
526         return PollerRet::RET_NULL;
527     }
528     if (nfds == 0) {
529         if (timerHandle != -1) {
530             timerMutex_.lock();
531             for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
532                 if (it->second.handle == timerHandle) {
533                     ExecuteTimerCb(it->first);
534                     return PollerRet::RET_TIMER;
535                 }
536             }
537             timerMutex_.unlock();
538         }
539         return PollerRet::RET_NULL;
540     }
541 
542     std::unordered_map<CPUEUTask*, EventVec> syncTaskEvents;
543     ProcessWaitedFds(nfds, syncTaskEvents, waitedEvents);
544     WakeSyncTask(syncTaskEvents);
545     ReleaseFdWakeData();
546     return PollerRet::RET_EPOLL;
547 }
548 
ReleaseFdWakeData()549 void Poller::ReleaseFdWakeData() noexcept
550 {
551     std::unique_lock lock(m_mapMutex);
552     for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
553         int delFd = delIter->first;
554         unsigned int delCnt = static_cast<unsigned int>(delIter->second);
555         auto& wakeDataList = m_wakeDataMap[delFd];
556         int diff = wakeDataList.size() - delCnt;
557         if (diff == 0) {
558             m_wakeDataMap.erase(delFd);
559             m_delCntMap.erase(delIter++);
560             continue;
561         } else if (diff == 1) {
562             for (unsigned int i = 0; i < delCnt - 1; i++) {
563                 wakeDataList.pop_front();
564             }
565             m_delCntMap[delFd] = 1;
566         } else {
567             FFRT_LOGD("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
568         }
569         delIter++;
570     }
571 
572     fdEmpty_.store(m_wakeDataMap.empty());
573 }
574 
ProcessTimerDataCb(CPUEUTask * task)575 void Poller::ProcessTimerDataCb(CPUEUTask* task) noexcept
576 {
577     m_mapMutex.lock();
578     auto iter = m_waitTaskMap.find(task);
579     if (iter != m_waitTaskMap.end()) {
580         WakeTask(task);
581         m_waitTaskMap.erase(iter);
582     }
583     m_mapMutex.unlock();
584 }
585 
ExecuteTimerCb(TimePoint timer)586 void Poller::ExecuteTimerCb(TimePoint timer) noexcept
587 {
588     std::vector<TimerDataWithCb> timerData;
589     for (auto iter = timerMap_.begin(); iter != timerMap_.end();) {
590         if (iter->first <= timer) {
591             timerData.emplace_back(iter->second);
592             if (iter->second.cb != nullptr) {
593                 executedHandle_[iter->second.handle] = TimerStatus::EXECUTING;
594             }
595             iter = timerMap_.erase(iter);
596             continue;
597         }
598         break;
599     }
600     timerEmpty_.store(timerMap_.empty());
601     timerMutex_.unlock();
602     for (const auto& data : timerData) {
603         if (data.cb) {
604             data.cb(data.data);
605         } else if (data.task != nullptr) {
606             ProcessTimerDataCb(data.task);
607         }
608 
609         if (data.cb != nullptr) {
610             std::lock_guard lock(timerMutex_);
611             executedHandle_[data.handle] = TimerStatus::EXECUTED;
612         }
613         if (data.repeat) {
614             std::lock_guard lock(timerMutex_);
615             auto iter = executedHandle_.find(data.handle);
616             if (iter != executedHandle_.end()) {
617                 executedHandle_.erase(data.handle);
618                 RegisterTimerImpl(data);
619             }
620         }
621     }
622 }
623 
RegisterTimerImpl(const TimerDataWithCb & data)624 void Poller::RegisterTimerImpl(const TimerDataWithCb& data) noexcept
625 {
626     if (flag_ == EpollStatus::TEARDOWN) {
627         return;
628     }
629 
630     TimePoint absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(data.timeout);
631     bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);
632 
633     timerMap_.emplace(absoluteTime, data);
634     timerEmpty_.store(false);
635 
636     if (wake) {
637         WakeUp();
638     }
639 }
640 
RegisterTimer(uint64_t timeout,void * data,ffrt_timer_cb cb,bool repeat)641 int Poller::RegisterTimer(uint64_t timeout, void* data, ffrt_timer_cb cb, bool repeat) noexcept
642 {
643     if (flag_ == EpollStatus::TEARDOWN) {
644         return -1;
645     }
646 
647     std::lock_guard lock(timerMutex_);
648     timerHandle_ += 1;
649 
650     TimerDataWithCb timerMapValue(data, cb, ExecuteCtx::Cur()->task, repeat, timeout);
651     timerMapValue.handle = timerHandle_;
652     RegisterTimerImpl(timerMapValue);
653 
654     return timerHandle_;
655 }
656 
UnregisterTimer(int handle)657 int Poller::UnregisterTimer(int handle) noexcept
658 {
659     if (flag_ == EpollStatus::TEARDOWN) {
660         return -1;
661     }
662 
663     std::lock_guard lock(timerMutex_);
664     auto it = executedHandle_.find(handle);
665     if (it != executedHandle_.end()) {
666         while (it->second == TimerStatus::EXECUTING) {
667             timerMutex_.unlock();
668             std::this_thread::yield();
669             timerMutex_.lock();
670             it = executedHandle_.find(handle);
671             if (it == executedHandle_.end()) {
672                 break;
673             }
674         }
675 
676         if (it != executedHandle_.end()) {
677             executedHandle_.erase(it);
678             return 0;
679         }
680     }
681 
682     bool wake = false;
683     int ret = -1;
684     for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
685         if (cur->second.handle == handle) {
686             if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
687                 wake = true;
688             }
689             timerMap_.erase(cur);
690             ret = 0;
691             break;
692         }
693     }
694 
695     timerEmpty_.store(timerMap_.empty());
696 
697     if (wake) {
698         WakeUp();
699     }
700     return ret;
701 }
702 
DetermineEmptyMap()703 bool Poller::DetermineEmptyMap() noexcept
704 {
705     return fdEmpty_ && timerEmpty_;
706 }
707 
DeterminePollerReady()708 bool Poller::DeterminePollerReady() noexcept
709 {
710     return IsFdExist() || IsTimerReady();
711 }
712 
IsFdExist()713 bool Poller::IsFdExist() noexcept
714 {
715     return !fdEmpty_;
716 }
717 
IsTimerReady()718 bool Poller::IsTimerReady() noexcept
719 {
720     TimePoint now = std::chrono::steady_clock::now();
721     std::lock_guard lock(timerMutex_);
722     if (timerMap_.empty()) {
723         return false;
724     }
725 
726     if (now >= timerMap_.begin()->first) {
727         return true;
728     }
729     return false;
730 }
731 
GetTimerStatus(int handle)732 ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
733 {
734     if (flag_ == EpollStatus::TEARDOWN) {
735         return ffrt_timer_notfound;
736     }
737 
738     std::lock_guard lock(timerMutex_);
739     for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
740         if (cur->second.handle == handle) {
741             return ffrt_timer_not_executed;
742         }
743     }
744 
745     auto it = executedHandle_.find(handle);
746     if (it != executedHandle_.end()) {
747         while (it->second == TimerStatus::EXECUTING) {
748             timerMutex_.unlock();
749             std::this_thread::yield();
750             timerMutex_.lock();
751             it = executedHandle_.find(handle);
752             if (it == executedHandle_.end()) {
753                 break;
754             }
755         }
756         return ffrt_timer_executed;
757     }
758 
759     return ffrt_timer_notfound;
760 }
761 
GetPollCount()762 uint64_t Poller::GetPollCount() noexcept
763 {
764     return pollerCount_;
765 }
766 }