/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "eu/io_poller.h"
#include <securec.h>
#include <sys/prctl.h>
#include "eu/blockaware.h"
#include "eu/execute_unit.h"
#include "sched/execute_ctx.h"
#include "tm/scpu_task.h"
#include "dfx/log/ffrt_log_api.h"
#include "util/ffrt_facade.h"
#include "util/time_format.h"
#include "util/name_manager.h"
#include "sync/timer_manager.h"
#ifdef FFRT_OH_TRACE_ENABLE
#include "backtrace_local.h"
#endif

namespace {
// Report thresholds (in seconds) for long-running poller callbacks; see MonitTimeOut().
const std::vector<uint64_t> TIMEOUT_RECORD_CYCLE_LIST = { 1, 3, 5, 10, 30, 60, 10 * 60, 30 * 60 };
}
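
/*
 * IOPoller multiplexes one epoll instance across the registration modes in
 * PollerType: WAKEUP is an internal eventfd that interrupts epoll_wait;
 * SYNC_IO entries are one-shot waits added by WaitFdEvent(int) and removed
 * from epoll as soon as they fire; ASYNC_CB/ASYNC_IO entries are added via
 * AddFdEvent() and either run a callback or wake a waiting task. The poller
 * thread starts lazily and exits once a 30s poll round sees nothing registered.
 */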
namespace ffrt {
namespace {
static void TimeoutProc(void* task)
{
    IOPoller& ins = IOPoller::Instance();
    ins.WakeTimeoutTask(reinterpret_cast<CoTask*>(task));
}

void WakeTask(CoTask* task)
{
    std::unique_lock<std::mutex> lck(task->mutex_);
    if (task->GetBlockType() == BlockType::BLOCK_THREAD) {
        task->waitCond_.notify_one();
    } else {
        lck.unlock();
        CoRoutineFactory::CoWakeFunc(task, CoWakeType::NO_TIMEOUT_WAKE);
    }
}

int CopyEventsToConsumer(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
{
    int nfds = static_cast<int>(cachedEventsVec.size());
    for (int i = 0; i < nfds; i++) {
        eventsVec[i].events = cachedEventsVec[i].events;
        eventsVec[i].data.fd = cachedEventsVec[i].data.fd;
    }
    return nfds;
}

void CopyEventsInfoToConsumer(SyncData& taskInfo, EventVec& cachedEventsVec)
{
    epoll_event* eventsPtr = static_cast<epoll_event*>(taskInfo.eventsPtr);
    int* nfdsPtr = taskInfo.nfdsPtr;
    if (eventsPtr == nullptr || nfdsPtr == nullptr) {
        FFRT_LOGE("user ptr is nullptr");
        return;
    }
    *nfdsPtr = CopyEventsToConsumer(cachedEventsVec, eventsPtr);
}
} // namespace

IOPoller& IOPoller::Instance()
{
    static IOPoller ins;
    return ins;
}

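// Create the epoll instance and register an internal eventfd whose only job
// is to let WakeUp() interrupt a blocking epoll_wait; on OHOS both fds are
// tagged for fdsan ownership tracking.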
IOPoller::IOPoller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) }
{
#ifdef OHOS_STANDARD_SYSTEM
    fdsan_exchange_owner_tag(m_epFd, 0, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE, static_cast<uint64_t>(m_epFd)));
#endif
    m_wakeData.mode = PollerType::WAKEUP;
    m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
#ifdef OHOS_STANDARD_SYSTEM
    fdsan_exchange_owner_tag(m_wakeData.fd, 0, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE,
        static_cast<uint64_t>(m_wakeData.fd)));
#endif
    epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
    FFRT_COND_TERMINATE((epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0),
        "epoll_ctl add fd error: efd=%d, fd=%d, errno=%d", m_epFd, m_wakeData.fd, errno);
}

IOPoller::~IOPoller() noexcept
{
    {
        std::lock_guard lock(m_mapMutex);
        m_teardown = true;
        WakeUp();
    }
    if (m_runner != nullptr && m_runner->joinable()) {
        m_runner->join();
    }
#ifdef OHOS_STANDARD_SYSTEM
    fdsan_close_with_tag(m_wakeData.fd, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE,
        static_cast<uint64_t>(m_wakeData.fd)));
    fdsan_close_with_tag(m_epFd, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE, static_cast<uint64_t>(m_epFd)));
#else
    ::close(m_wakeData.fd);
    ::close(m_epFd);
#endif
}

void IOPoller::ThreadInit()
{
    if (m_runner != nullptr && m_runner->joinable()) {
        m_runner->join();
    }
    m_runner = std::make_unique<std::thread>([this] { Run(); });
}

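// Poller thread body: loop on PollOnce with a 30s timeout and exit when torn
// down, or when a poll round times out with no fd registered; m_exitFlag lets
// the next AddFdEvent/WaitFdEvent call restart the thread via ThreadInit().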
void IOPoller::Run()
{
    struct sched_param param;
    param.sched_priority = 1;
    int ret = pthread_setschedparam(pthread_self(), SCHED_RR, &param);
    if (ret != 0) {
        FFRT_LOGW("[%d] set priority warn ret[%d] eno[%d]\n", pthread_self(), ret, errno);
    }
    prctl(PR_SET_NAME, IO_POLLER_NAME);
    while (1) {
        ret = PollOnce(30000);
        std::lock_guard lock(m_mapMutex);
        if (m_teardown) {
            // teardown
            m_exitFlag = true;
            return;
        }
        if (ret == 0 && m_wakeDataMap.empty() && m_syncFdCnt.load() == 0) {
            // timeout 30s and no fd added
            m_exitFlag = true;
            return;
        }
    }
}

void IOPoller::WakeUp() noexcept
{
    uint64_t one = 1;
    (void)::write(m_wakeData.fd, &one, sizeof one);
}

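// Run one epoll_wait round and dispatch each ready entry according to its
// PollerType. Returns -1 on epoll error, 0 on timeout, 1 otherwise.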
int IOPoller::PollOnce(int timeout) noexcept
{
    pollerCount_++;
    std::array<epoll_event, EPOLL_EVENT_SIZE> waitedEvents;
    int nfds = epoll_wait(m_epFd, waitedEvents.data(), waitedEvents.size(), timeout);
    if (nfds < 0) {
        if (errno != EINTR) {
            FFRT_SYSEVENT_LOGE("epoll_wait error, errno=%d.", errno);
        }
        return -1;
    }
    if (nfds == 0) {
        return 0;
    }

    std::unordered_map<CoTask*, EventVec> syncTaskEvents;
    for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
        struct WakeData* data = reinterpret_cast<struct WakeData*>(waitedEvents[i].data.ptr);
        if (data->mode == PollerType::WAKEUP) {
            // self wakeup
            uint64_t one = 1;
            (void)::read(m_wakeData.fd, &one, sizeof one);
            continue;
        }

        if (data->mode == PollerType::SYNC_IO) {
            // sync io wait fd, del fd when waked up
            if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, data->fd, nullptr) != 0) {
                FFRT_SYSEVENT_LOGE("epoll_ctl del fd error: fd=%d, errno=%d", data->fd, errno);
                continue;
            }
            m_syncFdCnt--;
            WakeTask(data->task);
            continue;
        }

        if (data->mode == PollerType::ASYNC_CB) {
            // async io callback
            timeOutReport.cbStartTime.store(TimeStamp(), std::memory_order_relaxed);
            timeOutReport.reportCount = 0;
#ifdef FFRT_ENABLE_HITRACE_CHAIN
            if (data->traceId.valid == HITRACE_ID_VALID) {
                TraceChainAdapter::Instance().HiTraceChainRestoreId(&data->traceId);
            }
#endif
            data->cb(data->data, waitedEvents[i].events);
#ifdef FFRT_ENABLE_HITRACE_CHAIN
            if (data->traceId.valid == HITRACE_ID_VALID) {
                TraceChainAdapter::Instance().HiTraceChainClearId();
            }
#endif
            timeOutReport.cbStartTime.store(0, std::memory_order_relaxed);
            continue;
        }

        if (data->mode == PollerType::ASYNC_IO) {
            // async io task wait fd
            epoll_event ev = { .events = waitedEvents[i].events, .data = { .fd = data->fd } };
            syncTaskEvents[data->task].push_back(ev);
            if ((waitedEvents[i].events & (EPOLLHUP | EPOLLERR)) != 0) {
                std::lock_guard lock(m_mapMutex);
                CacheMaskFdAndEpollDel(data->fd, data->task);
            }
        }
    }

    WakeSyncTask(syncTaskEvents);
    ReleaseFdWakeData();
    return 1;
}

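/*
 * Illustrative sketch (not part of this file): a typical ASYNC_CB
 * registration; `myEventFd`, `userData` and `OnReadable` are hypothetical
 * caller code.
 *
 *   void OnReadable(void* data, uint32_t events) { ... }
 *   int myEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
 *   IOPoller::Instance().AddFdEvent(EPOLL_CTL_ADD, EPOLLIN, myEventFd, userData, OnReadable);
 *   ...
 *   IOPoller::Instance().DelFdEvent(myEventFd);
 */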
// mode ASYNC_CB/ASYNC_IO
int IOPoller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
{
    CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
    auto wakeData = std::make_unique<WakeData>(fd, data, cb, task);
    if (task) {
        task->pollerEnable = true;
    }
    void* ptr = static_cast<void*>(wakeData.get());
    if (ptr == nullptr || wakeData == nullptr) {
        FFRT_SYSEVENT_LOGE("Construct WakeData instance failed, or wakeData is nullptr");
        return -1;
    }
    wakeData->monitorEvents = events;

    epoll_event ev = { .events = events, .data = { .ptr = ptr } };
    std::lock_guard lock(m_mapMutex);
    if (m_teardown) {
        return -1;
    }

    if (m_exitFlag) {
        ThreadInit();
        m_exitFlag = false;
    }

    if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
        FFRT_SYSEVENT_LOGE("epoll_ctl add/mod fd error: efd=%d, fd=%d, errno=%d", m_epFd, fd, errno);
        return -1;
    }

    if (op == EPOLL_CTL_ADD) {
        m_wakeDataMap[fd].emplace_back(std::move(wakeData));
    } else if (op == EPOLL_CTL_MOD) {
        auto iter = m_wakeDataMap.find(fd);
        FFRT_COND_RETURN_ERROR(iter == m_wakeDataMap.end(), -1, "fd %d does not exist in wakeDataMap", fd);
        if (iter->second.size() != 1) {
            FFRT_SYSEVENT_LOGE("epoll_ctl mod fd wakedata num invalid");
            return -1;
        }
        iter->second.pop_back();
        iter->second.emplace_back(std::move(wakeData));
    }
    return 0;
}

int IOPoller::DelFdEvent(int fd) noexcept
{
    std::lock_guard lock(m_mapMutex);
    ClearDelFdCache(fd);
    auto wakeDataIter = m_wakeDataMap.find(fd);
    if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
        FFRT_SYSEVENT_LOGW("fd[%d] has not been added to epoll, ignore", fd);
        return -1;
    }
    auto delCntIter = m_delCntMap.find(fd);
    if (delCntIter != m_delCntMap.end()) {
        int diff = static_cast<int>(wakeDataIter->second.size()) - delCntIter->second;
        if (diff == 0) {
            FFRT_SYSEVENT_LOGW("fd:%d, addCnt:%d, delCnt:%d has not been added to epoll, ignore", fd,
                static_cast<int>(wakeDataIter->second.size()), delCntIter->second);
            return -1;
        }
    }

    if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
        FFRT_SYSEVENT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errno=%d", m_epFd, fd, errno);
        return -1;
    }

    // drop any events cached for this fd that no task has consumed yet
    for (auto it = m_cachedTaskEvents.begin(); it != m_cachedTaskEvents.end();) {
        auto& events = it->second;
        events.erase(std::remove_if(events.begin(), events.end(),
            [fd](const epoll_event& event) {
                return event.data.fd == fd;
            }), events.end());

        if (events.empty()) {
            it = m_cachedTaskEvents.erase(it);
        } else {
            ++it;
        }
    }

    m_delCntMap[fd]++;
    WakeUp();
    return 0;
}

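// Wait for events on fds this task registered with AddFdEvent (ASYNC_IO).
// Cached events are returned immediately; otherwise the task blocks (thread
// or coroutine style) until PollOnce delivers events or the optional timer
// registered with TimeoutProc fires. Returns the number of events copied.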
// mode ASYNC_IO
int IOPoller::WaitFdEvent(struct epoll_event* eventsVec, int maxevents, int timeout) noexcept
{
    FFRT_COND_DO_ERR((eventsVec == nullptr), return -1, "eventsVec cannot be null");

    CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
    if (!task) {
        FFRT_SYSEVENT_LOGE("non-worker shall not call this function.");
        return -1;
    }

    FFRT_COND_DO_ERR((maxevents < EPOLL_EVENT_SIZE), return -1, "maxEvents:%d cannot be less than 1024", maxevents);

    int nfds = 0;
    std::unique_lock<std::mutex> lck(task->mutex_);
    if (task->Block() == BlockType::BLOCK_THREAD) {
        std::unique_lock mapLock(m_mapMutex);
        int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
        if (cachedNfds > 0) {
            mapLock.unlock();
            FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
                task->GetLabel().c_str(), task->gid, cachedNfds);
            task->Wake();
            return cachedNfds;
        }

        if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
            FFRT_SYSEVENT_LOGE("task has waited before");
            mapLock.unlock();
            task->Wake();
            return 0;
        }
        auto currTime = std::chrono::steady_clock::now();
        m_waitTaskMap[task] = { static_cast<void*>(eventsVec), maxevents, &nfds, currTime };
        if (timeout > -1) {
            FFRT_LOGD("poller meet timeout={%d}", timeout);
            m_waitTaskMap[task].timerHandle = FFRTFacade::GetTMInstance().RegisterTimer(task->qos_, timeout,
                reinterpret_cast<void*>(task), TimeoutProc);
        }
        mapLock.unlock();
        task->waitCond_.wait(lck);
        FFRT_LOGD("task[%s] id[%d] has [%d] events", task->GetLabel().c_str(), task->gid, nfds);
        task->Wake();
        return nfds;
    }
    lck.unlock();

    CoWait([&](CoTask* task) -> bool {
        std::unique_lock mapLock(m_mapMutex);
        int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
        if (cachedNfds > 0) {
            mapLock.unlock();
            FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
                task->GetLabel().c_str(), task->gid, cachedNfds);
            nfds = cachedNfds;
            return false;
        }

        if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
            FFRT_SYSEVENT_LOGE("task has waited before");
            return false;
        }
        auto currTime = std::chrono::steady_clock::now();
        m_waitTaskMap[task] = { static_cast<void*>(eventsVec), maxevents, &nfds, currTime };
        if (timeout > -1) {
            FFRT_LOGD("poller meet timeout={%d}", timeout);
            m_waitTaskMap[task].timerHandle = FFRTFacade::GetTMInstance().RegisterTimer(task->qos_, timeout,
                reinterpret_cast<void*>(task), TimeoutProc);
        }
        // The ownership of the task belongs to m_waitTaskMap, and the task cannot be accessed any more.
        return true;
    });
    FFRT_LOGD("task[%s] id[%d] has [%d] events", task->GetLabel().c_str(), task->gid, nfds);
    return nfds;
}

void IOPoller::WakeTimeoutTask(CoTask* task) noexcept
{
    std::unique_lock mapLock(m_mapMutex);
    auto iter = m_waitTaskMap.find(task);
    if (iter != m_waitTaskMap.end()) {
        // wake task, erase from wait map
        m_waitTaskMap.erase(iter);
        mapLock.unlock();
        WakeTask(task);
    }
}

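// Deliver the events PollOnce collected per task: tasks found in
// m_waitTaskMap get the events copied out, their timers unregistered, and are
// woken; events for tasks not yet waiting are cached and the fd is masked.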
void IOPoller::WakeSyncTask(std::unordered_map<CoTask*, EventVec>& syncTaskEvents) noexcept
{
    if (syncTaskEvents.empty()) {
        return;
    }

    std::unordered_set<int> timerHandlesToRemove;
    std::unordered_set<CoTask*> tasksToWake;
    {
        std::lock_guard lg(m_mapMutex);
        for (auto& taskEventPair : syncTaskEvents) {
            CoTask* currTask = taskEventPair.first;
            auto iter = m_waitTaskMap.find(currTask);
            if (iter == m_waitTaskMap.end()) { // task not in wait map
                CacheEventsAndDoMask(currTask, taskEventPair.second);
                continue;
            }
            CopyEventsInfoToConsumer(iter->second, taskEventPair.second);
            // remove timer, wake task, erase from wait map
            auto timerHandle = iter->second.timerHandle;
            if (timerHandle > -1) {
                timerHandlesToRemove.insert(timerHandle);
            }
            tasksToWake.insert(currTask);
            m_waitTaskMap.erase(iter);
        }
    }
    for (auto timerHandle : timerHandlesToRemove) {
        FFRTFacade::GetTMInstance().UnregisterTimer(timerHandle);
    }
    for (auto task : tasksToWake) {
        WakeTask(task);
    }
}

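/*
 * Illustrative sketch (not part of this file): blocking a task on a raw fd;
 * `fds` and the writer task are hypothetical caller code.
 *
 *   int fds[2];
 *   pipe2(fds, O_NONBLOCK | O_CLOEXEC);
 *   // some other task eventually does: write(fds[1], "x", 1);
 *   IOPoller::Instance().WaitFdEvent(fds[0]); // returns once fds[0] is readable
 */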
// mode SYNC_IO
void IOPoller::WaitFdEvent(int fd) noexcept
{
    CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
    if (!task) {
        FFRT_LOGI("non-worker shall not call this function.");
        return;
    }

    struct WakeData data(fd, task);
    epoll_event ev = { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&data) } };
    {
        std::lock_guard lock(m_mapMutex);
        if (m_teardown) {
            return;
        }

        if (m_exitFlag) {
            ThreadInit();
            m_exitFlag = false;
        }

        m_syncFdCnt++;
    }

    FFRT_BLOCK_TRACER(task->gid, fd);
    if (task->Block() == BlockType::BLOCK_THREAD) {
        std::unique_lock<std::mutex> lck(task->mutex_);
        if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) == 0) {
            task->waitCond_.wait(lck);
        }
        task->Wake();
        m_syncFdCnt--;
        return;
    }

    CoWait([&](CoTask* task) -> bool {
        (void)task;
        if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) == 0) {
            // The ownership of the task belongs to epoll, and the task cannot be accessed any more.
            return true;
        }
        FFRT_LOGI("epoll_ctl add fd error: efd=%d, fd=%d, errno=%d", m_epFd, fd, errno);
        m_syncFdCnt--;
        return false;
    });
}

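// Reconcile m_wakeDataMap with m_delCntMap after a poll round: once every
// registration for an fd has been deleted, drop its wake data; otherwise trim
// stale entries from the front, carrying one pending deletion forward.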
void IOPoller::ReleaseFdWakeData() noexcept
{
    std::lock_guard lock(m_mapMutex);
    for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
        int delFd = delIter->first;
        unsigned int delCnt = static_cast<unsigned int>(delIter->second);
        auto& wakeDataList = m_wakeDataMap[delFd];
        unsigned int diff = wakeDataList.size() - delCnt;
        if (diff == 0) {
            m_wakeDataMap.erase(delFd);
            m_delCntMap.erase(delIter++);
            continue;
        } else if (diff == 1) {
            for (unsigned int i = 0; i < delCnt - 1; i++) {
                wakeDataList.pop_front();
            }
            m_delCntMap[delFd] = 1;
        } else {
            FFRT_SYSEVENT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd,
                static_cast<int>(wakeDataList.size()), static_cast<int>(delCnt));
        }
        delIter++;
    }
}

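// If this task has masked (cached) wake data, remove the fd from epoll on
// EPOLLHUP/EPOLLERR and record it in m_delFdCacheMap so a later unmask
// re-adds the fd instead of modifying it.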
void IOPoller::CacheMaskFdAndEpollDel(int fd, CoTask* task) noexcept
{
    auto maskWakeData = m_maskWakeDataMap.find(task);
    if (maskWakeData != m_maskWakeDataMap.end()) {
        if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
            FFRT_SYSEVENT_LOGE("fd[%d] ffrt epoll ctl del fail errno=%d", fd, errno);
        }
        m_delFdCacheMap.emplace(fd, task);
    }
}

int IOPoller::ClearMaskWakeDataCache(CoTask* task) noexcept
{
    auto maskWakeDataIter = m_maskWakeDataMap.find(task);
    if (maskWakeDataIter != m_maskWakeDataMap.end()) {
        WakeDataList& wakeDataList = maskWakeDataIter->second;
        for (auto iter = wakeDataList.begin(); iter != wakeDataList.end(); ++iter) {
            WakeData* ptr = iter->get();
            m_delFdCacheMap.erase(ptr->fd);
        }
        m_maskWakeDataMap.erase(maskWakeDataIter);
    }
    return 0;
}

int IOPoller::ClearMaskWakeDataCacheWithFd(CoTask* task, int fd) noexcept
{
    auto maskWakeDataIter = m_maskWakeDataMap.find(task);
    if (maskWakeDataIter != m_maskWakeDataMap.end()) {
        WakeDataList& wakeDataList = maskWakeDataIter->second;
        auto pred = [fd](auto& value) { return value->fd == fd; };
        wakeDataList.remove_if(pred);
        if (wakeDataList.size() == 0) {
            m_maskWakeDataMap.erase(maskWakeDataIter);
        }
    }
    return 0;
}

int IOPoller::ClearDelFdCache(int fd) noexcept
{
    auto fdDelCacheIter = m_delFdCacheMap.find(fd);
    if (fdDelCacheIter != m_delFdCacheMap.end()) {
        CoTask* task = fdDelCacheIter->second;
        ClearMaskWakeDataCacheWithFd(task, fd);
        m_delFdCacheMap.erase(fdDelCacheIter);
    }
    return 0;
}

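// Copy cached events into the consumer's array, merging duplicates for the
// same fd, then restore each fd's original interest set in epoll (the events
// were masked to 0 while cached). Returns the number of distinct fds copied.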
int IOPoller::FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
{
    std::unordered_map<int, int> seenFd;
    int fdCnt = 0;
    for (size_t i = 0; i < cachedEventsVec.size(); i++) {
        auto eventInfo = cachedEventsVec[i];
        int currFd = eventInfo.data.fd;
        // check if seen
        auto iter = seenFd.find(currFd);
        if (iter == seenFd.end()) {
            // if not seen, copy cached events and record idx
            eventsVec[fdCnt].data.fd = currFd;
            eventsVec[fdCnt].events = eventInfo.events;
            seenFd[currFd] = fdCnt;
            fdCnt++;
        } else {
            // if seen, merge the events into the recorded entry
            eventsVec[iter->second].events |= eventInfo.events;
            FFRT_LOGD("fd[%d] has multiple cached events", currFd);
            continue;
        }

        // Unmask to origin events
        auto wakeDataIter = m_wakeDataMap.find(currFd);
        if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        auto& wakeData = wakeDataIter->second.back();
        epoll_event ev = { .events = wakeData->monitorEvents, .data = { .ptr = static_cast<void*>(wakeData.get()) } };
        auto fdDelCacheIter = m_delFdCacheMap.find(currFd);
        if (fdDelCacheIter != m_delFdCacheMap.end()) {
            // fd was removed from epoll on HUP/ERR, so add it back
            ClearDelFdCache(currFd);
            if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, currFd, &ev) != 0) {
                FFRT_SYSEVENT_LOGE("fd[%d] epoll ctl add fail, errno=%d", currFd, errno);
                continue;
            }
        } else {
            if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &ev) != 0) {
                FFRT_SYSEVENT_LOGE("fd[%d] epoll ctl mod fail, errno=%d", currFd, errno);
                continue;
            }
        }
    }
    return fdCnt;
}

int IOPoller::FetchCachedEventAndDoUnmask(CoTask* task, struct epoll_event* eventsVec) noexcept
{
    // must be called with m_mapMutex held
    auto syncTaskIter = m_cachedTaskEvents.find(task);
    if (syncTaskIter == m_cachedTaskEvents.end() || syncTaskIter->second.size() == 0) {
        return 0;
    }

    int nfds = FetchCachedEventAndDoUnmask(syncTaskIter->second, eventsVec);
    m_cachedTaskEvents.erase(syncTaskIter);
    ClearMaskWakeDataCache(task);
    return nfds;
}

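// Cache events that arrived while no task was waiting for them and mask the
// fd (monitorEvents = 0) so it stays registered in epoll but silent until the
// owning task comes back to consume the cached events.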
void IOPoller::CacheEventsAndDoMask(CoTask* task, EventVec& eventVec) noexcept
{
    auto& syncTaskEvents = m_cachedTaskEvents[task];
    for (size_t i = 0; i < eventVec.size(); i++) {
        int currFd = eventVec[i].data.fd;

        auto wakeDataIter = m_wakeDataMap.find(currFd);
        if (wakeDataIter == m_wakeDataMap.end() ||
            wakeDataIter->second.size() == 0 ||
            wakeDataIter->second.back()->task != task) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        auto delIter = m_delCntMap.find(currFd);
        if (delIter != m_delCntMap.end() && wakeDataIter->second.size() == static_cast<size_t>(delIter->second)) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        struct epoll_event maskEv;
        maskEv.events = 0;
        auto& wakeData = wakeDataIter->second.back();
        std::unique_ptr<struct WakeData> maskWakeData = std::make_unique<WakeData>(currFd,
            wakeData->data, wakeData->cb, wakeData->task);
        void* ptr = static_cast<void*>(maskWakeData.get());
        if (ptr == nullptr || maskWakeData == nullptr) {
            FFRT_SYSEVENT_LOGE("CacheEventsAndDoMask construct WakeData instance failed, or wakeData is nullptr");
            continue;
        }
        maskWakeData->monitorEvents = 0;
        m_maskWakeDataMap[task].emplace_back(std::move(maskWakeData));

        maskEv.data = { .ptr = ptr };
        if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
            // ENOENT indicates the fd is not in epfd; it may have been deleted
            FFRT_SYSEVENT_LOGW("epoll_ctl mod fd error: efd=%d, fd=%d, errno=%d", m_epFd, currFd, errno);
        }
        FFRT_LOGD("fd[%d] event has no consumer, so cache it", currFd);
        syncTaskEvents.push_back(eventVec[i]);
    }
}

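// Presumably driven by an external watchdog: if an ASYNC_CB callback has been
// running longer than the next threshold in TIMEOUT_RECORD_CYCLE_LIST
// (seconds), dump the poller thread's backtrace.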
void IOPoller::MonitTimeOut()
{
    if (m_teardown) {
        return;
    }

    if (timeOutReport.cbStartTime == 0) {
        return;
    }
    uint64_t now = TimeStamp();
    static const uint64_t freq = [] {
        uint64_t f = Arm64CntFrq();
        return (f == 1) ? 1000000 : f;
    }();
    uint64_t diff = (now - timeOutReport.cbStartTime) / freq;
    if (timeOutReport.reportCount < TIMEOUT_RECORD_CYCLE_LIST.size() &&
        diff >= TIMEOUT_RECORD_CYCLE_LIST[timeOutReport.reportCount]) {
#ifdef FFRT_OH_TRACE_ENABLE
        std::string dumpInfo;
        static pid_t pid = syscall(SYS_gettid);
        if (OHOS::HiviewDFX::GetBacktraceStringByTid(dumpInfo, pid, 0, false)) {
            FFRT_LOGW("IO_Poller Backtrace Info:\n%s", dumpInfo.c_str());
        }
#endif
        timeOutReport.reportCount++;
    }
}
} // namespace ffrt