1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "poller.h"
16 #include "sched/execute_ctx.h"
17 #include "tm/scpu_task.h"
18 #include "dfx/log/ffrt_log_api.h"
19
20 namespace ffrt {
/* Build the epoll instance and register an internal eventfd used to interrupt
 * epoll_wait (see WakeUp()). cb == nullptr marks this wake data as the
 * poller's own, so ProcessWaitedFds can recognize and drain it. */
Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) }
{
    m_wakeData.cb = nullptr;
    // NOTE(review): eventfd() may return -1 on failure; that surfaces below
    // when epoll_ctl rejects the fd and the process terminates.
    m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
    if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
        FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, m_wakeData.fd, errno);
        // A poller that cannot wake itself is unusable; fail fast.
        std::terminate();
    }
}
31
/* Close the wake eventfd and epoll fd, then drop all bookkeeping state.
 * flag_ = TEARDOWN makes RegisterTimer/RegisterTimerImpl/UnregisterTimer
 * refuse further work during destruction. */
Poller::~Poller() noexcept
{
    ::close(m_wakeData.fd);
    ::close(m_epFd);
    timerHandle_ = -1;
    m_wakeDataMap.clear();
    m_delCntMap.clear();
    timerMap_.clear();
    executedHandle_.clear();
    flag_ = EpollStatus::TEARDOWN;
    m_waitTaskMap.clear();
    m_cachedTaskEvents.clear();
}
45
Instance()46 PollerProxy& PollerProxy::Instance()
47 {
48 static PollerProxy pollerInstance;
49 return pollerInstance;
50 }
51
AddFdEvent(int op,uint32_t events,int fd,void * data,ffrt_poller_cb cb)52 int Poller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
53 {
54 auto wakeData = std::make_unique<WakeDataWithCb>(fd, data, cb, ExecuteCtx::Cur()->task);
55 void* ptr = static_cast<void*>(wakeData.get());
56 if (ptr == nullptr || wakeData == nullptr) {
57 FFRT_LOGE("Construct WakeDataWithCb instance failed! or wakeData is nullptr");
58 return -1;
59 }
60 wakeData->monitorEvents = events;
61
62 epoll_event ev = { .events = events, .data = { .ptr = ptr } };
63 std::unique_lock lock(m_mapMutex);
64 if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
65 FFRT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
66 return -1;
67 }
68
69 if (op == EPOLL_CTL_ADD) {
70 m_wakeDataMap[fd].emplace_back(std::move(wakeData));
71 fdEmpty_.store(false);
72 } else if (op == EPOLL_CTL_MOD) {
73 auto iter = m_wakeDataMap.find(fd);
74 FFRT_COND_RETURN_ERROR(iter == m_wakeDataMap.end(), -1, "fd %d does not exist in wakeDataMap", fd);
75 if (iter->second.size() != 1) {
76 FFRT_LOGE("epoll_ctl mod fd wakedata num invalid");
77 return -1;
78 }
79 iter->second.pop_back();
80 iter->second.emplace_back(std::move(wakeData));
81 }
82 return 0;
83 }
84
CacheMaskFdAndEpollDel(int fd,CPUEUTask * task)85 void Poller::CacheMaskFdAndEpollDel(int fd, CPUEUTask* task) noexcept
86 {
87 auto maskWakeDataWithCb = m_maskWakeDataWithCbMap.find(task);
88 if (maskWakeDataWithCb != m_maskWakeDataWithCbMap.end()) {
89 if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
90 FFRT_LOGE("fd[%d] ffrt epoll ctl del fail errorno=%d", fd, errno);
91 }
92 CacheDelFd(fd, task);
93 }
94 }
95
ClearMaskWakeDataWithCbCache(CPUEUTask * task)96 int Poller::ClearMaskWakeDataWithCbCache(CPUEUTask* task) noexcept
97 {
98 auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
99 if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
100 WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
101 for (auto iter = wakeDataList.begin(); iter != wakeDataList.end(); ++iter) {
102 WakeDataWithCb* ptr = iter->get();
103 m_delFdCacheMap.erase(ptr->fd);
104 }
105 m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
106 }
107 return 0;
108 }
109
ClearMaskWakeDataWithCbCache(CPUEUTask * task,int fd)110 int Poller::ClearMaskWakeDataWithCbCache(CPUEUTask* task, int fd) noexcept
111 {
112 auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
113 if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
114 WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
115 auto pred = [fd](auto& value) { return value->fd == fd; };
116 wakeDataList.remove_if(pred);
117 if (wakeDataList.size() == 0) {
118 m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
119 }
120 }
121 return 0;
122 }
123
ClearDelFdCache(int fd)124 int Poller::ClearDelFdCache(int fd) noexcept
125 {
126 auto fdDelCacheIter = m_delFdCacheMap.find(fd);
127 if (fdDelCacheIter != m_delFdCacheMap.end()) {
128 CPUEUTask* task = fdDelCacheIter->second;
129 ClearMaskWakeDataWithCbCache(task, fd);
130 m_delFdCacheMap.erase(fdDelCacheIter);
131 }
132 return 0;
133 }
134
/**
 * @brief Remove an fd previously registered via AddFdEvent.
 *
 * Deletion is deferred: the fd's WakeDataWithCb entries stay in m_wakeDataMap
 * until ReleaseFdWakeData() reconciles them against m_delCntMap, because the
 * poll loop may still hold raw pointers to them.
 * @return 0 on success, -1 when the fd is unknown or already fully deleted.
 */
