1 /* 2 * Copyright (c) 2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef FFRT_POLLER_MANAGER_H 17 #define FFRT_POLLER_MANAGER_H 18 19 #ifndef _MSC_VER 20 #include <sys/epoll.h> 21 #include <sys/eventfd.h> 22 #endif 23 #include <list> 24 #include <thread> 25 #include <unordered_map> 26 #include <array> 27 #ifdef USE_OHOS_QOS 28 #include "qos.h" 29 #else 30 #include "staging_qos/sched/qos.h" 31 #endif 32 #include "sync/sync.h" 33 #include "internal_inc/non_copyable.h" 34 #include "c/executor_task.h" 35 #include "sync/poller.h" 36 #include "tm/task_base.h" 37 #ifdef FFRT_ENABLE_HITRACE_CHAIN 38 #include "dfx/trace/ffrt_trace_chain.h" 39 #endif 40 41 namespace ffrt { 42 enum class PollerState { 43 HANDLING, // worker执行事件回调(如果是同步回调函数执行有可能阻塞worker) 44 POLLING, // worker处于epoll_wait睡眠(事件响应) 45 EXITED, // worker没有事件时销毁线程(重新注册时触发创建线程) 46 }; 47 48 // 根据历史继承的能力 49 enum class PollerType { 50 WAKEUP, 51 SYNC_IO, 52 ASYNC_CB, 53 ASYNC_IO, 54 }; 55 56 struct WakeData { WakeDataWakeData57 WakeData() {} WakeDataWakeData58 WakeData(int fdVal, CoTask *taskVal) : fd(fdVal), task(taskVal) 59 { 60 mode = PollerType::SYNC_IO; 61 } WakeDataWakeData62 WakeData(int fdVal, void *dataVal, std::function<void(void *, uint32_t)> cbVal, CoTask *taskVal) 63 : fd(fdVal), data(dataVal), cb(cbVal), task(taskVal) 64 { 65 if (cb == nullptr) { 66 mode = PollerType::ASYNC_IO; 67 } else { 68 mode = PollerType::ASYNC_CB; 69 #ifdef FFRT_ENABLE_HITRACE_CHAIN 70 if (TraceChainAdapter::Instance().HiTraceChainGetId().valid == HITRACE_ID_VALID) { 71 traceId = TraceChainAdapter::Instance().HiTraceChainCreateSpan(); 72 }; 73 #endif 74 } 75 } 76 77 PollerType mode; 78 int fd = 0; 79 void* data = nullptr; 80 std::function<void(void*, uint32_t)> cb = nullptr; 81 CoTask* task = nullptr; 82 uint32_t monitorEvents = 0; 83 HiTraceIdStruct traceId; 84 }; 85 86 struct TimeOutReport { TimeOutReportTimeOutReport87 TimeOutReport() {} 88 std::atomic<uint64_t> cbStartTime = 0; // block info report 89 uint64_t reportCount = 0; 90 }; 91 92 using EventVec = typename std::vector<epoll_event>; 93 class IOPoller : private NonCopyable { 94 static constexpr int EPOLL_EVENT_SIZE = 1024; 95 using WakeDataList = typename std::list<std::unique_ptr<struct WakeData>>; 96 public: 97 static IOPoller& Instance(); 98 ~IOPoller() noexcept; 99 100 int AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept; 101 int DelFdEvent(int fd) noexcept; 102 int WaitFdEvent(struct epoll_event *eventsVec, int maxevents, int timeout) noexcept; 103 void WaitFdEvent(int fd) noexcept; 104 GetPollCount()105 inline uint64_t GetPollCount() noexcept 106 { 107 return pollerCount_; 108 } 109 GetTaskWaitTime(CoTask * task)110 inline uint64_t GetTaskWaitTime(CoTask* task) noexcept 111 { 112 std::lock_guard lock(m_mapMutex); 113 auto iter = m_waitTaskMap.find(task); 114 if (iter == m_waitTaskMap.end()) { 115 return 0; 116 } 117 return std::chrono::duration_cast<std::chrono::seconds>( 118 iter->second.waitTP.time_since_epoch()).count(); 119 } 120 ClearCachedEvents(CoTask * task)121 inline void ClearCachedEvents(CoTask* task) noexcept 122 { 123 std::lock_guard lock(m_mapMutex); 124 auto iter = m_cachedTaskEvents.find(task); 125 if (iter == m_cachedTaskEvents.end()) { 126 return; 127 } 128 m_cachedTaskEvents.erase(iter); 129 ClearMaskWakeDataCache(task); 130 } 131 132 void WakeUp() noexcept; 133 void WakeTimeoutTask(CoTask* task) noexcept; 134 void MonitTimeOut(); 135 136 private: 137 IOPoller() noexcept; 138 139 void ThreadInit(); 140 void Run(); 141 int PollOnce(int timeout = -1) noexcept; 142 143 void ReleaseFdWakeData() noexcept; 144 void WakeSyncTask(std::unordered_map<CoTask*, EventVec>& syncTaskEvents) noexcept; 145 146 void CacheEventsAndDoMask(CoTask* task, EventVec& eventVec) noexcept; 147 int FetchCachedEventAndDoUnmask(CoTask* task, struct epoll_event* eventsVec) noexcept; 148 int FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept; 149 void CacheMaskFdAndEpollDel(int fd, CoTask *task) noexcept; 150 int ClearMaskWakeDataCache(CoTask *task) noexcept; 151 int ClearMaskWakeDataCacheWithFd(CoTask *task, int fd) noexcept; 152 int ClearDelFdCache(int fd) noexcept; 153 154 int m_epFd; // epoll fd 155 struct WakeData m_wakeData; // self wakeup fd 156 mutable spin_mutex m_mapMutex; 157 struct TimeOutReport timeOutReport; 158 159 std::atomic_uint64_t m_syncFdCnt { 0 }; // record sync fd counts 160 // record async fd and events 161 std::unordered_map<int, WakeDataList> m_wakeDataMap; 162 std::unordered_map<int, int> m_delCntMap; 163 std::unordered_map<CoTask*, SyncData> m_waitTaskMap; 164 std::unordered_map<CoTask*, EventVec> m_cachedTaskEvents; 165 std::unordered_map<int, CoTask*> m_delFdCacheMap; 166 std::unordered_map<CoTask*, WakeDataList> m_maskWakeDataMap; 167 168 std::unique_ptr<std::thread> m_runner { nullptr }; // ffrt_io_poller thread 169 bool m_exitFlag { true }; // thread exit 170 bool m_teardown { false }; // process teardown 171 std::atomic<uint64_t> pollerCount_ { 0 }; 172 std::atomic<PollerState> m_state { PollerState::EXITED }; // worker state 173 174 std::array<queue*, QoS::MaxNum()> workQue; // queue(per qos) for execute async cb 175 }; 176 } 177 #endif 178