• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "eu/io_poller.h"
17 #include <securec.h>
18 #include <sys/prctl.h>
19 #include "eu/blockaware.h"
20 #include "eu/execute_unit.h"
21 #include "sched/execute_ctx.h"
22 #include "tm/scpu_task.h"
23 #include "dfx/log/ffrt_log_api.h"
24 #include "util/ffrt_facade.h"
25 #include "util/time_format.h"
26 #include "util/name_manager.h"
27 #include "sync/timer_manager.h"
28 #ifdef FFRT_OH_TRACE_ENABLE
29 #include "backtrace_local.h"
30 #endif
31 
32 namespace {
33 const std::vector<uint64_t> TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };
34 }
35 namespace ffrt {
36 namespace {
TimeoutProc(void * task)37 static void TimeoutProc(void* task)
38 {
39     IOPoller& ins = IOPoller::Instance();
40     ins.WakeTimeoutTask(reinterpret_cast<CoTask*>(task));
41 }
42 
WakeTask(CoTask * task)43 void WakeTask(CoTask* task)
44 {
45     std::unique_lock<std::mutex> lck(task->mutex_);
46     if (task->GetBlockType() == BlockType::BLOCK_THREAD) {
47         task->waitCond_.notify_one();
48     } else {
49         lck.unlock();
50         CoRoutineFactory::CoWakeFunc(task, CoWakeType::NO_TIMEOUT_WAKE);
51     }
52 }
53 
CopyEventsToConsumer(EventVec & cachedEventsVec,struct epoll_event * eventsVec)54 int CopyEventsToConsumer(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
55 {
56     int nfds = static_cast<int>(cachedEventsVec.size());
57     for (int i = 0; i < nfds; i++) {
58         eventsVec[i].events = cachedEventsVec[i].events;
59         eventsVec[i].data.fd = cachedEventsVec[i].data.fd;
60     }
61     return nfds;
62 }
63 
CopyEventsInfoToConsumer(SyncData & taskInfo,EventVec & cachedEventsVec)64 void CopyEventsInfoToConsumer(SyncData& taskInfo, EventVec& cachedEventsVec)
65 {
66     epoll_event* eventsPtr = (epoll_event*)taskInfo.eventsPtr;
67     int* nfdsPtr = taskInfo.nfdsPtr;
68     if (eventsPtr == nullptr || nfdsPtr == nullptr) {
69         FFRT_LOGE("usr ptr is nullptr");
70         return;
71     }
72     *nfdsPtr = CopyEventsToConsumer(cachedEventsVec, eventsPtr);
73 }
74 } // namespace
75 
Instance()76 IOPoller& IOPoller::Instance()
77 {
78     static IOPoller ins;
79     return ins;
80 }
81 
IOPoller()82 IOPoller::IOPoller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) }
83 {
84 #ifdef OHOS_STANDARD_SYSTEM
85     fdsan_exchange_owner_tag(m_epFd, 0, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE, static_cast<uint64_t>(m_epFd)));
86 #endif
87     m_wakeData.mode = PollerType::WAKEUP;
88     m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
89 #ifdef OHOS_STANDARD_SYSTEM
90     fdsan_exchange_owner_tag(m_wakeData.fd, 0, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE,
91         static_cast<uint64_t>(m_wakeData.fd)));
92 #endif
93     epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
94     FFRT_COND_TERMINATE((epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0),
95         "epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, m_wakeData.fd, errno);
96 }
97 
~IOPoller()98 IOPoller::~IOPoller() noexcept
99 {
100     {
101         std::lock_guard lock(m_mapMutex);
102         m_teardown = true;
103         WakeUp();
104     }
105     if (m_runner != nullptr && m_runner->joinable()) {
106         m_runner->join();
107     }
108 #ifdef OHOS_STANDARD_SYSTEM
109     fdsan_close_with_tag(m_wakeData.fd, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE,
110         static_cast<uint64_t>(m_wakeData.fd)));
111     fdsan_close_with_tag(m_epFd, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE, static_cast<uint64_t>(m_epFd)));
112 #else
113     ::close(m_wakeData.fd);
114     ::close(m_epFd);
115 #endif
116 }
117 
ThreadInit()118 void IOPoller::ThreadInit()
119 {
120     if (m_runner != nullptr && m_runner->joinable()) {
121         m_runner->join();
122     }
123     m_runner = std::make_unique<std::thread>([this] { Run(); });
124 }
125 
Run()126 void IOPoller::Run()
127 {
128     struct sched_param param;
129     param.sched_priority = 1;
130     int ret = pthread_setschedparam(pthread_self(), SCHED_RR, &param);
131     if (ret != 0) {
132         FFRT_LOGW("[%d] set priority warn ret[%d] eno[%d]\n", pthread_self(), ret, errno);
133     }
134     prctl(PR_SET_NAME, IO_POLLER_NAME);
135     while (1) {
136         ret = PollOnce(30000);
137         std::lock_guard lock(m_mapMutex);
138         if (m_teardown) {
139             // teardown
140             m_exitFlag = true;
141             return;
142         }
143         if (ret == 0 && m_wakeDataMap.empty() && m_syncFdCnt.load() == 0) {
144             // timeout 30s and no fd added
145             m_exitFlag = true;
146             return;
147         }
148     }
149 }
150 
WakeUp()151 void IOPoller::WakeUp() noexcept
152 {
153     uint64_t one = 1;
154     (void)::write(m_wakeData.fd, &one, sizeof one);
155 }
156 
PollOnce(int timeout)157 int IOPoller::PollOnce(int timeout) noexcept
158 {
159     pollerCount_++;
160     std::array<epoll_event, EPOLL_EVENT_SIZE> waitedEvents;
161     int nfds = epoll_wait(m_epFd, waitedEvents.data(), waitedEvents.size(), timeout);
162     if (nfds < 0) {
163         if (errno != EINTR) {
164             FFRT_SYSEVENT_LOGE("epoll_wait error, errorno= %d.", errno);
165         }
166         return -1;
167     }
168     if (nfds == 0) {
169         return 0;
170     }
171 
172     std::unordered_map<CoTask*, EventVec> syncTaskEvents;
173     for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
174         struct WakeData *data = reinterpret_cast<struct WakeData *>(waitedEvents[i].data.ptr);
175         if (data->mode == PollerType::WAKEUP) {
176             // self wakeup
177             uint64_t one = 1;
178             (void)::read(m_wakeData.fd, &one, sizeof one);
179             continue;
180         }
181 
182         if (data->mode == PollerType::SYNC_IO) {
183             // sync io wait fd, del fd when waked up
184             if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, data->fd, nullptr) != 0) {
185                 FFRT_SYSEVENT_LOGE("epoll_ctl del fd error: fd=%d, errorno=%d", data->fd, errno);
186                 continue;
187             }
188             m_syncFdCnt--;
189             WakeTask(data->task);
190             continue;
191         }
192 
193         if (data->mode == PollerType::ASYNC_CB) {
194             // async io callback
195             timeOutReport.cbStartTime.store(TimeStamp(), std::memory_order_relaxed);
196             timeOutReport.reportCount = 0;
197 #ifdef FFRT_ENABLE_HITRACE_CHAIN
198             if (data->traceId.valid == HITRACE_ID_VALID) {
199                 TraceChainAdapter::Instance().HiTraceChainRestoreId(&data->traceId);
200             }
201 #endif
202             data->cb(data->data, waitedEvents[i].events);
203             timeOutReport.cbStartTime.store(0, std::memory_order_relaxed);
204 #ifdef FFRT_ENABLE_HITRACE_CHAIN
205             if (data->traceId.valid == HITRACE_ID_VALID) {
206                 TraceChainAdapter::Instance().HiTraceChainClearId();
207             }
208 #endif
209             timeOutReport.cbStartTime = 0;
210             continue;
211         }
212 
213         if (data->mode == PollerType::ASYNC_IO) {
214             // async io task wait fd
215             epoll_event ev = { .events = waitedEvents[i].events, .data = {.fd = data->fd} };
216             syncTaskEvents[data->task].push_back(ev);
217             if ((waitedEvents[i].events & (EPOLLHUP | EPOLLERR)) != 0) {
218                 std::lock_guard lock(m_mapMutex);
219                 CacheMaskFdAndEpollDel(data->fd, data->task);
220             }
221         }
222     }
223 
224     WakeSyncTask(syncTaskEvents);
225     ReleaseFdWakeData();
226     return 1;
227 }
228 
229 // mode ASYNC_CB/ASYNC_IO
AddFdEvent(int op,uint32_t events,int fd,void * data,ffrt_poller_cb cb)230 int IOPoller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
231 {
232     CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
233     auto wakeData = std::make_unique<WakeData>(fd, data, cb, task);
234     if (task) {
235         task->pollerEnable = true;
236     }
237     void* ptr = static_cast<void*>(wakeData.get());
238     if (ptr == nullptr || wakeData == nullptr) {
239         FFRT_SYSEVENT_LOGE("Construct WakeData instance failed! or wakeData is nullptr");
240         return -1;
241     }
242     wakeData->monitorEvents = events;
243 
244     epoll_event ev = { .events = events, .data = { .ptr = ptr } };
245     std::lock_guard lock(m_mapMutex);
246     if (m_teardown) {
247         return -1;
248     }
249 
250     if (m_exitFlag) {
251         ThreadInit();
252         m_exitFlag = false;
253     }
254 
255     if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
256         FFRT_SYSEVENT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
257         return -1;
258     }
259 
260     if (op == EPOLL_CTL_ADD) {
261         m_wakeDataMap[fd].emplace_back(std::move(wakeData));
262     } else if (op == EPOLL_CTL_MOD) {
263         auto iter = m_wakeDataMap.find(fd);
264         FFRT_COND_RETURN_ERROR(iter == m_wakeDataMap.end(), -1, "fd %d does not exist in wakeDataMap", fd);
265         if (iter->second.size() != 1) {
266             FFRT_SYSEVENT_LOGE("epoll_ctl mod fd wakedata num invalid");
267             return -1;
268         }
269         iter->second.pop_back();
270         iter->second.emplace_back(std::move(wakeData));
271     }
272     return 0;
273 }
274 
DelFdEvent(int fd)275 int IOPoller::DelFdEvent(int fd) noexcept
276 {
277     std::lock_guard lock(m_mapMutex);
278     ClearDelFdCache(fd);
279     auto wakeDataIter = m_wakeDataMap.find(fd);
280     if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
281         FFRT_SYSEVENT_LOGW("fd[%d] has not been added to epoll, ignore", fd);
282         return -1;
283     }
284     auto delCntIter = m_delCntMap.find(fd);
285     if (delCntIter != m_delCntMap.end()) {
286         int diff = static_cast<int>(wakeDataIter->second.size()) - delCntIter->second;
287         if (diff == 0) {
288             FFRT_SYSEVENT_LOGW("fd:%d, addCnt:%d, delCnt:%d has not been added to epoll, ignore", fd,
289                 wakeDataIter->second.size(), delCntIter->second);
290             return -1;
291         }
292     }
293 
294     if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
295         FFRT_SYSEVENT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
296         return -1;
297     }
298 
299     for (auto it = m_cachedTaskEvents.begin(); it != m_cachedTaskEvents.end();) {
300         auto& events = it->second;
301         events.erase(std::remove_if(events.begin(), events.end(),
302             [fd](const epoll_event& event) {
303                 return event.data.fd == fd;
304             }), events.end());
305 
306         if (events.empty()) {
307             it = m_cachedTaskEvents.erase(it);
308         } else {
309             ++it;
310         }
311     }
312 
313     m_delCntMap[fd]++;
314     WakeUp();
315     return 0;
316 }
317 
318 // mode ASYNC_IO
WaitFdEvent(struct epoll_event * eventsVec,int maxevents,int timeout)319 int IOPoller::WaitFdEvent(struct epoll_event* eventsVec, int maxevents, int timeout) noexcept
320 {
321     FFRT_COND_DO_ERR((eventsVec == nullptr), return -1, "eventsVec cannot be null");
322 
323     CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
324     if (!task) {
325         FFRT_SYSEVENT_LOGE("nonworker shall not call this fun.");
326         return -1;
327     }
328 
329     FFRT_COND_DO_ERR((maxevents < EPOLL_EVENT_SIZE), return -1, "maxEvents:%d cannot be less than 1024", maxevents);
330 
331     int nfds = 0;
332     std::unique_lock<std::mutex> lck(task->mutex_);
333     if (task->Block() == BlockType::BLOCK_THREAD) {
334         std::unique_lock mapLock(m_mapMutex);
335         int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
336         if (cachedNfds > 0) {
337             mapLock.unlock();
338             FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
339                 task->GetLabel().c_str(), task->gid, cachedNfds);
340             task->Wake();
341             return cachedNfds;
342         }
343 
344         if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
345             FFRT_SYSEVENT_LOGE("task has waited before");
346             mapLock.unlock();
347             task->Wake();
348             return 0;
349         }
350         auto currTime = std::chrono::steady_clock::now();
351         m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
352         if (timeout > -1) {
353             FFRT_LOGD("poller meet timeout={%d}", timeout);
354             m_waitTaskMap[task].timerHandle = FFRTFacade::GetTMInstance().RegisterTimer(task->qos_, timeout,
355                 reinterpret_cast<void*>(task), TimeoutProc);
356         }
357         mapLock.unlock();
358         task->waitCond_.wait(lck);
359         FFRT_LOGD("task[%s] id[%d] has [%d] events", task->GetLabel().c_str(), task->gid, nfds);
360         task->Wake();
361         return nfds;
362     }
363     lck.unlock();
364 
365     CoWait([&](CoTask *task)->bool {
366         std::unique_lock mapLock(m_mapMutex);
367         int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
368         if (cachedNfds > 0) {
369             mapLock.unlock();
370             FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
371                 task->GetLabel().c_str(), task->gid, cachedNfds);
372             nfds = cachedNfds;
373             return false;
374         }
375 
376         if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
377             FFRT_SYSEVENT_LOGE("task has waited before");
378             return false;
379         }
380         auto currTime = std::chrono::steady_clock::now();
381         m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
382         if (timeout > -1) {
383             FFRT_LOGD("poller meet timeout={%d}", timeout);
384             m_waitTaskMap[task].timerHandle = FFRTFacade::GetTMInstance().RegisterTimer(task->qos_, timeout,
385                 reinterpret_cast<void*>(task), TimeoutProc);
386         }
387         // The ownership of the task belongs to m_waitTaskMap, and the task cannot be accessed any more.
388         return true;
389     });
390     FFRT_LOGD("task[%s] id[%d] has [%d] events", task->GetLabel().c_str(), task->gid, nfds);
391     return nfds;
392 }
393 
WakeTimeoutTask(CoTask * task)394 void IOPoller::WakeTimeoutTask(CoTask* task) noexcept
395 {
396     std::unique_lock mapLock(m_mapMutex);
397     auto iter = m_waitTaskMap.find(task);
398     if (iter != m_waitTaskMap.end()) {
399         // wake task, erase from wait map
400         m_waitTaskMap.erase(iter);
401         mapLock.unlock();
402         WakeTask(task);
403     }
404 }
405 
WakeSyncTask(std::unordered_map<CoTask *,EventVec> & syncTaskEvents)406 void IOPoller::WakeSyncTask(std::unordered_map<CoTask*, EventVec>& syncTaskEvents) noexcept
407 {
408     if (syncTaskEvents.empty()) {
409         return;
410     }
411 
412     std::unordered_set<int> timerHandlesToRemove;
413     std::unordered_set<CoTask*> tasksToWake;
414     {
415         std::lock_guard lg(m_mapMutex);
416         for (auto& taskEventPair : syncTaskEvents) {
417             CoTask* currTask = taskEventPair.first;
418             auto iter = m_waitTaskMap.find(currTask);
419             if (iter == m_waitTaskMap.end()) { // task not in wait map
420                 CacheEventsAndDoMask(currTask, taskEventPair.second);
421                 continue;
422             }
423             CopyEventsInfoToConsumer(iter->second, taskEventPair.second);
424             // remove timer, wake task, erase from wait map
425             auto timerHandle = iter->second.timerHandle;
426             if (timerHandle > -1) {
427                 timerHandlesToRemove.insert(timerHandle);
428             }
429             tasksToWake.insert(currTask);
430             m_waitTaskMap.erase(iter);
431         }
432     }
433     for (auto timerHandle : timerHandlesToRemove) {
434         FFRTFacade::GetTMInstance().UnregisterTimer(timerHandle);
435     }
436     for (auto task : tasksToWake) {
437         WakeTask(task);
438     }
439 }
440 
441 // mode SYNC_IO
WaitFdEvent(int fd)442 void IOPoller::WaitFdEvent(int fd) noexcept
443 {
444     CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
445     if (!task) {
446         FFRT_LOGI("nonworker shall not call this fun.");
447         return;
448     }
449 
450     struct WakeData data(fd, task);
451     epoll_event ev = { .events = EPOLLIN, .data = {.ptr = static_cast<void*>(&data)} };
452     {
453         std::lock_guard lock(m_mapMutex);
454         if (m_teardown) {
455             return;
456         }
457 
458         if (m_exitFlag) {
459             ThreadInit();
460             m_exitFlag = false;
461         }
462 
463         m_syncFdCnt++;
464     }
465 
466     FFRT_BLOCK_TRACER(task->gid, fd);
467     if (task->Block() == BlockType::BLOCK_THREAD) {
468         std::unique_lock<std::mutex> lck(task->mutex_);
469         if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) == 0) {
470             task->waitCond_.wait(lck);
471         }
472         task->Wake();
473         m_syncFdCnt--;
474         return;
475     }
476 
477     CoWait([&](CoTask *task)->bool {
478         (void)task;
479         if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) == 0) {
480             return true;
481         }
482         // The ownership of the task belongs to epoll, and the task cannot be accessed any more.
483         FFRT_LOGI("epoll_ctl add err:efd:=%d, fd=%d errorno = %d", m_epFd, fd, errno);
484         m_syncFdCnt--;
485         return false;
486     });
487 }
488 
ReleaseFdWakeData()489 void IOPoller::ReleaseFdWakeData() noexcept
490 {
491     std::lock_guard lock(m_mapMutex);
492     for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
493         int delFd = delIter->first;
494         unsigned int delCnt = static_cast<unsigned int>(delIter->second);
495         auto& wakeDataList = m_wakeDataMap[delFd];
496         unsigned int diff = wakeDataList.size() - delCnt;
497         if (diff == 0) {
498             m_wakeDataMap.erase(delFd);
499             m_delCntMap.erase(delIter++);
500             continue;
501         } else if (diff == 1) {
502             for (unsigned int i = 0; i < delCnt - 1; i++) {
503                 wakeDataList.pop_front();
504             }
505             m_delCntMap[delFd] = 1;
506         } else {
507             FFRT_SYSEVENT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
508         }
509         delIter++;
510     }
511 }
512 
CacheMaskFdAndEpollDel(int fd,CoTask * task)513 void IOPoller::CacheMaskFdAndEpollDel(int fd, CoTask *task) noexcept
514 {
515     auto maskWakeData = m_maskWakeDataMap.find(task);
516     if (maskWakeData != m_maskWakeDataMap.end()) {
517         if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
518             FFRT_SYSEVENT_LOGE("fd[%d] ffrt epoll ctl del fail errorno=%d", fd, errno);
519         }
520         m_delFdCacheMap.emplace(fd, task);
521     }
522 }
523 
ClearMaskWakeDataCache(CoTask * task)524 int IOPoller::ClearMaskWakeDataCache(CoTask *task) noexcept
525 {
526     auto maskWakeDataIter = m_maskWakeDataMap.find(task);
527     if (maskWakeDataIter != m_maskWakeDataMap.end()) {
528         WakeDataList& wakeDataList = maskWakeDataIter->second;
529         for (auto iter = wakeDataList.begin(); iter != wakeDataList.end(); ++iter) {
530             WakeData* ptr = iter->get();
531             m_delFdCacheMap.erase(ptr->fd);
532         }
533         m_maskWakeDataMap.erase(maskWakeDataIter);
534     }
535     return 0;
536 }
537 
ClearMaskWakeDataCacheWithFd(CoTask * task,int fd)538 int IOPoller::ClearMaskWakeDataCacheWithFd(CoTask *task, int fd) noexcept
539 {
540     auto maskWakeDataIter = m_maskWakeDataMap.find(task);
541     if (maskWakeDataIter != m_maskWakeDataMap.end()) {
542         WakeDataList& wakeDataList = maskWakeDataIter->second;
543         auto pred = [fd](auto& value) { return value->fd == fd; };
544         wakeDataList.remove_if(pred);
545         if (wakeDataList.size() == 0) {
546             m_maskWakeDataMap.erase(maskWakeDataIter);
547         }
548     }
549     return 0;
550 }
551 
ClearDelFdCache(int fd)552 int IOPoller::ClearDelFdCache(int fd) noexcept
553 {
554     auto fdDelCacheIter = m_delFdCacheMap.find(fd);
555     if (fdDelCacheIter != m_delFdCacheMap.end()) {
556         CoTask *task = fdDelCacheIter->second;
557         ClearMaskWakeDataCacheWithFd(task, fd);
558         m_delFdCacheMap.erase(fdDelCacheIter);
559     }
560     return 0;
561 }
562 
FetchCachedEventAndDoUnmask(EventVec & cachedEventsVec,struct epoll_event * eventsVec)563 int IOPoller::FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
564 {
565     std::unordered_map<int, int> seenFd;
566     int fdCnt = 0;
567     for (size_t i = 0; i < cachedEventsVec.size(); i++) {
568         auto eventInfo = cachedEventsVec[i];
569         int currFd = eventInfo.data.fd;
570         // check if seen
571         auto iter = seenFd.find(currFd);
572         if (iter == seenFd.end()) {
573             // if not seen, copy cached events and record idx
574             eventsVec[fdCnt].data.fd = currFd;
575             eventsVec[fdCnt].events = eventInfo.events;
576             seenFd[currFd] = fdCnt;
577             fdCnt++;
578         } else {
579             // if seen, update event to newest
580             eventsVec[iter->second].events |= eventInfo.events;
581             FFRT_LOGD("fd[%d] has mutilple cached events", currFd);
582             continue;
583         }
584 
585         // Unmask to origin events
586         auto wakeDataIter = m_wakeDataMap.find(currFd);
587         if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
588             FFRT_LOGD("fd[%d] may be deleted", currFd);
589             continue;
590         }
591 
592         auto& wakeData = wakeDataIter->second.back();
593         epoll_event ev = { .events = wakeData->monitorEvents, .data = { .ptr = static_cast<void*>(wakeData.get()) } };
594         auto fdDelCacheIter = m_delFdCacheMap.find(currFd);
595         if (fdDelCacheIter != m_delFdCacheMap.end()) {
596             ClearDelFdCache(currFd);
597             if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, currFd, &ev) != 0) {
598                 FFRT_SYSEVENT_LOGE("fd[%d] epoll ctl add fail, errorno=%d", currFd, errno);
599                 continue;
600             }
601         } else {
602             if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &ev) != 0) {
603                 FFRT_SYSEVENT_LOGE("fd[%d] epoll ctl mod fail, errorno=%d", currFd, errno);
604                 continue;
605             }
606         }
607     }
608     return fdCnt;
609 }
610 
FetchCachedEventAndDoUnmask(CoTask * task,struct epoll_event * eventsVec)611 int IOPoller::FetchCachedEventAndDoUnmask(CoTask* task, struct epoll_event* eventsVec) noexcept
612 {
613     // should used in lock
614     auto syncTaskIter = m_cachedTaskEvents.find(task);
615     if (syncTaskIter == m_cachedTaskEvents.end() || syncTaskIter->second.size() == 0) {
616         return 0;
617     }
618 
619     int nfds = FetchCachedEventAndDoUnmask(syncTaskIter->second, eventsVec);
620     m_cachedTaskEvents.erase(syncTaskIter);
621     ClearMaskWakeDataCache(task);
622     return nfds;
623 }
624 
CacheEventsAndDoMask(CoTask * task,EventVec & eventVec)625 void IOPoller::CacheEventsAndDoMask(CoTask* task, EventVec& eventVec) noexcept
626 {
627     auto& syncTaskEvents = m_cachedTaskEvents[task];
628     for (size_t i = 0; i < eventVec.size(); i++) {
629         int currFd = eventVec[i].data.fd;
630 
631         auto wakeDataIter = m_wakeDataMap.find(currFd);
632         if (wakeDataIter == m_wakeDataMap.end() ||
633             wakeDataIter->second.size() == 0 ||
634             wakeDataIter->second.back()->task != task) {
635             FFRT_LOGD("fd[%d] may be deleted", currFd);
636             continue;
637         }
638 
639         auto delIter = m_delCntMap.find(currFd);
640         if (delIter != m_delCntMap.end() && wakeDataIter->second.size() == static_cast<size_t>(delIter->second)) {
641             FFRT_LOGD("fd[%d] may be deleted", currFd);
642             continue;
643         }
644 
645         struct epoll_event maskEv;
646         maskEv.events = 0;
647         auto& wakeData = wakeDataIter->second.back();
648         std::unique_ptr<struct WakeData> maskWakeData = std::make_unique<WakeData>(currFd,
649             wakeData->data, wakeData->cb, wakeData->task);
650         void* ptr = static_cast<void*>(maskWakeData.get());
651         if (ptr == nullptr || maskWakeData == nullptr) {
652             FFRT_SYSEVENT_LOGE("CacheEventsAndDoMask Construct WakeData instance failed! or wakeData is nullptr");
653             continue;
654         }
655         maskWakeData->monitorEvents = 0;
656         m_maskWakeDataMap[task].emplace_back(std::move(maskWakeData));
657 
658         maskEv.data = {.ptr = ptr};
659         if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
660             // ENOENT indicate fd is not in epfd, may be deleted
661             FFRT_SYSEVENT_LOGW("epoll_ctl mod fd error: efd=%d, fd=%d, errorno=%d", m_epFd, currFd, errno);
662         }
663         FFRT_LOGD("fd[%d] event has no consumer, so cache it", currFd);
664         syncTaskEvents.push_back(eventVec[i]);
665     }
666 }
667 
MonitTimeOut()668 void IOPoller::MonitTimeOut()
669 {
670     if (m_teardown) {
671         return;
672     }
673 
674     if (timeOutReport.cbStartTime == 0) {
675         return;
676     }
677     uint64_t now = TimeStamp();
678     static const uint64_t freq = [] {
679         uint64_t f = Arm64CntFrq();
680         return (f == 1) ? 1000000 : f;
681     } ();
682     uint64_t diff = (now - timeOutReport.cbStartTime) / freq;
683     if (timeOutReport.reportCount < TIMEOUT_RECORD_CYCLE_LIST.size() &&
684         diff >= TIMEOUT_RECORD_CYCLE_LIST[timeOutReport.reportCount]) {
685 #ifdef FFRT_OH_TRACE_ENABLE
686         std::string dumpInfo;
687         static pid_t pid = syscall(SYS_gettid);
688         if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, pid, 0, false)) {
689             FFRT_LOGW("IO_Poller Backtrace Info:\n%s", dumpInfo.c_str());
690         }
691 #endif
692         timeOutReport.reportCount++;
693     }
694 }
695 }
696