int Poller::DelFdEvent(int fd) noexcept
{
    std::unique_lock lock(m_mapMutex);
    // Drop any FFRT-side "masked delete" bookkeeping for this fd first.
    ClearDelFdCache(fd);
    auto wakeDataIter = m_wakeDataMap.find(fd);
    if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
        FFRT_LOGW("fd[%d] has not been added to epoll, ignore", fd);
        return -1;
    }
    auto delCntIter = m_delCntMap.find(fd);
    if (delCntIter != m_delCntMap.end()) {
        // All registrations already marked for deletion -> nothing left to del.
        int diff = wakeDataIter->second.size() - delCntIter->second;
        if (diff == 0) {
            FFRT_LOGW("fd:%d, addCnt:%d, delCnt:%d has not been added to epoll, ignore", fd,
                wakeDataIter->second.size(), delCntIter->second);
            return -1;
        }
    }

    if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
        FFRT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
        return -1;
    }

    // Purge events already cached for this fd so no waiter sees a stale event.
    for (auto it = m_cachedTaskEvents.begin(); it != m_cachedTaskEvents.end();) {
        auto& events = it->second;
        events.erase(std::remove_if(events.begin(), events.end(),
            [fd](const epoll_event& event) {
                return event.data.fd == fd;
            }), events.end());

        if (events.empty()) {
            it = m_cachedTaskEvents.erase(it);
        } else {
            ++it;
        }
    }

    // Record one pending deletion; ReleaseFdWakeData() frees the wake data later.
    m_delCntMap[fd]++;
    WakeUp();
    return 0;
}
177
ClearCachedEvents(CPUEUTask * task)178 void Poller::ClearCachedEvents(CPUEUTask* task) noexcept
179 {
180 std::unique_lock lock(m_mapMutex);
181 auto iter = m_cachedTaskEvents.find(task);
182 if (iter == m_cachedTaskEvents.end()) {
183 return;
184 }
185 m_cachedTaskEvents.erase(iter);
186 ClearMaskWakeDataWithCbCache(task);
187 }
188
/**
 * @brief Merge cached events per fd into eventsVec and re-arm each fd in
 *        epoll with its original monitored event mask ("unmask").
 *
 * Multiple cached events for the same fd are OR-ed into one output entry.
 * Caller must hold m_mapMutex. Returns the number of distinct fds emitted.
 */
