1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync/poller.h"
17 #include <securec.h>
18 #include "sched/execute_ctx.h"
19 #include "tm/scpu_task.h"
20 #include "dfx/log/ffrt_log_api.h"
21 #ifdef FFRT_ENABLE_HITRACE_CHAIN
22 #include "dfx/trace/ffrt_trace_chain.h"
23 #endif
24
25 constexpr uint64_t MAX_TIMER_MS_COUNT = 1000ULL * 100 * 60 * 60 * 24 * 365; // 100 year
26
27 namespace ffrt {
Poller()28 Poller::Poller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) }
29 {
30 if (m_epFd < 0) {
31 FFRT_LOGE("epoll_create1 failed: errno=%d", errno);
32 }
33 #ifdef OHOS_STANDARD_SYSTEM
34 fdsan_exchange_owner_tag(m_epFd, 0, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE, static_cast<uint64_t>(m_epFd)));
35 #endif
36 m_wakeData.cb = nullptr;
37 m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
38 if (m_wakeData.fd < 0) {
39 FFRT_LOGE("eventfd failed: errno=%d", errno);
40 }
41 #ifdef OHOS_STANDARD_SYSTEM
42 fdsan_exchange_owner_tag(m_wakeData.fd, 0, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE,
43 static_cast<uint64_t>(m_wakeData.fd)));
44 #endif
45 epoll_event ev { .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
46 FFRT_COND_TERMINATE((epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0),
47 "epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, m_wakeData.fd, errno);
48 }
49
~Poller()50 Poller::~Poller() noexcept
51 {
52 #ifdef OHOS_STANDARD_SYSTEM
53 fdsan_close_with_tag(m_wakeData.fd, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE,
54 static_cast<uint64_t>(m_wakeData.fd)));
55 fdsan_close_with_tag(m_epFd, fdsan_create_owner_tag(FDSAN_OWNER_TYPE_FILE, static_cast<uint64_t>(m_epFd)));
56 #else
57 ::close(m_wakeData.fd);
58 ::close(m_epFd);
59 #endif
60 timerHandle_ = -1;
61 {
62 std::lock_guard lg(m_mapMutex);
63 m_wakeDataMap.clear();
64 m_delCntMap.clear();
65 m_waitTaskMap.clear();
66 m_cachedTaskEvents.clear();
67 }
68 {
69 std::lock_guard lg(timerMutex_);
70 timerMap_.clear();
71 executedHandle_.clear();
72 }
73 flag_ = EpollStatus::TEARDOWN;
74 }
75
Instance()76 PollerProxy& PollerProxy::Instance()
77 {
78 static PollerProxy pollerInstance;
79 return pollerInstance;
80 }
81
AddFdEvent(int op,uint32_t events,int fd,void * data,ffrt_poller_cb cb)82 int Poller::AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept
83 {
84 CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
85 auto wakeData = std::make_unique<WakeDataWithCb>(fd, data, cb, task);
86 if (task) {
87 task->pollerEnable = true;
88 }
89 void* ptr = static_cast<void*>(wakeData.get());
90 if (ptr == nullptr || wakeData == nullptr) {
91 FFRT_SYSEVENT_LOGE("Construct WakeDataWithCb instance failed! or wakeData is nullptr");
92 return -1;
93 }
94 wakeData->monitorEvents = events;
95
96 epoll_event ev = { .events = events, .data = { .ptr = ptr } };
97 std::lock_guard lg(m_mapMutex);
98 if (epoll_ctl(m_epFd, op, fd, &ev) != 0) {
99 FFRT_SYSEVENT_LOGE("epoll_ctl add fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
100 return -1;
101 }
102
103 if (op == EPOLL_CTL_ADD) {
104 m_wakeDataMap[fd].emplace_back(std::move(wakeData));
105 fdEmpty_.store(false);
106 } else if (op == EPOLL_CTL_MOD) {
107 auto iter = m_wakeDataMap.find(fd);
108 FFRT_COND_RETURN_ERROR(iter == m_wakeDataMap.end(), -1, "fd %d does not exist in wakeDataMap", fd);
109 if (iter->second.size() != 1) {
110 FFRT_SYSEVENT_LOGE("epoll_ctl mod fd wakedata num invalid");
111 return -1;
112 }
113 iter->second.pop_back();
114 iter->second.emplace_back(std::move(wakeData));
115 }
116 return 0;
117 }
118
CacheMaskFdAndEpollDel(int fd,CoTask * task)119 void Poller::CacheMaskFdAndEpollDel(int fd, CoTask *task) noexcept
120 {
121 auto maskWakeDataWithCb = m_maskWakeDataWithCbMap.find(task);
122 if (maskWakeDataWithCb != m_maskWakeDataWithCbMap.end()) {
123 if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
124 FFRT_SYSEVENT_LOGE("fd[%d] ffrt epoll ctl del fail errorno=%d", fd, errno);
125 }
126 CacheDelFd(fd, task);
127 }
128 }
129
ClearMaskWakeDataWithCbCache(CoTask * task)130 int Poller::ClearMaskWakeDataWithCbCache(CoTask *task) noexcept
131 {
132 auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
133 if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
134 WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
135 for (auto iter = wakeDataList.begin(); iter != wakeDataList.end(); ++iter) {
136 WakeDataWithCb* ptr = iter->get();
137 m_delFdCacheMap.erase(ptr->fd);
138 }
139 m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
140 }
141 return 0;
142 }
143
ClearMaskWakeDataWithCbCacheWithFd(CoTask * task,int fd)144 int Poller::ClearMaskWakeDataWithCbCacheWithFd(CoTask *task, int fd) noexcept
145 {
146 auto maskWakeDataWithCbIter = m_maskWakeDataWithCbMap.find(task);
147 if (maskWakeDataWithCbIter != m_maskWakeDataWithCbMap.end()) {
148 WakeDataList& wakeDataList = maskWakeDataWithCbIter->second;
149 auto pred = [fd](auto& value) { return value->fd == fd; };
150 wakeDataList.remove_if(pred);
151 if (wakeDataList.size() == 0) {
152 m_maskWakeDataWithCbMap.erase(maskWakeDataWithCbIter);
153 }
154 }
155 return 0;
156 }
157
ClearDelFdCache(int fd)158 int Poller::ClearDelFdCache(int fd) noexcept
159 {
160 auto fdDelCacheIter = m_delFdCacheMap.find(fd);
161 if (fdDelCacheIter != m_delFdCacheMap.end()) {
162 CoTask *task = fdDelCacheIter->second;
163 ClearMaskWakeDataWithCbCacheWithFd(task, fd);
164 m_delFdCacheMap.erase(fdDelCacheIter);
165 }
166 return 0;
167 }
168
DelFdEvent(int fd)169 int Poller::DelFdEvent(int fd) noexcept
170 {
171 std::lock_guard lg(m_mapMutex);
172 ClearDelFdCache(fd);
173 auto wakeDataIter = m_wakeDataMap.find(fd);
174 if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
175 FFRT_SYSEVENT_LOGW("fd[%d] has not been added to epoll, ignore", fd);
176 return -1;
177 }
178 auto delCntIter = m_delCntMap.find(fd);
179 if (delCntIter != m_delCntMap.end()) {
180 int diff = static_cast<int>(wakeDataIter->second.size()) - delCntIter->second;
181 if (diff == 0) {
182 FFRT_SYSEVENT_LOGW("fd:%d, addCnt:%d, delCnt:%d has not been added to epoll, ignore", fd,
183 wakeDataIter->second.size(), delCntIter->second);
184 return -1;
185 }
186 }
187
188 if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, fd, nullptr) != 0) {
189 FFRT_SYSEVENT_LOGE("epoll_ctl del fd error: efd=%d, fd=%d, errorno=%d", m_epFd, fd, errno);
190 return -1;
191 }
192
193 for (auto it = m_cachedTaskEvents.begin(); it != m_cachedTaskEvents.end();) {
194 auto& events = it->second;
195 events.erase(std::remove_if(events.begin(), events.end(),
196 [fd](const epoll_event& event) {
197 return event.data.fd == fd;
198 }), events.end());
199
200 if (events.empty()) {
201 it = m_cachedTaskEvents.erase(it);
202 } else {
203 ++it;
204 }
205 }
206
207 m_delCntMap[fd]++;
208 WakeUp();
209 return 0;
210 }
211
ClearCachedEvents(CoTask * task)212 void Poller::ClearCachedEvents(CoTask* task) noexcept
213 {
214 std::lock_guard lg(m_mapMutex);
215 auto iter = m_cachedTaskEvents.find(task);
216 if (iter == m_cachedTaskEvents.end()) {
217 return;
218 }
219 m_cachedTaskEvents.erase(iter);
220 ClearMaskWakeDataWithCbCache(task);
221 }
222
FetchCachedEventAndDoUnmask(EventVec & cachedEventsVec,struct epoll_event * eventsVec)223 int Poller::FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
224 {
225 std::unordered_map<int, int> seenFd;
226 int fdCnt = 0;
227 for (size_t i = 0; i < cachedEventsVec.size(); i++) {
228 auto eventInfo = cachedEventsVec[i];
229 int currFd = eventInfo.data.fd;
230 // check if seen
231 auto iter = seenFd.find(currFd);
232 if (iter == seenFd.end()) {
233 // if not seen, copy cached events and record idx
234 eventsVec[fdCnt].data.fd = currFd;
235 eventsVec[fdCnt].events = eventInfo.events;
236 seenFd[currFd] = fdCnt;
237 fdCnt++;
238 } else {
239 // if seen, update event to newest
240 eventsVec[iter->second].events |= eventInfo.events;
241 FFRT_LOGD("fd[%d] has mutilple cached events", currFd);
242 continue;
243 }
244
245 // Unmask to origin events
246 auto wakeDataIter = m_wakeDataMap.find(currFd);
247 if (wakeDataIter == m_wakeDataMap.end() || wakeDataIter->second.size() == 0) {
248 FFRT_LOGD("fd[%d] may be deleted", currFd);
249 continue;
250 }
251
252 auto& wakeData = wakeDataIter->second.back();
253 epoll_event ev = { .events = wakeData->monitorEvents, .data = { .ptr = static_cast<void*>(wakeData.get()) } };
254 auto fdDelCacheIter = m_delFdCacheMap.find(currFd);
255 if (fdDelCacheIter != m_delFdCacheMap.end()) {
256 ClearDelFdCache(currFd);
257 if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, currFd, &ev) != 0) {
258 FFRT_SYSEVENT_LOGE("fd[%d] epoll ctl add fail, errorno=%d", currFd, errno);
259 continue;
260 }
261 } else {
262 if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &ev) != 0) {
263 FFRT_SYSEVENT_LOGE("fd[%d] epoll ctl mod fail, errorno=%d", currFd, errno);
264 continue;
265 }
266 }
267 }
268 return fdCnt;
269 }
270
FetchCachedEventAndDoUnmask(CoTask * task,struct epoll_event * eventsVec)271 int Poller::FetchCachedEventAndDoUnmask(CoTask* task, struct epoll_event* eventsVec) noexcept
272 {
273 // should used in lock
274 auto syncTaskIter = m_cachedTaskEvents.find(task);
275 if (syncTaskIter == m_cachedTaskEvents.end() || syncTaskIter->second.size() == 0) {
276 return 0;
277 }
278
279 int nfds = FetchCachedEventAndDoUnmask(syncTaskIter->second, eventsVec);
280 m_cachedTaskEvents.erase(syncTaskIter);
281 ClearMaskWakeDataWithCbCache(task);
282 return nfds;
283 }
284
WaitFdEvent(struct epoll_event * eventsVec,int maxevents,int timeout)285 int Poller::WaitFdEvent(struct epoll_event* eventsVec, int maxevents, int timeout) noexcept
286 {
287 FFRT_COND_DO_ERR((eventsVec == nullptr), return -1, "eventsVec cannot be null");
288
289 CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
290 if (!task) {
291 FFRT_SYSEVENT_LOGE("nonworker shall not call this fun.");
292 return -1;
293 }
294
295 FFRT_COND_DO_ERR((maxevents < EPOLL_EVENT_SIZE), return -1, "maxEvents:%d cannot be less than 1024", maxevents);
296
297 int nfds = 0;
298 if (task->Block() == BlockType::BLOCK_THREAD) {
299 std::unique_lock<std::mutex> lck(task->mutex_);
300 std::unique_lock mapLock(m_mapMutex);
301 int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
302 if (cachedNfds > 0) {
303 mapLock.unlock();
304 FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
305 task->GetLabel().c_str(), task->gid, cachedNfds);
306 task->Wake();
307 return cachedNfds;
308 }
309
310 if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
311 FFRT_SYSEVENT_LOGE("task has waited before");
312 mapLock.unlock();
313 task->Wake();
314 return 0;
315 }
316 auto currTime = std::chrono::steady_clock::now();
317 m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
318 if (timeout > -1) {
319 FFRT_LOGD("poller meet timeout={%d}", timeout);
320 m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
321 }
322 mapLock.unlock();
323 task->waitCond_.wait(lck);
324 FFRT_LOGD("task[%s] id[%d] has [%d] events", task->GetLabel().c_str(), task->gid, nfds);
325 task->Wake();
326 return nfds;
327 }
328
329 CoWait([&](CoTask *task)->bool {
330 std::unique_lock mapLock(m_mapMutex);
331 int cachedNfds = FetchCachedEventAndDoUnmask(task, eventsVec);
332 if (cachedNfds > 0) {
333 mapLock.unlock();
334 FFRT_LOGD("task[%s] id[%d] has [%d] cached events, return directly",
335 task->GetLabel().c_str(), task->gid, cachedNfds);
336 nfds = cachedNfds;
337 return false;
338 }
339
340 if (m_waitTaskMap.find(task) != m_waitTaskMap.end()) {
341 FFRT_SYSEVENT_LOGE("task has waited before");
342 return false;
343 }
344 auto currTime = std::chrono::steady_clock::now();
345 m_waitTaskMap[task] = {static_cast<void*>(eventsVec), maxevents, &nfds, currTime};
346 if (timeout > -1) {
347 FFRT_LOGD("poller meet timeout={%d}", timeout);
348 m_waitTaskMap[task].timerHandle = RegisterTimer(timeout, nullptr, nullptr);
349 }
350 // The ownership of the task belongs to m_waitTaskMap, and the task cannot be accessed any more.
351 return true;
352 });
353 FFRT_LOGD("task[%s] id[%d] has [%d] events", task->GetLabel().c_str(), task->gid, nfds);
354 return nfds;
355 }
356
WakeUp()357 void Poller::WakeUp() noexcept
358 {
359 uint64_t one = 1;
360 (void)::write(m_wakeData.fd, &one, sizeof one);
361 }
362
ProcessWaitedFds(int nfds,std::unordered_map<CoTask *,EventVec> & syncTaskEvents,std::array<epoll_event,EPOLL_EVENT_SIZE> & waitedEvents)363 void Poller::ProcessWaitedFds(int nfds, std::unordered_map<CoTask*, EventVec>& syncTaskEvents,
364 std::array<epoll_event, EPOLL_EVENT_SIZE>& waitedEvents) noexcept
365 {
366 for (unsigned int i = 0; i < static_cast<unsigned int>(nfds); ++i) {
367 struct WakeDataWithCb *data = reinterpret_cast<struct WakeDataWithCb *>(waitedEvents[i].data.ptr);
368 int currFd = data->fd;
369 if (currFd == m_wakeData.fd) {
370 uint64_t one = 1;
371 (void)::read(m_wakeData.fd, &one, sizeof one);
372 continue;
373 }
374
375 if (data->cb != nullptr) {
376 #ifdef FFRT_ENABLE_HITRACE_CHAIN
377 if (data->traceId.valid == HITRACE_ID_VALID) {
378 TraceChainAdapter::Instance().HiTraceChainRestoreId(&data->traceId);
379 }
380 #endif
381 data->cb(data->data, waitedEvents[i].events);
382 #ifdef FFRT_ENABLE_HITRACE_CHAIN
383 if (data->traceId.valid == HITRACE_ID_VALID) {
384 TraceChainAdapter::Instance().HiTraceChainClearId();
385 }
386 #endif
387 continue;
388 }
389
390 if (data->task != nullptr) {
391 epoll_event ev = { .events = waitedEvents[i].events, .data = {.fd = currFd} };
392 syncTaskEvents[data->task].push_back(ev);
393 if (waitedEvents[i].events & (EPOLLHUP | EPOLLERR)) {
394 std::lock_guard lg(m_mapMutex);
395 CacheMaskFdAndEpollDel(currFd, data->task);
396 }
397 }
398 }
399 }
400
401 namespace {
WakeTask(CoTask * task)402 void WakeTask(CoTask* task)
403 {
404 if (task->GetBlockType() == BlockType::BLOCK_THREAD) {
405 std::lock_guard<std::mutex> lg(task->mutex_);
406 task->waitCond_.notify_one();
407 } else {
408 CoRoutineFactory::CoWakeFunc(task, CoWakeType::NO_TIMEOUT_WAKE);
409 }
410 }
411
CopyEventsToConsumer(EventVec & cachedEventsVec,struct epoll_event * eventsVec)412 int CopyEventsToConsumer(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept
413 {
414 int nfds = cachedEventsVec.size();
415 for (int i = 0; i < nfds; i++) {
416 eventsVec[i].events = cachedEventsVec[i].events;
417 eventsVec[i].data.fd = cachedEventsVec[i].data.fd;
418 }
419 return nfds;
420 }
421
CopyEventsInfoToConsumer(SyncData & taskInfo,EventVec & cachedEventsVec)422 void CopyEventsInfoToConsumer(SyncData& taskInfo, EventVec& cachedEventsVec)
423 {
424 epoll_event* eventsPtr = (epoll_event*)taskInfo.eventsPtr;
425 int* nfdsPtr = taskInfo.nfdsPtr;
426 if (eventsPtr == nullptr || nfdsPtr == nullptr) {
427 FFRT_LOGE("usr ptr is nullptr");
428 return;
429 }
430 *nfdsPtr = CopyEventsToConsumer(cachedEventsVec, eventsPtr);
431 }
432 } // namespace
433
CacheEventsAndDoMask(CoTask * task,EventVec & eventVec)434 void Poller::CacheEventsAndDoMask(CoTask* task, EventVec& eventVec) noexcept
435 {
436 auto& syncTaskEvents = m_cachedTaskEvents[task];
437 for (size_t i = 0; i < eventVec.size(); i++) {
438 int currFd = eventVec[i].data.fd;
439 auto wakeDataIter = m_wakeDataMap.find(currFd);
440 if (wakeDataIter == m_wakeDataMap.end() ||
441 wakeDataIter->second.size() == 0 ||
442 wakeDataIter->second.back()->task != task) {
443 FFRT_LOGD("fd[%d] may be deleted", currFd);
444 continue;
445 }
446
447 auto delIter = m_delCntMap.find(currFd);
448 if (delIter != m_delCntMap.end() && wakeDataIter->second.size() == static_cast<size_t>(delIter->second)) {
449 FFRT_LOGD("fd[%d] may be deleted", currFd);
450 continue;
451 }
452
453 struct epoll_event maskEv;
454 maskEv.events = 0;
455 auto& wakeData = wakeDataIter->second.back();
456 std::unique_ptr<struct WakeDataWithCb> maskWakeData = std::make_unique<WakeDataWithCb>(currFd,
457 wakeData->data, wakeData->cb, wakeData->task);
458 void* ptr = static_cast<void*>(maskWakeData.get());
459 if (ptr == nullptr || maskWakeData == nullptr) {
460 FFRT_SYSEVENT_LOGE("CacheEventsAndDoMask Construct WakeDataWithCb instance failed! or wakeData is nullptr");
461 continue;
462 }
463 maskWakeData->monitorEvents = 0;
464 CacheMaskWakeData(task, maskWakeData);
465 maskEv.data = {.ptr = ptr};
466 if (epoll_ctl(m_epFd, EPOLL_CTL_MOD, currFd, &maskEv) != 0 && errno != ENOENT) {
467 // ENOENT indicate fd is not in epfd, may be deleted
468 FFRT_SYSEVENT_LOGW("epoll_ctl mod fd error: efd=%d, fd=%d, errorno=%d", m_epFd, currFd, errno);
469 }
470 FFRT_LOGD("fd[%d] event has no consumer, so cache it", currFd);
471 syncTaskEvents.push_back(eventVec[i]);
472 }
473 }
474
WakeSyncTask(std::unordered_map<CoTask *,EventVec> & syncTaskEvents)475 void Poller::WakeSyncTask(std::unordered_map<CoTask*, EventVec>& syncTaskEvents) noexcept
476 {
477 if (syncTaskEvents.empty()) {
478 return;
479 }
480
481 std::unordered_set<int> timerHandlesToRemove;
482 std::unordered_set<CoTask*> tasksToWake;
483 {
484 std::lock_guard lg(m_mapMutex);
485 for (auto& taskEventPair : syncTaskEvents) {
486 CoTask* currTask = taskEventPair.first;
487 auto iter = m_waitTaskMap.find(currTask);
488 if (iter == m_waitTaskMap.end()) {
489 CacheEventsAndDoMask(currTask, taskEventPair.second);
490 continue;
491 }
492 CopyEventsInfoToConsumer(iter->second, taskEventPair.second);
493 auto timerHandle = iter->second.timerHandle;
494 if (timerHandle > -1) {
495 timerHandlesToRemove.insert(timerHandle);
496 }
497 tasksToWake.insert(currTask);
498 m_waitTaskMap.erase(iter);
499 }
500 }
501 if (timerHandlesToRemove.size() > 0) {
502 std::lock_guard lock(timerMutex_);
503 for (auto it = timerMap_.begin(); it != timerMap_.end();) {
504 if (timerHandlesToRemove.find(it->second.handle) != timerHandlesToRemove.end()) {
505 it = timerMap_.erase(it);
506 } else {
507 ++it;
508 }
509 }
510 timerEmpty_.store(timerMap_.empty());
511 }
512
513 for (auto task : tasksToWake) {
514 WakeTask(task);
515 }
516 }
517
GetTaskWaitTime(CoTask * task)518 uint64_t Poller::GetTaskWaitTime(CoTask* task) noexcept
519 {
520 std::lock_guard lg(m_mapMutex);
521 auto iter = m_waitTaskMap.find(task);
522 if (iter == m_waitTaskMap.end()) {
523 return 0;
524 }
525
526 return std::chrono::duration_cast<std::chrono::seconds>(
527 iter->second.waitTP.time_since_epoch()).count();
528 }
529
PollOnce(int timeout)530 PollerRet Poller::PollOnce(int timeout) noexcept
531 {
532 int realTimeout = timeout;
533 int timerHandle = -1;
534 {
535 std::lock_guard lg(timerMutex_);
536 if (!timerMap_.empty()) {
537 auto cur = timerMap_.begin();
538 timerHandle = cur->second.handle;
539 TimePoint now = std::chrono::steady_clock::now();
540 realTimeout = std::chrono::duration_cast<std::chrono::milliseconds>(
541 cur->first - now).count();
542 if (realTimeout <= 0) {
543 ExecuteTimerCb(now);
544 return PollerRet::RET_TIMER;
545 }
546
547 if (timeout != -1 && realTimeout > timeout) {
548 timerHandle = -1;
549 realTimeout = timeout;
550 }
551
552 flag_ = EpollStatus::WAIT;
553 }
554 }
555
556 pollerCount_++;
557 std::array<epoll_event, EPOLL_EVENT_SIZE> waitedEvents;
558 int nfds = epoll_wait(m_epFd, waitedEvents.data(), waitedEvents.size(), realTimeout);
559 flag_ = EpollStatus::WAKE;
560 if (nfds < 0) {
561 if (errno != EINTR) {
562 FFRT_SYSEVENT_LOGE("epoll_wait error, errorno= %d.", errno);
563 }
564 return PollerRet::RET_NULL;
565 }
566 if (nfds == 0) {
567 if (timerHandle != -1) {
568 std::lock_guard lg(timerMutex_);
569 for (auto it = timerMap_.begin(); it != timerMap_.end(); it++) {
570 if (it->second.handle == timerHandle) {
571 ExecuteTimerCb(it->first);
572 return PollerRet::RET_TIMER;
573 }
574 }
575 }
576 return PollerRet::RET_NULL;
577 }
578
579 std::unordered_map<CoTask*, EventVec> syncTaskEvents;
580 ProcessWaitedFds(nfds, syncTaskEvents, waitedEvents);
581 WakeSyncTask(syncTaskEvents);
582 ReleaseFdWakeData();
583 return PollerRet::RET_EPOLL;
584 }
585
ReleaseFdWakeData()586 void Poller::ReleaseFdWakeData() noexcept
587 {
588 std::lock_guard lg(m_mapMutex);
589 for (auto delIter = m_delCntMap.begin(); delIter != m_delCntMap.end();) {
590 int delFd = delIter->first;
591 unsigned int delCnt = static_cast<unsigned int>(delIter->second);
592 auto& wakeDataList = m_wakeDataMap[delFd];
593 int diff = wakeDataList.size() - delCnt;
594 if (diff == 0) {
595 m_wakeDataMap.erase(delFd);
596 m_delCntMap.erase(delIter++);
597 continue;
598 } else if (diff == 1) {
599 for (unsigned int i = 0; i < delCnt - 1; i++) {
600 wakeDataList.pop_front();
601 }
602 m_delCntMap[delFd] = 1;
603 } else {
604 FFRT_SYSEVENT_LOGE("fd=%d count unexpected, added num=%d, del num=%d", delFd, wakeDataList.size(), delCnt);
605 }
606 delIter++;
607 }
608
609 fdEmpty_.store(m_wakeDataMap.empty());
610 }
611
ProcessTimerDataCb(CoTask * task)612 void Poller::ProcessTimerDataCb(CoTask* task) noexcept
613 {
614 std::lock_guard lg(m_mapMutex);
615 auto iter = m_waitTaskMap.find(task);
616 if (iter != m_waitTaskMap.end()) {
617 WakeTask(task);
618 m_waitTaskMap.erase(iter);
619 }
620 }
621
ExecuteTimerCb(TimePoint timer)622 void Poller::ExecuteTimerCb(TimePoint timer) noexcept
623 {
624 while (!timerMap_.empty()) {
625 auto iter = timerMap_.begin();
626 if (iter->first > timer) {
627 break;
628 }
629
630 TimerDataWithCb data = iter->second;
631 if (data.cb != nullptr) {
632 executedHandle_[data.handle] = TimerStatus::EXECUTING;
633 }
634
635 timerMap_.erase(iter);
636 timerEmpty_.store(timerMap_.empty());
637
638 if (data.cb != nullptr) {
639 timerMutex_.unlock();
640 #ifdef FFRT_ENABLE_HITRACE_CHAIN
641 if (data.traceId.valid == HITRACE_ID_VALID) {
642 TraceChainAdapter::Instance().HiTraceChainRestoreId(&data.traceId);
643 }
644 #endif
645 data.cb(data.data);
646 #ifdef FFRT_ENABLE_HITRACE_CHAIN
647 if (data.traceId.valid == HITRACE_ID_VALID) {
648 TraceChainAdapter::Instance().HiTraceChainClearId();
649 }
650 #endif
651 timerMutex_.lock();
652 executedHandle_[data.handle] = TimerStatus::EXECUTED;
653 } else if (data.task != nullptr) {
654 timerMutex_.unlock();
655 ProcessTimerDataCb(data.task);
656 timerMutex_.lock();
657 }
658
659 if (data.repeat && (executedHandle_.find(data.handle) != executedHandle_.end())) {
660 executedHandle_.erase(data.handle);
661 RegisterTimerImpl(data);
662 }
663 }
664 }
665
RegisterTimerImpl(const TimerDataWithCb & data)666 void Poller::RegisterTimerImpl(const TimerDataWithCb& data) noexcept
667 {
668 if (flag_ == EpollStatus::TEARDOWN) {
669 return;
670 }
671
672 TimePoint absoluteTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(data.timeout);
673 bool wake = timerMap_.empty() || (absoluteTime < timerMap_.begin()->first && flag_ == EpollStatus::WAIT);
674
675 timerMap_.emplace(absoluteTime, data);
676 timerEmpty_.store(false);
677
678 if (wake) {
679 WakeUp();
680 }
681 }
682
RegisterTimer(uint64_t timeout,void * data,ffrt_timer_cb cb,bool repeat)683 int Poller::RegisterTimer(uint64_t timeout, void* data, ffrt_timer_cb cb, bool repeat) noexcept
684 {
685 if (flag_ == EpollStatus::TEARDOWN) {
686 return -1;
687 }
688
689 std::lock_guard lock(timerMutex_);
690 timerHandle_ += 1;
691
692 CoTask* task = IsCoTask(ExecuteCtx::Cur()->task) ? static_cast<CoTask*>(ExecuteCtx::Cur()->task) : nullptr;
693 if (timeout > MAX_TIMER_MS_COUNT) {
694 FFRT_LOGW("timeout exceeds maximum allowed value %llu ms. Clamping to %llu ms.", timeout, MAX_TIMER_MS_COUNT);
695 timeout = MAX_TIMER_MS_COUNT;
696 }
697 TimerDataWithCb timerMapValue(data, cb, task, repeat, timeout);
698 timerMapValue.handle = timerHandle_;
699 RegisterTimerImpl(timerMapValue);
700
701 return timerHandle_;
702 }
703
UnregisterTimer(int handle)704 int Poller::UnregisterTimer(int handle) noexcept
705 {
706 if (flag_ == EpollStatus::TEARDOWN) {
707 return -1;
708 }
709
710 std::lock_guard lock(timerMutex_);
711 auto it = executedHandle_.find(handle);
712 if (it != executedHandle_.end()) {
713 while (it->second == TimerStatus::EXECUTING) {
714 timerMutex_.unlock();
715 std::this_thread::yield();
716 timerMutex_.lock();
717 it = executedHandle_.find(handle);
718 if (it == executedHandle_.end()) {
719 break;
720 }
721 }
722
723 if (it != executedHandle_.end()) {
724 executedHandle_.erase(it);
725 return 0;
726 }
727 }
728
729 bool wake = false;
730 int ret = -1;
731 for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
732 if (cur->second.handle == handle) {
733 if (cur == timerMap_.begin() && flag_ == EpollStatus::WAIT) {
734 wake = true;
735 }
736 timerMap_.erase(cur);
737 ret = 0;
738 break;
739 }
740 }
741
742 timerEmpty_.store(timerMap_.empty());
743
744 if (wake) {
745 WakeUp();
746 }
747 return ret;
748 }
749
DetermineEmptyMap()750 bool Poller::DetermineEmptyMap() noexcept
751 {
752 return fdEmpty_ && timerEmpty_;
753 }
754
DeterminePollerReady()755 bool Poller::DeterminePollerReady() noexcept
756 {
757 return IsFdExist() || IsTimerReady();
758 }
759
IsFdExist()760 bool Poller::IsFdExist() noexcept
761 {
762 return !fdEmpty_;
763 }
764
IsTimerReady()765 bool Poller::IsTimerReady() noexcept
766 {
767 TimePoint now = std::chrono::steady_clock::now();
768 std::lock_guard lock(timerMutex_);
769 if (timerMap_.empty()) {
770 return false;
771 }
772
773 if (now >= timerMap_.begin()->first) {
774 return true;
775 }
776 return false;
777 }
778
GetTimerStatus(int handle)779 ffrt_timer_query_t Poller::GetTimerStatus(int handle) noexcept
780 {
781 if (flag_ == EpollStatus::TEARDOWN) {
782 return ffrt_timer_notfound;
783 }
784
785 std::lock_guard lock(timerMutex_);
786 for (auto cur = timerMap_.begin(); cur != timerMap_.end(); cur++) {
787 if (cur->second.handle == handle) {
788 return ffrt_timer_not_executed;
789 }
790 }
791
792 auto it = executedHandle_.find(handle);
793 if (it != executedHandle_.end()) {
794 while (it->second == TimerStatus::EXECUTING) {
795 timerMutex_.unlock();
796 std::this_thread::yield();
797 timerMutex_.lock();
798 it = executedHandle_.find(handle);
799 if (it == executedHandle_.end()) {
800 break;
801 }
802 }
803 return ffrt_timer_executed;
804 }
805
806 return ffrt_timer_notfound;
807 }
808
GetPollCount()809 uint64_t Poller::GetPollCount() noexcept
810 {
811 return pollerCount_;
812 }
813 }
814