/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
16 #include "io_poller.h"
17 #include "sched/execute_ctx.h"
18 #include "eu/co_routine.h"
19 #include "dfx/log/ffrt_log_api.h"
20 #include "ffrt_trace.h"
21 #include "internal_inc/assert.h"
22 #include "internal_inc/types.h"
23 #include "tm/scpu_task.h"
24 #include "util/name_manager.h"
25
26 namespace ffrt {
27 constexpr unsigned int DEFAULT_CPUINDEX_LIMIT = 7;
28 struct IOPollerInstance: public IOPoller {
IOPollerInstanceffrt::IOPollerInstance29 IOPollerInstance() noexcept: m_runner([&] { RunForever(); })
30 {
31 pthread_setname_np(m_runner.native_handle(), IO_POLLER_NAME);
32 }
33
RunForeverffrt::IOPollerInstance34 void RunForever() noexcept
35 {
36 struct sched_param param;
37 param.sched_priority = 1;
38 int ret = pthread_setschedparam(pthread_self(), SCHED_RR, ¶m);
39 if (ret != 0) {
40 FFRT_LOGE("[%d] set priority failed ret[%d] errno[%d]\n", pthread_self(), ret, errno);
41 }
42 while (!m_exitFlag.load(std::memory_order_relaxed)) {
43 IOPoller::PollOnce(-1);
44 }
45 }
46
~IOPollerInstanceffrt::IOPollerInstance47 ~IOPollerInstance() noexcept override
48 {
49 Stop();
50 m_runner.join();
51 }
52 private:
Stopffrt::IOPollerInstance53 void Stop() noexcept
54 {
55 m_exitFlag.store(true, std::memory_order_relaxed);
56 std::atomic_thread_fence(std::memory_order_acq_rel);
57 IOPoller::WakeUp();
58 }
59
60 private:
61 std::thread m_runner;
62 std::atomic<bool> m_exitFlag { false };
63 };
64
GetIOPoller()65 IOPoller& GetIOPoller() noexcept
66 {
67 static IOPollerInstance inst;
68 return inst;
69 }
70
IOPoller()71 IOPoller::IOPoller() noexcept: m_epFd { ::epoll_create1(EPOLL_CLOEXEC) },
72 m_events(32)
73 {
74 FFRT_ASSERT(m_epFd >= 0);
75 {
76 m_wakeData.data = nullptr;
77 m_wakeData.fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
78 FFRT_ASSERT(m_wakeData.fd >= 0);
79 epoll_event ev{ .events = EPOLLIN, .data = { .ptr = static_cast<void*>(&m_wakeData) } };
80 if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, m_wakeData.fd, &ev) < 0) {
81 std::terminate();
82 }
83 }
84 }
85
~IOPoller()86 IOPoller::~IOPoller() noexcept
87 {
88 ::close(m_wakeData.fd);
89 ::close(m_epFd);
90 }
91
CasStrong(std::atomic<int> & a,int cmp,int exc)92 bool IOPoller::CasStrong(std::atomic<int>& a, int cmp, int exc)
93 {
94 return a.compare_exchange_strong(cmp, exc);
95 }
96
WakeUp()97 void IOPoller::WakeUp() noexcept
98 {
99 uint64_t one = 1;
100 ssize_t n = ::write(m_wakeData.fd, &one, sizeof one);
101 FFRT_ASSERT(n == sizeof one);
102 }
103
WaitFdEvent(int fd)104 void IOPoller::WaitFdEvent(int fd) noexcept
105 {
106 auto ctx = ExecuteCtx::Cur();
107 if (!ctx->task) {
108 FFRT_LOGI("nonworker shall not call this fun.");
109 return;
110 }
111 struct WakeData data = {.fd = fd, .data = static_cast<void *>(ctx->task)};
112
113 epoll_event ev = { .events = EPOLLIN, .data = {.ptr = static_cast<void*>(&data)} };
114 FFRT_BLOCK_TRACER(ctx->task->gid, fd);
115 if (!USE_COROUTINE || ctx->task->coRoutine->legacyMode) {
116 std::unique_lock<std::mutex> lck(ctx->task->lock);
117 if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) == 0) {
118 ctx->task->coRoutine->blockType = BlockType::BLOCK_THREAD;
119 reinterpret_cast<SCPUEUTask*>(ctx->task)->childWaitCond_.wait(lck);
120 }
121 return;
122 }
123
124 CoWait([&](CPUEUTask *task)->bool {
125 (void)task;
126 if (epoll_ctl(m_epFd, EPOLL_CTL_ADD, fd, &ev) == 0) {
127 return true;
128 }
129 FFRT_LOGI("epoll_ctl add err:efd:=%d, fd=%d errorno = %d", m_epFd, fd, errno);
130 return false;
131 });
132 }
133
PollOnce(int timeout)134 void IOPoller::PollOnce(int timeout) noexcept
135 {
136 int ndfs = epoll_wait(m_epFd, m_events.data(), m_events.size(), timeout);
137 if (ndfs <= 0) {
138 FFRT_LOGE("epoll_wait error: efd = %d, errorno= %d", m_epFd, errno);
139 return;
140 }
141
142 for (unsigned int i = 0; i < static_cast<unsigned int>(ndfs); ++i) {
143 struct WakeData *data = reinterpret_cast<struct WakeData *>(m_events[i].data.ptr);
144
145 if (data->fd == m_wakeData.fd) {
146 uint64_t one = 1;
147 ssize_t n = ::read(m_wakeData.fd, &one, sizeof one);
148 FFRT_ASSERT(n == sizeof one);
149 continue;
150 }
151
152 if (epoll_ctl(m_epFd, EPOLL_CTL_DEL, data->fd, nullptr) == 0) {
153 auto task = reinterpret_cast<CPUEUTask *>(data->data);
154 bool blockThread = task != nullptr ?
155 (task->coRoutine != nullptr ? task->coRoutine->blockType == BlockType::BLOCK_THREAD : false) : false;
156 if (!USE_COROUTINE || blockThread) {
157 std::unique_lock<std::mutex> lck(task->lock);
158 if (blockThread) {
159 task->coRoutine->blockType = BlockType::BLOCK_COROUTINE;
160 }
161 reinterpret_cast<SCPUEUTask*>(task)->childWaitCond_.notify_one();
162 } else {
163 CoWake(task, false);
164 }
165 continue;
166 }
167
168 FFRT_LOGI("epoll_ctl fd = %d errorno = %d", data->fd, errno);
169 }
170 }
171 }
172