int Poller::FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
{
    // Maps fd -> index in eventsVec, to coalesce duplicate cached events.
    std::unordered_map<int, int> seenFd;
    int fdCnt = 0;
    for (size_t i = 0; i < cachedEventsVec.size(); i++) {
        auto eventInfo = cachedEventsVec[i];
        int currFd = eventInfo.data.fd;
        // check if seen
        auto iter = seenFd.find(currFd);
        if (iter == seenFd.end()) {
            // if not seen, copy cached events and record idx
            eventsVec[fdCnt].data.fd = currFd;
            eventsVec[fdCnt].events = eventInfo.events;
            seenFd[currFd] = fdCnt;
            fdCnt++;
        } else {
            // if seen, update event to newest
            eventsVec[iter->second].events |= eventInfo.events;
            FFRT_LOGD("fd[%d] has mutilple cached events", currFd);
            continue;
        }

        // Unmask to origin events
        auto wakeDataIter = m_wakeDataMap.find(currFd);
        if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        auto& wakeData = wakeDataIter->second.back();
        epoll_event ev = { .events = wakeData->monitorEvents, .data = { .ptr = static_cast<void*>(wakeData.get()) } };
        auto fdDelCacheIter = m_delFdCacheMap.find(currFd);
        if (fdDelCacheIter != m_delFdCacheMap.end()) {
            // FFRT itself removed this fd from epoll earlier (mask path with
            // EPOLLHUP/EPOLLERR), so it must be re-ADDed, not modified.
            ClearDelFdCache(currFd);
            if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, currFd, &ev) != 0) {
                FFRT_LOGE("fd[%d] epoll ctl add fail, errorno=%d", currFd, errno);
                continue;
            }
        } else {
            if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &ev) != 0) {
                FFRT_LOGE("fd[%d] epoll ctl mod fail, errorno=%d", currFd, errno);
                continue;
            }
        }
    }
    return fdCnt;
}
236
FetchCachedEventAndDoUnmask(CPUEUTask * task,struct epoll_event * eventsVec)237 int Poller::FetchCachedEventAndDoUnmask(CPUEUTask* task, struct epoll_event* eventsVec) noexcept
238 {
239 // should used in lock
240 auto syncTaskIter = m_cachedTaskEvents.find(task);
241 if (syncTaskIter == m_cachedTaskEvents.end() || syncTaskIter->second.size() == 0) {
242 return 0;
243 }
244
245 int nfds = FetchCachedEventAndDoUnmask(syncTaskIter->second, eventsVec);
246 m_cachedTaskEvents.erase(syncTaskIter);
247 ClearMaskWakeDataWithCbCache(task);
248 return nfds;
249 }
250
/**
 * @brief Block the calling task until fds it registered report events or the
 *        optional timeout fires.
 * @param eventsVec  caller-supplied output buffer (at least maxevents slots).
 * @param maxevents  capacity of eventsVec; must be >= EPOLL_EVENT_SIZE.
 * @param timeout    milliseconds; -1 waits indefinitely.
 * @return number of events written, 0 on duplicate wait or timeout, -1 on bad args.
 *
 * Two wait flavors: a condition-variable wait for ThreadWaitMode tasks and a
 * coroutine suspension (CoWait) otherwise. In both, previously cached events
 * are returned immediately without blocking.
 */
