1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "delayed_worker.h"
17
18 #include <array>
19 #include <unistd.h>
20 #include <sstream>
21 #include <sys/prctl.h>
22 #include <sys/timerfd.h>
23 #include <thread>
24 #include <pthread.h>
25 #include "eu/blockaware.h"
26 #include "eu/execute_unit.h"
27 #include "dfx/log/ffrt_log_api.h"
28 #include "internal_inc/assert.h"
29 #include "util/name_manager.h"
30 #include "sched/scheduler.h"
31 #include "util/ffrt_facade.h"
namespace {
// Marker value stored in the delayed-worker thread's thread-specific slot.
constexpr uintptr_t FFRT_DELAY_WORKER_MAGICNUM = 0x5aa5;
// Relative timerfd expiry, in seconds, armed while no delayed work is queued.
constexpr int FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS = 180;
// Timeout, in milliseconds, handed to each epoll_wait call of the worker loop.
constexpr int EPOLL_WAIT_TIMEOUT__MILISECONDS = 180 * 1000;
// Nanoseconds per second, used to split a nanosecond count into an itimerspec.
constexpr int NS_PER_SEC = 1'000'000'000;
// errno value that is not logged when epoll_wait fails (presumably EINTR - TODO confirm).
constexpr int FAKE_WAKE_UP_ERROR = 4;
// Capacity of the epoll_event array passed to epoll_wait.
constexpr int WAIT_EVENT_SIZE = 5;
// A single delayed callback taking longer than this many milliseconds is reported.
constexpr int64_t EXECUTION_TIMEOUT_MILISECONDS = 500;
// Maximum number of map entries printed by DumpMap.
constexpr int DUMP_MAP_MAX_COUNT = 3;
// Polling interval used by the destructor while outstanding async tasks drain.
constexpr int ASYNC_TASK_SLEEP_MS = 1;
}
43
44 namespace ffrt {
// Thread-specific-data key under which the delayed-worker thread stores its marker value
// (written in ThreadInit, read in IsDelayerWorkerThread).
pthread_key_t g_ffrtDelayWorkerFlagKey;
// Once-control passed to pthread_once so that the key above is created a single time.
pthread_once_t g_ffrtDelayWorkerThreadKeyOnce = PTHREAD_ONCE_INIT;
FFRTDelayWorkeEnvKeyCreate()47 void FFRTDelayWorkeEnvKeyCreate()
48 {
49 pthread_key_create(&g_ffrtDelayWorkerFlagKey, nullptr);
50 }
51
ThreadEnvCreate()52 void DelayedWorker::ThreadEnvCreate()
53 {
54 pthread_once(&g_ffrtDelayWorkerThreadKeyOnce, FFRTDelayWorkeEnvKeyCreate);
55 }
56
IsDelayerWorkerThread()57 bool DelayedWorker::IsDelayerWorkerThread()
58 {
59 bool isDelayerWorkerFlag = false;
60 void* flag = pthread_getspecific(g_ffrtDelayWorkerFlagKey);
61 if ((flag != nullptr) && (reinterpret_cast<uintptr_t>(flag) == FFRT_DELAY_WORKER_MAGICNUM)) {
62 isDelayerWorkerFlag = true;
63 }
64 return isDelayerWorkerFlag;
65 }
66
IsDelayedWorkerPreserved()67 bool IsDelayedWorkerPreserved()
68 {
69 std::unordered_set<std::string> whitelist = { "foundation", "com.ohos.sceneboard" };
70 if (whitelist.find(GetCurrentProcessName()) != whitelist.end()) {
71 return true;
72 }
73
74 return false;
75 }
76
DumpMap()77 void DelayedWorker::DumpMap()
78 {
79 lock.lock();
80 if (map.empty()) {
81 lock.unlock();
82 return;
83 }
84
85 TimePoint now = std::chrono::steady_clock::now();
86 if (now < map.begin()->first) {
87 lock.unlock();
88 return;
89 }
90
91 int count = 0;
92 std::stringstream ss;
93 int printCount = map.size() < DUMP_MAP_MAX_COUNT ? map.size() : DUMP_MAP_MAX_COUNT;
94 for (auto it = map.begin(); it != map.end() && count < DUMP_MAP_MAX_COUNT; ++it, ++count) {
95 ss << it->first.time_since_epoch().count();
96 if (count < printCount - 1) {
97 ss << ",";
98 }
99 }
100 lock.unlock();
101 FFRT_LOGW("DumpMap:now=%lu,%s", now.time_since_epoch().count(), ss.str().c_str());
102 }
103
ThreadInit()104 void DelayedWorker::ThreadInit()
105 {
106 if (delayWorker != nullptr && delayWorker->joinable()) {
107 delayWorker->join();
108 }
109 delayWorker = std::make_unique<std::thread>([this]() {
110 struct sched_param param;
111 param.sched_priority = 1;
112 int ret = pthread_setschedparam(pthread_self(), SCHED_RR, ¶m);
113 if (ret != 0) {
114 FFRT_LOGW("[%d] set priority warn ret[%d] eno[%d]\n", pthread_self(), ret, errno);
115 } else {
116 FFRT_LOGW("delayedWorker init");
117 }
118 prctl(PR_SET_NAME, DELAYED_WORKER_NAME);
119 pthread_setspecific(g_ffrtDelayWorkerFlagKey, reinterpret_cast<void*>(FFRT_DELAY_WORKER_MAGICNUM));
120 std::array<epoll_event, WAIT_EVENT_SIZE> waitedEvents;
121 static bool preserved = IsDelayedWorkerPreserved();
122 for (;;) {
123 std::unique_lock lk(lock);
124 if (toExit) {
125 exited_ = true;
126 FFRT_LOGW("delayedWorker exit");
127 break;
128 }
129 int result = HandleWork();
130 if (toExit) {
131 exited_ = true;
132 FFRT_LOGW("delayedWorker exit");
133 break;
134 }
135 if (result == 0) {
136 uint64_t ns = map.begin()->first.time_since_epoch().count();
137 itimerspec its = { {0, 0}, {static_cast<long>(ns / NS_PER_SEC), static_cast<long>(ns % NS_PER_SEC)} };
138 ret = timerfd_settime(timerfd_, TFD_TIMER_ABSTIME, &its, nullptr);
139 if (ret != 0) {
140 FFRT_LOGE("timerfd_settime error,ns=%lu,ret= %d.", ns, ret);
141 }
142 } else if ((result == 1) && (!preserved)) {
143 if (++noTaskDelayCount_ > 1 && ffrt::FFRTFacade::GetEUInstance().GetWorkerNum() == 0) {
144 exited_ = true;
145 FFRT_LOGW("delayedWorker exit");
146 break;
147 }
148 itimerspec its = { {0, 0}, {FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS, 0} };
149 ret = timerfd_settime(timerfd_, 0, &its, nullptr);
150 if (ret != 0) {
151 FFRT_LOGE("timerfd_settime error, ret= %d.", ret);
152 }
153 }
154 lk.unlock();
155
156 int nfds = epoll_wait(epollfd_, waitedEvents.data(), waitedEvents.size(),
157 EPOLL_WAIT_TIMEOUT__MILISECONDS);
158 if (nfds == 0) {
159 DumpMap();
160 }
161
162 if (nfds < 0) {
163 if (errno != FAKE_WAKE_UP_ERROR) {
164 FFRT_LOGW("epoll_wait error, errorno= %d.", errno);
165 }
166 continue;
167 }
168 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
169 for (int i = 0; i < nfds; i++) {
170 if (waitedEvents[i].data.fd == monitorfd_) {
171 char buffer;
172 size_t n = ::read(monitorfd_, &buffer, sizeof buffer);
173 if (n == 1) {
174 monitor->MonitorMain();
175 } else {
176 FFRT_LOGE("monitor read fail:%d, %s", n, errno);
177 }
178 break;
179 }
180 }
181 #endif
182 }
183 });
184 }
185
DelayedWorker()186 DelayedWorker::DelayedWorker(): epollfd_ { ::epoll_create1(EPOLL_CLOEXEC) },
187 timerfd_ { ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC) }
188 {
189 FFRT_ASSERT(epollfd_ >= 0);
190 FFRT_ASSERT(timerfd_ >= 0);
191
192 epoll_event timer_event { .events = EPOLLIN | EPOLLET, .data = { .fd = timerfd_ } };
193 if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, timerfd_, &timer_event) < 0) {
194 FFRT_LOGE("epoll_ctl add tfd error: efd=%d, fd=%d, errorno=%d", epollfd_, timerfd_, errno);
195 std::terminate();
196 }
197 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
198 monitor = ExecuteUnit::Instance().GetCPUMonitor();
199 monitorfd_ = BlockawareMonitorfd(-1, monitor->WakeupCond());
200 FFRT_ASSERT(monitorfd_ >= 0);
201 FFRT_LOGI("timerfd:%d, monitorfd:%d", timerfd_, monitorfd_);
202 /* monitorfd does not support 'CLOEXEC', and current kernel does not inherit monitorfd after 'fork'.
203 * 1. if user calls 'exec' directly after 'fork' and does not use ffrt, it's ok.
204 * 2. if user calls 'exec' directly, the original process cannot close monitorfd automatically, and
205 * it will be fail when new program use ffrt to create monitorfd.
206 */
207 epoll_event monitor_event {.events = EPOLLIN, .data = {.fd = monitorfd_}};
208 int ret = epoll_ctl(epollfd_, EPOLL_CTL_ADD, monitorfd_, &monitor_event);
209 if (ret < 0) {
210 FFRT_LOGE("monitor:%d add fail, ret:%d, errno:%d, %s", monitorfd_, ret, errno, strerror(errno));
211 }
212 #endif
213 }
214
~DelayedWorker()215 DelayedWorker::~DelayedWorker()
216 {
217 lock.lock();
218 toExit = true;
219 lock.unlock();
220 itimerspec its = { {0, 0}, {0, 1} };
221 timerfd_settime(timerfd_, 0, &its, nullptr);
222 if (delayWorker != nullptr && delayWorker->joinable()) {
223 delayWorker->join();
224 }
225 while (asyncTaskCnt_.load() > 0) {
226 std::this_thread::sleep_for(std::chrono::microseconds(ASYNC_TASK_SLEEP_MS));
227 }
228 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
229 ::close(monitorfd_);
230 #endif
231 ::close(timerfd_);
232 }
233
GetInstance()234 DelayedWorker& DelayedWorker::GetInstance()
235 {
236 static DelayedWorker instance;
237 return instance;
238 }
239
CheckTimeInterval(const TimePoint & startTp,const TimePoint & endTp)240 void CheckTimeInterval(const TimePoint& startTp, const TimePoint& endTp)
241 {
242 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTp - startTp);
243 int64_t durationMs = duration.count();
244 if (durationMs > EXECUTION_TIMEOUT_MILISECONDS) {
245 FFRT_LOGW("handle work more than [%lld]ms", durationMs);
246 }
247 }
248
// Runs every queued entry whose deadline has been reached.
//
// The worker loop calls this with `lock` already held; the lock is released around each
// callback invocation and re-acquired afterwards, so it is held again on return.
//
// Returns:
//    1  - the map is empty (either on entry or after all entries were run);
//    0  - the earliest remaining entry is not due yet, or toExit was set before it was run;
//   -1  - toExit was observed after a callback returned.
int DelayedWorker::HandleWork()
{
    if (!map.empty()) {
        // There is pending work, so the idle-pass counter used by the worker loop restarts.
        noTaskDelayCount_ = 0;
        TimePoint startTp = std::chrono::steady_clock::now();
        do {
            auto cur = map.begin();
            if (!toExit && cur != map.end() && cur->first <= startTp) {
                // Copy the entry and remove it from the map before dropping the lock.
                DelayedWork w = cur->second;
                map.erase(cur);
                lock.unlock();
                // w.cb points at the callable supplied to dispatch(); it is copied and
                // invoked without the lock held.
                std::function<void(WaitEntry*)> workCb = *w.cb;
                (workCb)(w.we);
                lock.lock();
                FFRT_COND_DO_ERR(toExit, return -1, "HandleWork exit, map size:%d", map.size());
                TimePoint endTp = std::chrono::steady_clock::now();
                // Warn when this single callback took longer than the execution timeout.
                CheckTimeInterval(startTp, endTp);
                // The next entry's deadline is compared against the time this callback finished.
                startTp = std::move(endTp);
            } else {
                return 0;
            }
        } while (!map.empty());
    }
    return 1;
}
274
275 // There is no requirement that to be less than now
dispatch(const TimePoint & to,WaitEntry * we,const std::function<void (WaitEntry *)> & wakeup)276 bool DelayedWorker::dispatch(const TimePoint& to, WaitEntry* we, const std::function<void(WaitEntry*)>& wakeup)
277 {
278 bool w = false;
279 lock.lock();
280 if (toExit) {
281 lock.unlock();
282 FFRT_LOGE("DelayedWorker destroy, dispatch failed\n");
283 return false;
284 }
285
286 TimePoint now = std::chrono::steady_clock::now();
287 if (to <= now) {
288 lock.unlock();
289 return false;
290 }
291
292 if (exited_) {
293 ThreadInit();
294 exited_ = false;
295 }
296
297 if (map.empty() || to < map.begin()->first) {
298 w = true;
299 }
300 map.emplace(to, DelayedWork {we, &wakeup});
301 if (w) {
302 uint64_t ns = static_cast<uint64_t>(to.time_since_epoch().count());
303 itimerspec its = { {0, 0}, {static_cast<long>(ns / NS_PER_SEC), static_cast<long>(ns % NS_PER_SEC)} };
304 int ret = timerfd_settime(timerfd_, TFD_TIMER_ABSTIME, &its, nullptr);
305 if (ret != 0) {
306 FFRT_LOGE("timerfd_settime error, ns=%lu, ret= %d.", ns, ret);
307 }
308 }
309 lock.unlock();
310 return true;
311 }
312
remove(const TimePoint & to,WaitEntry * we)313 bool DelayedWorker::remove(const TimePoint& to, WaitEntry* we)
314 {
315 std::lock_guard<decltype(lock)> l(lock);
316
317 auto range = map.equal_range(to);
318 for (auto it = range.first; it != range.second; ++it) {
319 if (it->second.we == we) {
320 map.erase(it);
321 return true;
322 }
323 }
324
325 return false;
326 }
327
SubmitAsyncTask(std::function<void ()> && func)328 void DelayedWorker::SubmitAsyncTask(std::function<void()>&& func)
329 {
330 asyncTaskCnt_.fetch_add(1);
331 ffrt::submit([this, func = std::move(func)]() {
332 if (toExit) {
333 asyncTaskCnt_.fetch_sub(1);
334 return;
335 }
336
337 func();
338 asyncTaskCnt_.fetch_sub(1);
339 }, {}, {this}, ffrt::task_attr().qos(qos_background));
340 }
341 } // namespace ffrt