/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FFRT_POLLER_MANAGER_H
#define FFRT_POLLER_MANAGER_H

#ifndef _MSC_VER
#include <sys/epoll.h>
#include <sys/eventfd.h>
#endif
#include <list>
#include <thread>
#include <unordered_map>
#include <array>
#ifdef USE_OHOS_QOS
#include "qos.h"
#else
#include "staging_qos/sched/qos.h"
#endif
#include "sync/sync.h"
#include "internal_inc/non_copyable.h"
#include "c/executor_task.h"
#include "sync/poller.h"
#include "tm/task_base.h"
#ifdef FFRT_ENABLE_HITRACE_CHAIN
#include "dfx/trace/ffrt_trace_chain.h"
#endif

namespace ffrt {
enum class PollerState {
    HANDLING, // worker is executing event callbacks (a synchronous callback may block the worker)
    POLLING,  // worker is sleeping in epoll_wait, waiting for events
    EXITED,   // worker thread is destroyed when there are no events; re-registration triggers thread creation
};

// poller modes carried over for historical reasons
enum class PollerType {
    WAKEUP,
    SYNC_IO,
    ASYNC_CB,
    ASYNC_IO,
};

struct WakeData {
    WakeData() {}

    // Registration for a synchronous wait on an fd (PollerType::SYNC_IO).
    WakeData(int fdVal, CoTask *taskVal) : fd(fdVal), task(taskVal)
    {
        mode = PollerType::SYNC_IO;
    }

    // Registration for asynchronous monitoring: ASYNC_CB when a callback is supplied, ASYNC_IO otherwise.
    WakeData(int fdVal, void *dataVal, std::function<void(void *, uint32_t)> cbVal, CoTask *taskVal)
        : fd(fdVal), data(dataVal), cb(cbVal), task(taskVal)
    {
        if (cb == nullptr) {
            mode = PollerType::ASYNC_IO;
        } else {
            mode = PollerType::ASYNC_CB;
#ifdef FFRT_ENABLE_HITRACE_CHAIN
            if (TraceChainAdapter::Instance().HiTraceChainGetId().valid == HITRACE_ID_VALID) {
                traceId = TraceChainAdapter::Instance().HiTraceChainCreateSpan();
            }
#endif
        }
    }

    PollerType mode;
    int fd = 0;
    void* data = nullptr;
    std::function<void(void*, uint32_t)> cb = nullptr;
    CoTask* task = nullptr;
    uint32_t monitorEvents = 0;
    HiTraceIdStruct traceId;
};

struct TimeOutReport {
    TimeOutReport() {}
    std::atomic<uint64_t> cbStartTime = 0; // callback start time, used for blocked-callback reporting
    uint64_t reportCount = 0;
};

using EventVec = typename std::vector<epoll_event>;
class IOPoller : private NonCopyable {
    static constexpr int EPOLL_EVENT_SIZE = 1024;
    using WakeDataList = typename std::list<std::unique_ptr<struct WakeData>>;
public:
    static IOPoller& Instance();
    ~IOPoller() noexcept;

    int AddFdEvent(int op, uint32_t events, int fd, void* data, ffrt_poller_cb cb) noexcept;
    int DelFdEvent(int fd) noexcept;
    int WaitFdEvent(struct epoll_event *eventsVec, int maxevents, int timeout) noexcept;
    void WaitFdEvent(int fd) noexcept;
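    // Illustrative usage sketch (not part of the API defined here): the epoll op/event values
    // and the callback shape below are assumptions based on <sys/epoll.h> and on WakeData::cb
    // being a std::function<void(void*, uint32_t)>.
    //
    //   auto& poller = IOPoller::Instance();
    //   poller.AddFdEvent(EPOLL_CTL_ADD, EPOLLIN, fd, userData, cb); // async path: cb(userData, events) is invoked when fd is ready
    //   poller.WaitFdEvent(fd);  // sync path: block the current task until fd becomes ready
    //   poller.DelFdEvent(fd);   // stop monitoring fd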

    inline uint64_t GetPollCount() noexcept
    {
        return pollerCount_;
    }

    inline uint64_t GetTaskWaitTime(CoTask* task) noexcept
    {
        std::lock_guard lock(m_mapMutex);
        auto iter = m_waitTaskMap.find(task);
        if (iter == m_waitTaskMap.end()) {
            return 0;
        }
        return std::chrono::duration_cast<std::chrono::seconds>(
            iter->second.waitTP.time_since_epoch()).count();
    }

    inline void ClearCachedEvents(CoTask* task) noexcept
    {
        std::lock_guard lock(m_mapMutex);
        auto iter = m_cachedTaskEvents.find(task);
        if (iter == m_cachedTaskEvents.end()) {
            return;
        }
        m_cachedTaskEvents.erase(iter);
        ClearMaskWakeDataCache(task);
    }

    void WakeUp() noexcept;
    void WakeTimeoutTask(CoTask* task) noexcept;
    void MonitTimeOut();

private:
    IOPoller() noexcept;

    void ThreadInit();
    void Run();
    int PollOnce(int timeout = -1) noexcept;

    void ReleaseFdWakeData() noexcept;
    void WakeSyncTask(std::unordered_map<CoTask*, EventVec>& syncTaskEvents) noexcept;

    void CacheEventsAndDoMask(CoTask* task, EventVec& eventVec) noexcept;
    int FetchCachedEventAndDoUnmask(CoTask* task, struct epoll_event* eventsVec) noexcept;
    int FetchCachedEventAndDoUnmask(EventVec& cachedEventsVec, struct epoll_event* eventsVec) noexcept;
    void CacheMaskFdAndEpollDel(int fd, CoTask *task) noexcept;
    int ClearMaskWakeDataCache(CoTask *task) noexcept;
    int ClearMaskWakeDataCacheWithFd(CoTask *task, int fd) noexcept;
    int ClearDelFdCache(int fd) noexcept;

    int m_epFd; // epoll fd
    struct WakeData m_wakeData; // self wakeup fd
    mutable spin_mutex m_mapMutex;
    struct TimeOutReport timeOutReport;

    std::atomic_uint64_t m_syncFdCnt { 0 }; // record sync fd counts
    // record async fd and events
    std::unordered_map<int, WakeDataList> m_wakeDataMap;
    std::unordered_map<int, int> m_delCntMap;
    std::unordered_map<CoTask*, SyncData> m_waitTaskMap;
    std::unordered_map<CoTask*, EventVec> m_cachedTaskEvents;
    std::unordered_map<int, CoTask*> m_delFdCacheMap;
    std::unordered_map<CoTask*, WakeDataList> m_maskWakeDataMap;

    std::unique_ptr<std::thread> m_runner { nullptr }; // ffrt_io_poller thread
    bool m_exitFlag { true }; // thread exit
    bool m_teardown { false }; // process teardown
    std::atomic<uint64_t> pollerCount_ { 0 };
    std::atomic<PollerState> m_state { PollerState::EXITED }; // worker state

    std::array<queue*, QoS::MaxNum()> workQue; // per-QoS queues used to execute async callbacks
};
} // namespace ffrt
#endif // FFRT_POLLER_MANAGER_H