int Poller::WaitFdEvent(struct epoll_event* eventsVec, int maxevents, int timeout) noexcept
{
    FFRT_COND_DO_ERR((eventsVec == nullptr), return -1, "eventsVec cannot be null");

    auto task = ExecuteCtx::Cur()->task;
    if (!task) {
        FFRT_LOGE("nonworker shall not call this fun.");
        return -1;
    }

    FFRT_COND_DO_ERR((maxevents < EPOLL_EVENT_SIZE), return -1, "maxEvents:%d cannot be less than 1024", maxevents);

    int nfds = 0;
    if (ThreadWaitMode(task)) {
        // Thread-blocking wait: task->mutex_ guards the condition variable,
        // m_mapMutex guards the poller maps. Task mutex is taken first.
        std::unique_lock<std::mutex> lck(task->mutex_);
        m_mapMutex.lock();
        int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
        if (cachedNfds > 0) {
            m_mapMutex.unlock();
            FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
                task->label.c_str(), task->gid, cachedNfds);
            return cachedNfds;
        }

        if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
            FFRT_LOGE("task has waited before");
            m_mapMutex.unlock();
            return 0;
        }
        if (FFRT_UNLIKELY(LegacyMode(task))) {
            // Mark the task as thread-blocked; WakeTask() restores it.
            task->blockType = BlockType::BLOCK_THREAD;
        }
        auto currTime = std::chrono::steady_clock::now();
        // SyncData records where WakeSyncTask must copy events and the count.
        m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
        if (timeout > -1) {
            FFRT_LOGD("poller meet timeout={%d}", timeout);
            // cb/data-less timer: on expiry ProcessTimerDataCb wakes this task.
            m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
        }
        m_mapMutex.unlock();
        // nfds is filled through the pointer stored above before notify.
        reinterpret_cast<SCPUEUTask*>(task)->waitCond_.wait(lck);
        FFRT_LOGD("task[%s] id[%d] has [%d] events", task->label.c_str(), task->gid, nfds);
        return nfds;
    }

    // Coroutine wait: the lambda returns false to skip suspension (events
    // already available or duplicate wait), true to suspend the coroutine.
    CoWait([&](CPUEUTask *task)->bool {
        m_mapMutex.lock();
        int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
        if (cachedNfds > 0) {
            m_mapMutex.unlock();
            FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
                task->label.c_str(), task->gid, cachedNfds);
            nfds = cachedNfds;
            return false;
        }

        if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
            FFRT_LOGE("task has waited before");
            m_mapMutex.unlock();
            return false;
        }
        auto currTime = std::chrono::steady_clock::now();
        m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
        if (timeout > -1) {
            FFRT_LOGD("poller meet timeout={%d}", timeout);
            m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
        }
        m_mapMutex.unlock();
        // The ownership of the task belongs to m_waitTaskMap, and the task cannot be accessed any more.
        return true;
    });
    FFRT_LOGD("task[%s] id[%d] has [%d] events", task->label.c_str(), task->gid, nfds);
    return nfds;
}
324
WakeUp()325 void Poller::WakeUp() noexcept
326 {
327 uint64_t one = 1;
328 (void)::write(m_wakeData.fd, &one, sizeof one);
329 }
330
/**
 * @brief Dispatch the fds returned by epoll_wait.
 *
 * The internal wake eventfd is drained; fds registered with a callback have
 * it invoked inline; task-driven fds have their events collected into
 * syncTaskEvents for WakeSyncTask().
 */
