• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sync/poller.h"
17 #include <securec.h>
18 #include "sched/execute_ctx.h"
19 #include "tm/scpu_task.h"
20 #include "dfx/log/ffrt_log_api.h"
21 #ifdef FFRT_ENABLE_HITRACE_CHAIN
22 #include "dfx/trace/ffrt_trace_chain.h"
23 #endif
24 
25 constexpr uint64_t MAX_TIMER_MS_COUNT = 1000ULL * 100 * 60 * 60 * 24 * 365; // 100 year
26 
27 namespace ffrt {
Poller()28 Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) }
29 {
30     if (m_epFd < 0) {
31         FFRT_LOGE("epoll_create1 failed: errno=%d", errno);
32     }
33 #ifdef OHOS_STANDARD_SYSTEM
34     fdsan_exchange_owner_tag(m_epFd, 0, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE, static_cast<uint64_t>(m_epFd)));
35 #endif
36     m_wakeData.cb = nullptr;
37     m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
38     if (m_wakeData.fd < 0) {
39         FFRT_LOGE("eventfd failed: errno=%d", errno);
40     }
41 #ifdef OHOS_STANDARD_SYSTEM
42     fdsan_exchange_owner_tag(m_wakeData.fd, 0, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE,
43         static_cast<uint64_t>(m_wakeData.fd)));
44 #endif
45     epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
46     FFRT_COND_TERMINATE((epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0),
47         "epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, m_wakeData.fd, errno);
48 }
49 
~Poller()50 Poller::~Poller() noexcept
51 {
52 #ifdef OHOS_STANDARD_SYSTEM
53     fdsan_close_with_tag(m_wakeData.fd, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE,
54         static_cast<uint64_t>(m_wakeData.fd)));
55     fdsan_close_with_tag(m_epFd, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE, static_cast<uint64_t>(m_epFd)));
56 #else
57     ::close(m_wakeData.fd);
58     ::close(m_epFd);
59 #endif
60     timerHandle_ = -1;
61     {
62         std::lock_guard lg(m_mapMutex);
63         m_wakeDataMap.clear();
64         m_delCntMap.clear();
65         m_waitTaskMap.clear();
66         m_cachedTaskEvents.clear();
67     }
68     {
69         std::lock_guard lg(timerMutex_);
70         timerMap_.clear();
71         executedHandle_.clear();
72     }
73     flag_ = EpollStatus::TEARDOWN;
74 }
75 
Instance()76 PollerProxy& PollerProxy::Instance()
77 {
78     static PollerProxy pollerInstance;
79     return pollerInstance;
80 }
81 
AddFdEvent(int op,uint32_t events,int fd,void * data,ffrt_poller_cb cb)82 int Poller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
83 {
84     CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
85     auto wakeData = std::make_unique<WakeDataWithCb>(fd, data, cb, task);
86     if (task) {
87         task->pollerEnable = true;
88     }
89     void* ptr = static_cast<void*>(wakeData.get());
90     if (ptr == nullptr || wakeData == nullptr) {
91         FFRT_SYSEVENT_LOGE("Construct WakeDataWithCb instance failed! or wakeData is nullptr");
92         return -1;
93     }
94     wakeData->monitorEvents = events;
95 
96     epoll_event ev = { .events = events, .data = { .ptr = ptr } };
97     std::lock_guard lg(m_mapMutex);
98     if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
99         FFRT_SYSEVENT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
100         return -1;
101     }
102 
103     if (op == EPOLL_CTL_ADD) {
104         m_wakeDataMap[fd].emplace_back(std::move(wakeData));
105         fdEmpty_.store(false);
106     } else if (op == EPOLL_CTL_MOD) {
107         auto iter = m_wakeDataMap.find(fd);
108         FFRT_COND_RETURN_ERROR(iter == m_wakeDataMap.end(), -1, "fd %d does not exist in wakeDataMap", fd);
109         if (iter->second.size() != 1) {
110             FFRT_SYSEVENT_LOGE("epoll_ctl mod fd wakedata num invalid");
111             return -1;
112         }
113         iter->second.pop_back();
114         iter->second.emplace_back(std::move(wakeData));
115     }
116     return 0;
117 }
118 
CacheMaskFdAndEpollDel(int fd,CoTask * task)119 void Poller::CacheMaskFdAndEpollDel(int fd, CoTask *task) noexcept
120 {
121     auto maskWakeDataWithCb = m_maskWakeDataWithCbMap.find(task);
122     if (maskWakeDataWithCb != m_maskWakeDataWithCbMap.end()) {
123         if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
124             FFRT_SYSEVENT_LOGE("fd[%d] ffrt epoll ctl del fail errorno=%d", fd, errno);
125         }
126         CacheDelFd(fd, task);
127     }
128 }
129 
ClearMaskWakeDataWithCbCache(CoTask * task)130 int Poller::ClearMaskWakeDataWithCbCache(CoTask *task) noexcept
131 {
132     auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
133     if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
134         WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
135         for (auto iter = wakeDataList.begin(); iter != wakeDataList.end(); ++iter) {
136             WakeDataWithCb* ptr = iter->get();
137             m_delFdCacheMap.erase(ptr->fd);
138         }
139         m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
140     }
141     return 0;
142 }
143 
ClearMaskWakeDataWithCbCacheWithFd(CoTask * task,int fd)144 int Poller::ClearMaskWakeDataWithCbCacheWithFd(CoTask *task, int fd) noexcept
145 {
146     auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
147     if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
148         WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
149         auto pred = [fd](auto& value) { return value->fd == fd; };
150         wakeDataList.remove_if(pred);
151         if (wakeDataList.size() == 0) {
152             m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
153         }
154     }
155     return 0;
156 }
157 
ClearDelFdCache(int fd)158 int Poller::ClearDelFdCache(int fd) noexcept
159 {
160     auto fdDelCacheIter = m_delFdCacheMap.find(fd);
161     if (fdDelCacheIter != m_delFdCacheMap.end()) {
162         CoTask *task = fdDelCacheIter->second;
163         ClearMaskWakeDataWithCbCacheWithFd(task, fd);
164         m_delFdCacheMap.erase(fdDelCacheIter);
165     }
166     return 0;
167 }
168 
DelFdEvent(int fd)169 int Poller::DelFdEvent(int fd) noexcept
170 {
171     std::lock_guard lg(m_mapMutex);
172     ClearDelFdCache(fd);
173     auto wakeDataIter = m_wakeDataMap.find(fd);
174     if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
175         FFRT_SYSEVENT_LOGW("fd[%d] has not been added to epoll, ignore", fd);
176         return -1;
177     }
178     auto delCntIter = m_delCntMap.find(fd);
179     if (delCntIter != m_delCntMap.end()) {
180         int diff = static_cast<int>(wakeDataIter->second.size()) - delCntIter->second;
181         if (diff == 0) {
182             FFRT_SYSEVENT_LOGW("fd:%d, addCnt:%d, delCnt:%d has not been added to epoll, ignore", fd,
183                 wakeDataIter->second.size(), delCntIter->second);
184             return -1;
185         }
186     }
187 
188     if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
189         FFRT_SYSEVENT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
190         return -1;
191     }
192 
193     for (auto it = m_cachedTaskEvents.begin(); it != m_cachedTaskEvents.end();) {
194         auto& events = it->second;
195         events.erase(std::remove_if(events.begin(), events.end(),
196             [fd](const epoll_event& event) {
197                 return event.data.fd == fd;
198             }), events.end());
199 
200         if (events.empty()) {
201             it = m_cachedTaskEvents.erase(it);
202         } else {
203             ++it;
204         }
205     }
206 
207     m_delCntMap[fd]++;
208     WakeUp();
209     return 0;
210 }
211 
ClearCachedEvents(CoTask * task)212 void Poller::ClearCachedEvents(CoTask* task) noexcept
213 {
214     std::lock_guard lg(m_mapMutex);
215     auto iter = m_cachedTaskEvents.find(task);
216     if (iter == m_cachedTaskEvents.end()) {
217         return;
218     }
219     m_cachedTaskEvents.erase(iter);
220     ClearMaskWakeDataWithCbCache(task);
221 }
222 
FetchCachedEventAndDoUnmask(EventVec & cachedEventsVec,struct epoll_event * eventsVec)223 int Poller::FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
224 {
225     std::unordered_map<int, int> seenFd;
226     int fdCnt = 0;
227     for (size_t i = 0; i < cachedEventsVec.size(); i++) {
228         auto eventInfo = cachedEventsVec[i];
229         int currFd = eventInfo.data.fd;
230         // check if seen
231         auto iter = seenFd.find(currFd);
232         if (iter == seenFd.end()) {
233             // if not seen, copy cached events and record idx
234             eventsVec[fdCnt].data.fd = currFd;
235             eventsVec[fdCnt].events = eventInfo.events;
236             seenFd[currFd] = fdCnt;
237             fdCnt++;
238         } else {
239             // if seen, update event to newest
240             eventsVec[iter->second].events |= eventInfo.events;
241             FFRT_LOGD("fd[%d] has mutilple cached events", currFd);
242             continue;
243         }
244 
245         // Unmask to origin events
246         auto wakeDataIter = m_wakeDataMap.find(currFd);
247         if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
248             FFRT_LOGD("fd[%d] may be deleted", currFd);
249             continue;
250         }
251 
252         auto& wakeData = wakeDataIter->second.back();
253         epoll_event ev = { .events = wakeData->monitorEvents, .data = { .ptr = static_cast<void*>(wakeData.get()) } };
254         auto fdDelCacheIter = m_delFdCacheMap.find(currFd);
255         if (fdDelCacheIter != m_delFdCacheMap.end()) {
256             ClearDelFdCache(currFd);
257             if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, currFd, &ev) != 0) {
258                 FFRT_SYSEVENT_LOGE("fd[%d] epoll ctl add fail, errorno=%d", currFd, errno);
259                 continue;
260             }
261         } else {
262             if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &ev) != 0) {
263                 FFRT_SYSEVENT_LOGE("fd[%d] epoll ctl mod fail, errorno=%d", currFd, errno);
264                 continue;
265             }
266         }
267     }
268     return fdCnt;
269 }
270 
FetchCachedEventAndDoUnmask(CoTask * task,struct epoll_event * eventsVec)271 int Poller::FetchCachedEventAndDoUnmask(CoTask* task, struct epoll_event* eventsVec) noexcept
272 {
273     // should used in lock
274     auto syncTaskIter = m_cachedTaskEvents.find(task);
275     if (syncTaskIter == m_cachedTaskEvents.end() || syncTaskIter->second.size() == 0) {
276         return 0;
277     }
278 
279     int nfds = FetchCachedEventAndDoUnmask(syncTaskIter->second, eventsVec);
280     m_cachedTaskEvents.erase(syncTaskIter);
281     ClearMaskWakeDataWithCbCache(task);
282     return nfds;
283 }
284 
WaitFdEvent(struct epoll_event * eventsVec,int maxevents,int timeout)285 int Poller::WaitFdEvent(struct epoll_event* eventsVec, int maxevents, int timeout) noexcept
286 {
287     FFRT_COND_DO_ERR((eventsVec == nullptr), return -1, "eventsVec cannot be null");
288 
289     CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
290     if (!task) {
291         FFRT_SYSEVENT_LOGE("nonworker shall not call this fun.");
292         return -1;
293     }
294 
295     FFRT_COND_DO_ERR((maxevents < EPOLL_EVENT_SIZE), return -1, "maxEvents:%d cannot be less than 1024", maxevents);
296 
297     int nfds = 0;
298     if (task->Block() == BlockType::BLOCK_THREAD) {
299         std::unique_lock<std::mutex> lck(task->mutex_);
300         std::unique_lock mapLock(m_mapMutex);
301         int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
302         if (cachedNfds > 0) {
303             mapLock.unlock();
304             FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
305                 task->GetLabel().c_str(), task->gid, cachedNfds);
306             task->Wake();
307             return cachedNfds;
308         }
309 
310         if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
311             FFRT_SYSEVENT_LOGE("task has waited before");
312             mapLock.unlock();
313             task->Wake();
314             return 0;
315         }
316         auto currTime = std::chrono::steady_clock::now();
317         m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
318         if (timeout > -1) {
319             FFRT_LOGD("poller meet timeout={%d}", timeout);
320             m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
321         }
322         mapLock.unlock();
323         task->waitCond_.wait(lck);
324         FFRT_LOGD("task[%s] id[%d] has [%d] events", task->GetLabel().c_str(), task->gid, nfds);
325         task->Wake();
326         return nfds;
327     }
328 
329     CoWait([&](CoTask *task)->bool {
330         std::unique_lock mapLock(m_mapMutex);
331         int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
332         if (cachedNfds > 0) {
333             mapLock.unlock();
334             FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
335                 task->GetLabel().c_str(), task->gid, cachedNfds);
336             nfds = cachedNfds;
337             return false;
338         }
339 
340         if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
341             FFRT_SYSEVENT_LOGE("task has waited before");
342             return false;
343         }
344         auto currTime = std::chrono::steady_clock::now();
345         m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
346         if (timeout > -1) {
347             FFRT_LOGD("poller meet timeout={%d}", timeout);
348             m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
349         }
350         // The ownership of the task belongs to m_waitTaskMap, and the task cannot be accessed any more.
351         return true;
352     });
353     FFRT_LOGD("task[%s] id[%d] has [%d] events", task->GetLabel().c_str(), task->gid, nfds);
354     return nfds;
355 }
356 
WakeUp()357 void Poller::WakeUp() noexcept
358 {
359     uint64_t one = 1;
360     (void)::write(m_wakeData.fd, &one, sizeof one);
361 }
362 
ProcessWaitedFds(int nfds,std::unordered_map<CoTask *,EventVec> & syncTaskEvents,std::array<epoll_event,EPOLL_EVENT_SIZE> & waitedEvents)363 void Poller::ProcessWaitedFds(int nfds, std::unordered_map<CoTask*, EventVec>& syncTaskEvents,
364                               std::array<epoll_event, EPOLL_EVENT_SIZE>& waitedEvents) noexcept
365 {
366     for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
367         struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(waitedEvents[i].data.ptr);
368         int currFd = data->fd;
369         if (currFd == m_wakeData.fd) {
370             uint64_t one = 1;
371             (void)::read(m_wakeData.fd, &one, sizeof one);
372             continue;
373         }
374 
375         if (data->cb != nullptr) {
376 #ifdef FFRT_ENABLE_HITRACE_CHAIN
377             if (data->traceId.valid == HITRACE_ID_VALID) {
378                 TraceChainAdapter::Instance().HiTraceChainRestoreId(&data->traceId);
379             }
380 #endif
381             data->cb(data->data, waitedEvents[i].events);
382 #ifdef FFRT_ENABLE_HITRACE_CHAIN
383             if (data->traceId.valid == HITRACE_ID_VALID) {
384                 TraceChainAdapter::Instance().HiTraceChainClearId();
385             }
386 #endif
387             continue;
388         }
389 
390         if (data->task != nullptr) {
391             epoll_event ev = { .events = waitedEvents[i].events, .data = {.fd = currFd} };
392             syncTaskEvents[data->task].push_back(ev);
393             if (waitedEvents[i].events & (EPOLLHUP | EPOLLERR)) {
394                 std::lock_guard lg(m_mapMutex);
395                 CacheMaskFdAndEpollDel(currFd, data->task);
396             }
397         }
398     }
399 }
400 
401 namespace {
WakeTask(CoTask * task)402 void WakeTask(CoTask* task)
403 {
404     if (task->GetBlockType() == BlockType::BLOCK_THREAD) {
405         std::lock_guard<std::mutex> lg(task->mutex_);
406         task->waitCond_.notify_one();
407     } else {
408         CoRoutineFactory::CoWakeFunc(task, CoWakeType::NO_TIMEOUT_WAKE);
409     }
410 }
411 
CopyEventsToConsumer(EventVec & cachedEventsVec,struct epoll_event * eventsVec)412 int CopyEventsToConsumer(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
413 {
414     int nfds = cachedEventsVec.size();
415     for (int i = 0; i < nfds; i++) {
416         eventsVec[i].events = cachedEventsVec[i].events;
417         eventsVec[i].data.fd = cachedEventsVec[i].data.fd;
418     }
419     return nfds;
420 }
421 
CopyEventsInfoToConsumer(SyncData & taskInfo,EventVec & cachedEventsVec)422 void CopyEventsInfoToConsumer(SyncData& taskInfo, EventVec& cachedEventsVec)
423 {
424     epoll_event* eventsPtr = (epoll_event*)taskInfo.eventsPtr;
425     int* nfdsPtr = taskInfo.nfdsPtr;
426     if (eventsPtr == nullptr || nfdsPtr == nullptr) {
427         FFRT_LOGE("usr ptr is nullptr");
428         return;
429     }
430     *nfdsPtr = CopyEventsToConsumer(cachedEventsVec, eventsPtr);
431 }
432 } // namespace
433 
CacheEventsAndDoMask(CoTask * task,EventVec & eventVec)434 void Poller::CacheEventsAndDoMask(CoTask* task, EventVec& eventVec) noexcept
435 {
436     auto& syncTaskEvents = m_cachedTaskEvents[task];
437     for (size_t i = 0; i < eventVec.size(); i++) {
438         int currFd = eventVec[i].data.fd;
439         auto wakeDataIter = m_wakeDataMap.find(currFd);
440         if (wakeDataIter == m_wakeDataMap.end() ||
441             wakeDataIter->second.size() == 0 ||
442             wakeDataIter->second.back()->task != task) {
443             FFRT_LOGD("fd[%d] may be deleted", currFd);
444             continue;
445         }
446 
447         auto delIter = m_delCntMap.find(currFd);
448         if (delIter != m_delCntMap.end() && wakeDataIter->second.size() == static_cast<size_t>(delIter->second)) {
449             FFRT_LOGD("fd[%d] may be deleted", currFd);
450             continue;
451         }
452 
453         struct epoll_event maskEv;
454         maskEv.events = 0;
455         auto& wakeData = wakeDataIter->second.back();
456         std::unique_ptr<struct WakeDataWithCb> maskWakeData = std::make_unique<WakeDataWithCb>(currFd,
457             wakeData->data, wakeData->cb, wakeData->task);
458         void* ptr = static_cast<void*>(maskWakeData.get());
459         if (ptr == nullptr || maskWakeData == nullptr) {
460             FFRT_SYSEVENT_LOGE("CacheEventsAndDoMask Construct WakeDataWithCb instance failed! or wakeData is nullptr");
461             continue;
462         }
463         maskWakeData->monitorEvents = 0;
464         CacheMaskWakeData(task, maskWakeData);
465         maskEv.data = {.ptr = ptr};
466         if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
467             // ENOENT indicate fd is not in epfd, may be deleted
468             FFRT_SYSEVENT_LOGW("epoll_ctl mod fd error: efd=%d, fd=%d, errorno=%d", m_epFd, currFd, errno);
469         }
470         FFRT_LOGD("fd[%d] event has no consumer, so cache it", currFd);
471         syncTaskEvents.push_back(eventVec[i]);
472     }
473 }
474 
WakeSyncTask(std::unordered_map<CoTask *,EventVec> & syncTaskEvents)475 void Poller::WakeSyncTask(std::unordered_map<CoTask*, EventVec>& syncTaskEvents) noexcept
476 {
477     if (syncTaskEvents.empty()) {
478         return;
479     }
480 
481     std::unordered_set<int> timerHandlesToRemove;
482     std::unordered_set<CoTask*> tasksToWake;
483     {
484         std::lock_guard lg(m_mapMutex);
485         for (auto& taskEventPair : syncTaskEvents) {
486             CoTask* currTask = taskEventPair.first;
487             auto iter = m_waitTaskMap.find(currTask);
488             if (iter == m_waitTaskMap.end()) {
489                 CacheEventsAndDoMask(currTask, taskEventPair.second);
490                 continue;
491             }
492             CopyEventsInfoToConsumer(iter->second, taskEventPair.second);
493             auto timerHandle = iter->second.timerHandle;
494             if (timerHandle > -1) {
495                 timerHandlesToRemove.insert(timerHandle);
496             }
497             tasksToWake.insert(currTask);
498             m_waitTaskMap.erase(iter);
499         }
500     }
501     if (timerHandlesToRemove.size() > 0) {
502         std::lock_guard lock(timerMutex_);
503         for (auto it = timerMap_.begin(); it != timerMap_.end();) {
504             if (timerHandlesToRemove.find(it->second.handle) != timerHandlesToRemove.end()) {
505                 it = timerMap_.erase(it);
506             } else {
507                 ++it;
508             }
509         }
510         timerEmpty_.store(timerMap_.empty());
511     }
512 
513     for (auto task : tasksToWake) {
514         WakeTask(task);
515     }
516 }
517 
GetTaskWaitTime(CoTask * task)518 uint64_t Poller::GetTaskWaitTime(CoTask* task) noexcept
519 {
520     std::lock_guard lg(m_mapMutex);
521     auto iter = m_waitTaskMap.find(task);
522     if (iter == m_waitTaskMap.end()) {
523         return 0;
524     }
525 
526     return std::chrono::duration_cast<std::chrono::seconds>(
527         iter->second.waitTP.time_since_epoch()).count();
528 }
529 
PollOnce(int timeout)530 PollerRet Poller::PollOnce(int timeout) noexcept
531 {
532     int realTimeout = timeout;
533     int timerHandle = -1;
534     {
535         std::lock_guard lg(timerMutex_);
536         if (!timerMap_.empty()) {
537             auto cur = timerMap_.begin();
538             timerHandle = cur->second.handle;
539             TimePoint now = std::chrono::steady_clock::now();
540             realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
541                 cur->first - now).count();
542             if (realTimeout <= 0) {
543                 ExecuteTimerCb(now);
544                 return PollerRet::RET_TIMER;
545             }
546 
547             if (timeout != -1 && realTimeout > timeout) {
548                 timerHandle = -1;
549                 realTimeout = timeout;
550             }
551 
552             flag_ = EpollStatus::WAIT;
553         }
554     }
555 
556     pollerCount_++;
557     std::array<epoll_event, EPOLL_EVENT_SIZE> waitedEvents;
558     int nfds = epoll_wait(m_epFd, waitedEvents.data(), waitedEvents.size(), realTimeout);
559     flag_ = EpollStatus::WAKE;
560     if (nfds < 0) {
561         if (errno != EINTR) {
562             FFRT_SYSEVENT_LOGE("epoll_wait error, errorno= %d.", errno);
563         }
564         return PollerRet::RET_NULL;
565     }
566     if (nfds == 0) {
567         if (timerHandle != -1) {
568             std::lock_guard lg(timerMutex_);
569             for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
570                 if (it->second.handle == timerHandle) {
571                     ExecuteTimerCb(it->first);
572                     return PollerRet::RET_TIMER;
573                 }
574             }
575         }
576         return PollerRet::RET_NULL;
577     }
578 
579     std::unordered_map<CoTask*, EventVec> syncTaskEvents;
580     ProcessWaitedFds(nfds, syncTaskEvents, waitedEvents);
581     WakeSyncTask(syncTaskEvents);
582     ReleaseFdWakeData();
583     return PollerRet::RET_EPOLL;
584 }
585 
ReleaseFdWakeData()586 void Poller::ReleaseFdWakeData() noexcept
587 {
588     std::lock_guard lg(m_mapMutex);
589     for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
590         int delFd = delIter->first;
591         unsigned int delCnt = static_cast<unsigned int>(delIter->second);
592         auto& wakeDataList = m_wakeDataMap[delFd];
593         int diff = wakeDataList.size() - delCnt;
594         if (diff == 0) {
595             m_wakeDataMap.erase(delFd);
596             m_delCntMap.erase(delIter++);
597             continue;
598         } else if (diff == 1) {
599             for (unsigned int i = 0; i < delCnt - 1; i++) {
600                 wakeDataList.pop_front();
601             }
602             m_delCntMap[delFd] = 1;
603         } else {
604             FFRT_SYSEVENT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
605         }
606         delIter++;
607     }
608 
609     fdEmpty_.store(m_wakeDataMap.empty());
610 }
611 
ProcessTimerDataCb(CoTask * task)612 void Poller::ProcessTimerDataCb(CoTask* task) noexcept
613 {
614     std::lock_guard lg(m_mapMutex);
615     auto iter = m_waitTaskMap.find(task);
616     if (iter != m_waitTaskMap.end()) {
617         WakeTask(task);
618         m_waitTaskMap.erase(iter);
619     }
620 }
621 
ExecuteTimerCb(TimePoint timer)622 void Poller::ExecuteTimerCb(TimePoint timer) noexcept
623 {
624     while (!timerMap_.empty()) {
625         auto iter = timerMap_.begin();
626         if (iter->first > timer) {
627             break;
628         }
629 
630         TimerDataWithCb data = iter->second;
631         if (data.cb != nullptr) {
632             executedHandle_[data.handle] = TimerStatus::EXECUTING;
633         }
634 
635         timerMap_.erase(iter);
636         timerEmpty_.store(timerMap_.empty());
637 
638         if (data.cb != nullptr) {
639             timerMutex_.unlock();
640 #ifdef FFRT_ENABLE_HITRACE_CHAIN
641             if (data.traceId.valid == HITRACE_ID_VALID) {
642                 TraceChainAdapter::Instance().HiTraceChainRestoreId(&data.traceId);
643             }
644 #endif
645             data.cb(data.data);
646 #ifdef FFRT_ENABLE_HITRACE_CHAIN
647             if (data.traceId.valid == HITRACE_ID_VALID) {
648                 TraceChainAdapter::Instance().HiTraceChainClearId();
649             }
650 #endif
651             timerMutex_.lock();
652             executedHandle_[data.handle] = TimerStatus::EXECUTED;
653         } else if (data.task != nullptr) {
654             timerMutex_.unlock();
655             ProcessTimerDataCb(data.task);
656             timerMutex_.lock();
657         }
658 
659         if (data.repeat && (executedHandle_.find(data.handle) != executedHandle_.end())) {
660             executedHandle_.erase(data.handle);
661             RegisterTimerImpl(data);
662         }
663     }
664 }
665 
RegisterTimerImpl(const TimerDataWithCb & data)666 void Poller::RegisterTimerImpl(const TimerDataWithCb& data) noexcept
667 {
668     if (flag_ == EpollStatus::TEARDOWN) {
669         return;
670     }
671 
672     TimePoint absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(data.timeout);
673     bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);
674 
675     timerMap_.emplace(absoluteTime, data);
676     timerEmpty_.store(false);
677 
678     if (wake) {
679         WakeUp();
680     }
681 }
682 
RegisterTimer(uint64_t timeout,void * data,ffrt_timer_cb cb,bool repeat)683 int Poller::RegisterTimer(uint64_t timeout, void* data, ffrt_timer_cb cb, bool repeat) noexcept
684 {
685     if (flag_ == EpollStatus::TEARDOWN) {
686         return -1;
687     }
688 
689     std::lock_guard lock(timerMutex_);
690     timerHandle_ += 1;
691 
692     CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
693     if (timeout > MAX_TIMER_MS_COUNT) {
694         FFRT_LOGW("timeout exceeds maximum allowed value %llu ms. Clamping to %llu ms.", timeout, MAX_TIMER_MS_COUNT);
695         timeout = MAX_TIMER_MS_COUNT;
696     }
697     TimerDataWithCb timerMapValue(data, cb, task, repeat, timeout);
698     timerMapValue.handle = timerHandle_;
699     RegisterTimerImpl(timerMapValue);
700 
701     return timerHandle_;
702 }
703 
UnregisterTimer(int handle)704 int Poller::UnregisterTimer(int handle) noexcept
705 {
706     if (flag_ == EpollStatus::TEARDOWN) {
707         return -1;
708     }
709 
710     std::lock_guard lock(timerMutex_);
711     auto it = executedHandle_.find(handle);
712     if (it != executedHandle_.end()) {
713         while (it->second == TimerStatus::EXECUTING) {
714             timerMutex_.unlock();
715             std::this_thread::yield();
716             timerMutex_.lock();
717             it = executedHandle_.find(handle);
718             if (it == executedHandle_.end()) {
719                 break;
720             }
721         }
722 
723         if (it != executedHandle_.end()) {
724             executedHandle_.erase(it);
725             return 0;
726         }
727     }
728 
729     bool wake = false;
730     int ret = -1;
731     for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
732         if (cur->second.handle == handle) {
733             if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
734                 wake = true;
735             }
736             timerMap_.erase(cur);
737             ret = 0;
738             break;
739         }
740     }
741 
742     timerEmpty_.store(timerMap_.empty());
743 
744     if (wake) {
745         WakeUp();
746     }
747     return ret;
748 }
749 
DetermineEmptyMap()750 bool Poller::DetermineEmptyMap() noexcept
751 {
752     return fdEmpty_ && timerEmpty_;
753 }
754 
DeterminePollerReady()755 bool Poller::DeterminePollerReady() noexcept
756 {
757     return IsFdExist() || IsTimerReady();
758 }
759 
IsFdExist()760 bool Poller::IsFdExist() noexcept
761 {
762     return !fdEmpty_;
763 }
764 
IsTimerReady()765 bool Poller::IsTimerReady() noexcept
766 {
767     TimePoint now = std::chrono::steady_clock::now();
768     std::lock_guard lock(timerMutex_);
769     if (timerMap_.empty()) {
770         return false;
771     }
772 
773     if (now >= timerMap_.begin()->first) {
774         return true;
775     }
776     return false;
777 }
778 
GetTimerStatus(int handle)779 ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
780 {
781     if (flag_ == EpollStatus::TEARDOWN) {
782         return ffrt_timer_notfound;
783     }
784 
785     std::lock_guard lock(timerMutex_);
786     for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
787         if (cur->second.handle == handle) {
788             return ffrt_timer_not_executed;
789         }
790     }
791 
792     auto it = executedHandle_.find(handle);
793     if (it != executedHandle_.end()) {
794         while (it->second == TimerStatus::EXECUTING) {
795             timerMutex_.unlock();
796             std::this_thread::yield();
797             timerMutex_.lock();
798             it = executedHandle_.find(handle);
799             if (it == executedHandle_.end()) {
800                 break;
801             }
802         }
803         return ffrt_timer_executed;
804     }
805 
806     return ffrt_timer_notfound;
807 }
808 
GetPollCount()809 uint64_t Poller::GetPollCount() noexcept
810 {
811     return pollerCount_;
812 }
813 }
814