void Poller::ProcessWaitedFds(int nfds, std::unordered_map<CPUEUTask*, EventVec>& syncTaskEvents,
    std::array<epoll_event, EPOLL_EVENT_SIZE>& waitedEvents) noexcept
{
    for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
        struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(waitedEvents[i].data.ptr);
        int currFd = data->fd;
        if (currFd == m_wakeData.fd) {
            // Internal wake-up eventfd: consume the counter and move on.
            uint64_t one = 1;
            (void)::read(m_wakeData.fd, &one, sizeof one);
            continue;
        }

        if (data->cb != nullptr) {
            data->cb(data->data, waitedEvents[i].events);
            continue;
        }

        if (data->task != nullptr) {
            epoll_event ev = { .events = waitedEvents[i].events, .data = {.fd = currFd} };
            syncTaskEvents[data->task].push_back(ev);
            if (waitedEvents[i].events & (EPOLLHUP | EPOLLERR)) {
                std::unique_lock lock(m_mapMutex);
                // Check whether this task has mask-listening state; if so,
                // remove the fd from epoll and mark it as removed by FFRT itself.
                CacheMaskFdAndEpollDel(currFd, data->task);
            }
        }
    }
}
359
360 namespace {
/* Wake a task parked in WaitFdEvent. Thread-mode waiters are notified via
 * their condition variable; coroutine waiters are resumed through the
 * coroutine factory. */
void WakeTask(CPUEUTask* task)
{
    if (ThreadNotifyMode(task)) {
        std::unique_lock<std::mutex> lck(task->mutex_);
        // The waiter set BLOCK_THREAD in WaitFdEvent (legacy mode); restore
        // the coroutine block type before notifying.
        if (BlockThread(task)) {
            task->blockType = BlockType::BLOCK_COROUTINE;
        }
        reinterpret_cast<SCPUEUTask*>(task)->waitCond_.notify_one();
    } else {
        CoRoutineFactory::CoWakeFunc(task, false);
    }
}
373
CopyEventsToConsumer(EventVec & cachedEventsVec,struct epoll_event * eventsVec)374 int CopyEventsToConsumer(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
375 {
376 int nfds = cachedEventsVec.size();
377 for (int i = 0; i < nfds; i++) {
378 eventsVec[i].events = cachedEventsVec[i].events;
379 eventsVec[i].data.fd = cachedEventsVec[i].data.fd;
380 }
381 return nfds;
382 }
383
CopyEventsInfoToConsumer(SyncData & taskInfo,EventVec & cachedEventsVec)384 void CopyEventsInfoToConsumer(SyncData& taskInfo, EventVec& cachedEventsVec)
385 {
386 epoll_event* eventsPtr = (epoll_event*)taskInfo.eventsPtr;
387 int* nfdsPtr = taskInfo.nfdsPtr;
388 if (eventsPtr == nullptr || nfdsPtr == nullptr) {
389 FFRT_LOGE("usr ptr is nullptr");
390 return;
391 }
392 *nfdsPtr = CopyEventsToConsumer(cachedEventsVec, eventsPtr);
393 }
394 } // namespace
395
/**
 * @brief Cache events that arrived while their consumer task is not waiting,
 *        and "mask" each fd (re-arm it with an empty event set) so it stays
 *        registered but stops reporting until the task fetches the cache.
 * Caller must hold m_mapMutex.
 */
void Poller::CacheEventsAndDoMask(CPUEUTask* task, EventVec& eventVec) noexcept
{
    auto& syncTaskEvents = m_cachedTaskEvents[task];
    for (size_t i = 0; i < eventVec.size(); i++) {
        int currFd = eventVec[i].data.fd;
        // Skip fds that were deleted or now belong to a different task.
        auto wakeDataIter = m_wakeDataMap.find(currFd);
        if (wakeDataIter == m_wakeDataMap.end() ||
            wakeDataIter->second.size() == 0 ||
            wakeDataIter->second.back()->task != task) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        // Skip fds whose registrations are all pending deletion.
        auto delIter = m_delCntMap.find(currFd);
        if (delIter != m_delCntMap.end() && wakeDataIter->second.size() == static_cast<size_t>(delIter->second)) {
            FFRT_LOGD("fd[%d] may be deleted", currFd);
            continue;
        }

        struct epoll_event maskEv;
        maskEv.events = 0;
        auto& wakeData = wakeDataIter->second.back();
        // A separate wake-data copy (monitorEvents = 0) backs the masked
        // registration; the mask cache owns it until the task consumes or
        // clears its events.
        std::unique_ptr<struct WakeDataWithCb> maskWakeData = std::make_unique<WakeDataWithCb>(currFd,
            wakeData->data, wakeData->cb, wakeData->task);
        void* ptr = static_cast<void*>(maskWakeData.get());
        if (ptr == nullptr || maskWakeData == nullptr) {
            FFRT_LOGE("CacheEventsAndDoMask Construct WakeDataWithCb instance failed! or wakeData is nullptr");
            continue;
        }
        maskWakeData->monitorEvents = 0;
        CacheMaskWakeData(task, maskWakeData);

        maskEv.data = {.ptr = ptr};
        if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
            // ENOENT indicate fd is not in epfd, may be deleted
            FFRT_LOGW("epoll_ctl mod fd error: efd=%d, fd=%d, errorno=%d", m_epFd, currFd, errno);
        }
        FFRT_LOGD("fd[%d] event has no consumer, so cache it", currFd);
        syncTaskEvents.push_back(eventVec[i]);
    }
}
437
/**
 * @brief Deliver collected fd events to their consumer tasks.
 *
 * Tasks present in m_waitTaskMap get the events copied into their buffers,
 * their per-wait timeout timers cancelled, and are woken. Tasks not waiting
 * have their events cached (and fds masked) for a later WaitFdEvent call.
 */
void Poller::WakeSyncTask(std::unordered_map<CPUEUTask*, EventVec>& syncTaskEvents) noexcept
{
    if (syncTaskEvents.empty()) {
        return;
    }

    std::unordered_set<int> timerHandlesToRemove;
    std::unordered_set<CPUEUTask*> tasksToWake;
    m_mapMutex.lock();
    for (auto& taskEventPair : syncTaskEvents) {
        CPUEUTask* currTask = taskEventPair.first;
        auto iter = m_waitTaskMap.find(currTask);
        if (iter == m_waitTaskMap.end()) {
            // No waiter yet: park the events in the cache and mask the fds.
            CacheEventsAndDoMask(currTask, taskEventPair.second);
            continue;
        }
        CopyEventsInfoToConsumer(iter->second, taskEventPair.second);
        // Collect the wait-timeout timer so it cannot fire after wake-up.
        auto timerHandle = iter->second.timerHandle;
        if (timerHandle > -1) {
            timerHandlesToRemove.insert(timerHandle);
        }
        tasksToWake.insert(currTask);
        m_waitTaskMap.erase(iter);
    }
    m_mapMutex.unlock();
    // Cancel timers under timerMutex_, taken only after m_mapMutex has been
    // released so the two lock scopes never nest.
    if (timerHandlesToRemove.size() > 0) {
        std::lock_guard lock(timerMutex_);
        for (auto it = timerMap_.begin(); it != timerMap_.end();) {
            if (timerHandlesToRemove.find(it->second.handle) != timerHandlesToRemove.end()) {
                it = timerMap_.erase(it);
            } else {
                ++it;
            }
        }
        timerEmpty_.store(timerMap_.empty());
    }

    // Wake outside any lock to avoid waking tasks into lock contention.
    for (auto task : tasksToWake) {
        WakeTask(task);
    }
}
479
GetTaskWaitTime(CPUEUTask * task)480 uint64_t Poller::GetTaskWaitTime(CPUEUTask* task) noexcept
481 {
482 std::unique_lock lock(m_mapMutex);
483 auto iter = m_waitTaskMap.find(task);
484 if (iter == m_waitTaskMap.end()) {
485 return 0;
486 }
487
488 return std::chrono::duration_cast<std::chrono::seconds>(
489 iter->second.waitTP.time_since_epoch()).count();
490 }
491
/**
 * @brief Run one poll cycle: honor the earliest timer deadline, block in
 *        epoll_wait, then dispatch fd events and wake their consumers.
 * @param timeout  caller's cap in milliseconds; -1 waits indefinitely.
 * @return RET_TIMER when a timer fired, RET_EPOLL when fd events were
 *         processed, RET_NULL on timeout/EINTR/error.
 */
PollerRet Poller::PollOnce(int timeout) noexcept
{
    int realTimeout = timeout;
    int timerHandle = -1;

    timerMutex_.lock();
    if (!timerMap_.empty()) {
        auto cur = timerMap_.begin();
        timerHandle = cur->second.handle;
        TimePoint now = std::chrono::steady_clock::now();
        // Shrink the epoll timeout to the earliest timer deadline.
        realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
            cur->first - now).count();
        if (realTimeout <= 0) {
            // Deadline already passed. ExecuteTimerCb takes over ownership of
            // timerMutex_ and releases it before running callbacks.
            ExecuteTimerCb(now);
            return PollerRet::RET_TIMER;
        }

        if (timeout != -1 && realTimeout > timeout) {
            // The caller's timeout is sooner than any timer, so a zero-fd
            // wakeup must not be attributed to that timer.
            timerHandle = -1;
            realTimeout = timeout;
        }

        // WAIT lets RegisterTimerImpl/UnregisterTimer know a wake-up is
        // needed when the earliest deadline changes mid-wait.
        flag_ = EpollStatus::WAIT;
    }
    timerMutex_.unlock();

    pollerCount_++;
    std::array<epoll_event, EPOLL_EVENT_SIZE> waitedEvents;
    int nfds = epoll_wait(m_epFd, waitedEvents.data(), waitedEvents.size(), realTimeout);
    flag_ = EpollStatus::WAKE;
    if (nfds < 0) {
        if (errno != EINTR) {
            FFRT_LOGE("epoll_wait error, errorno= %d.", errno);
        }
        return PollerRet::RET_NULL;
    }
    if (nfds == 0) {
        if (timerHandle != -1) {
            timerMutex_.lock();
            // The timer may have been unregistered while we waited; only
            // fire it if the handle is still present.
            for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
                if (it->second.handle == timerHandle) {
                    // ExecuteTimerCb releases timerMutex_ before returning.
                    ExecuteTimerCb(it->first);
                    return PollerRet::RET_TIMER;
                }
            }
            timerMutex_.unlock();
        }
        return PollerRet::RET_NULL;
    }

    std::unordered_map<CPUEUTask*, EventVec> syncTaskEvents;
    ProcessWaitedFds(nfds, syncTaskEvents, waitedEvents);
    WakeSyncTask(syncTaskEvents);
    ReleaseFdWakeData();
    return PollerRet::RET_EPOLL;
}
548
/**
 * @brief Reconcile deferred deletions recorded by DelFdEvent: free the wake
 *        data of fully-deleted fds and keep only the newest entry for fds
 *        that were re-added after deletion.
 */
void Poller::ReleaseFdWakeData() noexcept
{
    std::unique_lock lock(m_mapMutex);
    for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
        int delFd = delIter->first;
        unsigned int delCnt = static_cast<unsigned int>(delIter->second);
        // NOTE(review): operator[] inserts an empty list when delFd is no
        // longer in m_wakeDataMap; that case then hits the diff==0 branch.
        auto& wakeDataList = m_wakeDataMap[delFd];
        int diff = wakeDataList.size() - delCnt;
        if (diff == 0) {
            // Every registration was deleted: drop all bookkeeping for fd.
            m_wakeDataMap.erase(delFd);
            m_delCntMap.erase(delIter++);
            continue;
        } else if (diff == 1) {
            // fd was re-added once after the deletion(s): free stale entries,
            // keep the live one, and remember a single pending delete.
            for (unsigned int i = 0; i < delCnt - 1; i++) {
                wakeDataList.pop_front();
            }
            m_delCntMap[delFd] = 1;
        } else {
            FFRT_LOGD("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
        }
        delIter++;
    }

    fdEmpty_.store(m_wakeDataMap.empty());
}
574
ProcessTimerDataCb(CPUEUTask * task)575 void Poller::ProcessTimerDataCb(CPUEUTask* task) noexcept
576 {
577 m_mapMutex.lock();
578 auto iter = m_waitTaskMap.find(task);
579 if (iter != m_waitTaskMap.end()) {
580 WakeTask(task);
581 m_waitTaskMap.erase(iter);
582 }
583 m_mapMutex.unlock();
584 }
585
/**
 * @brief Fire every timer whose deadline is <= timer.
 *
 * PRECONDITION: called with timerMutex_ held (see PollOnce); the mutex is
 * released here before any callback runs, so callbacks may safely register
 * or unregister timers. cb-style timers are tracked in executedHandle_
 * (EXECUTING -> EXECUTED) so UnregisterTimer/GetTimerStatus can synchronize
 * with a running callback; task-style timers wake their waiting task.
 * Repeating timers re-register unless unregistered while executing.
 */
void Poller::ExecuteTimerCb(TimePoint timer) noexcept
{
    std::vector<TimerDataWithCb> timerData;
    for (auto iter = timerMap_.begin(); iter != timerMap_.end();) {
        if (iter->first <= timer) {
            timerData.emplace_back(iter->second);
            if (iter->second.cb != nullptr) {
                executedHandle_[iter->second.handle] = TimerStatus::EXECUTING;
            }
            iter = timerMap_.erase(iter);
            continue;
        }
        // timerMap_ is ordered by deadline: the first future entry ends the scan.
        break;
    }
    timerEmpty_.store(timerMap_.empty());
    timerMutex_.unlock();
    for (const auto& data : timerData) {
        if (data.cb) {
            data.cb(data.data);
        } else if (data.task != nullptr) {
            ProcessTimerDataCb(data.task);
        }

        if (data.cb != nullptr) {
            std::lock_guard lock(timerMutex_);
            executedHandle_[data.handle] = TimerStatus::EXECUTED;
        }
        if (data.repeat) {
            std::lock_guard lock(timerMutex_);
            // Only re-arm when UnregisterTimer has not erased the handle.
            auto iter = executedHandle_.find(data.handle);
            if (iter != executedHandle_.end()) {
                executedHandle_.erase(data.handle);
                RegisterTimerImpl(data);
            }
        }
    }
}
623
RegisterTimerImpl(const TimerDataWithCb & data)624 void Poller::RegisterTimerImpl(const TimerDataWithCb& data) noexcept
625 {
626 if (flag_ == EpollStatus::TEARDOWN) {
627 return;
628 }
629
630 TimePoint absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(data.timeout);
631 bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);
632
633 timerMap_.emplace(absoluteTime, data);
634 timerEmpty_.store(false);
635
636 if (wake) {
637 WakeUp();
638 }
639 }
640
RegisterTimer(uint64_t timeout,void * data,ffrt_timer_cb cb,bool repeat)641 int Poller::RegisterTimer(uint64_t timeout, void* data, ffrt_timer_cb cb, bool repeat) noexcept
642 {
643 if (flag_ == EpollStatus::TEARDOWN) {
644 return -1;
645 }
646
647 std::lock_guard lock(timerMutex_);
648 timerHandle_ += 1;
649
650 TimerDataWithCb timerMapValue(data, cb, ExecuteCtx::Cur()->task, repeat, timeout);
651 timerMapValue.handle = timerHandle_;
652 RegisterTimerImpl(timerMapValue);
653
654 return timerHandle_;
655 }
656
/**
 * @brief Cancel a timer by handle.
 * @return 0 when the timer was removed (or its finished execution record was
 *         cleared), -1 when the handle is unknown or during teardown.
 *
 * If the timer's callback is currently running (EXECUTING), spin-yield until
 * it finishes so the caller can free callback resources safely afterwards.
 */
int Poller::UnregisterTimer(int handle) noexcept
{
    if (flag_ == EpollStatus::TEARDOWN) {
        return -1;
    }

    std::lock_guard lock(timerMutex_);
    auto it = executedHandle_.find(handle);
    if (it != executedHandle_.end()) {
        while (it->second == TimerStatus::EXECUTING) {
            // The mutex owned by the lock_guard is manually released around
            // the yield; every exit path re-locks first, so the guard's
            // final unlock stays balanced.
            timerMutex_.unlock();
            std::this_thread::yield();
            timerMutex_.lock();
            it = executedHandle_.find(handle);
            if (it == executedHandle_.end()) {
                // Handle vanished (e.g. consumed by a repeating re-register).
                break;
            }
        }

        if (it != executedHandle_.end()) {
            executedHandle_.erase(it);
            return 0;
        }
    }

    bool wake = false;
    int ret = -1;
    for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
        if (cur->second.handle == handle) {
            // Removing the earliest deadline while PollOnce sleeps on it
            // requires a wake-up so the epoll timeout is recomputed.
            if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
                wake = true;
            }
            timerMap_.erase(cur);
            ret = 0;
            break;
        }
    }

    timerEmpty_.store(timerMap_.empty());

    if (wake) {
        WakeUp();
    }
    return ret;
}
702
DetermineEmptyMap()703 bool Poller::DetermineEmptyMap() noexcept
704 {
705 return fdEmpty_ && timerEmpty_;
706 }
707
DeterminePollerReady()708 bool Poller::DeterminePollerReady() noexcept
709 {
710 return IsFdExist() || IsTimerReady();
711 }
712
IsFdExist()713 bool Poller::IsFdExist() noexcept
714 {
715 return !fdEmpty_;
716 }
717
IsTimerReady()718 bool Poller::IsTimerReady() noexcept
719 {
720 TimePoint now = std::chrono::steady_clock::now();
721 std::lock_guard lock(timerMutex_);
722 if (timerMap_.empty()) {
723 return false;
724 }
725
726 if (now >= timerMap_.begin()->first) {
727 return true;
728 }
729 return false;
730 }
731
/**
 * @brief Query whether a timer is still pending, already executed, or unknown.
 *
 * Spin-yields while the timer's callback is mid-execution, so the answer is
 * never "currently executing".
 */
ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
{
    if (flag_ == EpollStatus::TEARDOWN) {
        return ffrt_timer_notfound;
    }

    std::lock_guard lock(timerMutex_);
    for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
        if (cur->second.handle == handle) {
            return ffrt_timer_not_executed;
        }
    }

    auto it = executedHandle_.find(handle);
    if (it != executedHandle_.end()) {
        while (it->second == TimerStatus::EXECUTING) {
            // Manually drop/re-take the guarded mutex while waiting for the
            // callback to finish (same pattern as UnregisterTimer); every
            // path out of the loop holds the lock again.
            timerMutex_.unlock();
            std::this_thread::yield();
            timerMutex_.lock();
            it = executedHandle_.find(handle);
            if (it == executedHandle_.end()) {
                break;
            }
        }
        return ffrt_timer_executed;
    }

    return ffrt_timer_notfound;
}
761
/* Number of poll cycles (epoll_wait entries in PollOnce) since construction. */
uint64_t Poller::GetPollCount() noexcept
{
    return pollerCount_;
}
766 }