• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "delayed_worker.h"
17 
18 #include <array>
19 #include <unistd.h>
20 #include <sstream>
21 #include <sys/prctl.h>
22 #include <sys/timerfd.h>
23 #include <thread>
24 #include <pthread.h>
25 #include "eu/blockaware.h"
26 #include "eu/execute_unit.h"
27 #include "dfx/log/ffrt_log_api.h"
28 #include "internal_inc/assert.h"
29 #include "util/name_manager.h"
30 #include "sched/scheduler.h"
31 #include "util/ffrt_facade.h"
namespace {
// Marker stored in the thread-specific slot of the delayed-worker thread.
constexpr uintptr_t FFRT_DELAY_WORKER_MAGICNUM = 0x5aa5;
// Relative timer (seconds) armed when the worker found no pending task.
constexpr int FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS = 3 * 60;
// Upper bound (milliseconds) for one epoll_wait call in the worker loop.
constexpr int EPOLL_WAIT_TIMEOUT__MILISECONDS = 3 * 60 * 1000;
// Nanoseconds per second, used to split a nanosecond count into an itimerspec.
constexpr int NS_PER_SEC = 1000 * 1000 * 1000;
// errno value that is not reported after a failed epoll_wait (4 is EINTR on Linux).
constexpr int FAKE_WAKE_UP_ERROR = 4;
// Capacity of the epoll event buffer used by the worker thread.
constexpr int WAIT_EVENT_SIZE = 5;
// A single delayed callback taking longer than this (milliseconds) is logged.
constexpr int64_t EXECUTION_TIMEOUT_MILISECONDS = 500;
// Maximum number of map entries printed by DumpMap.
constexpr int DUMP_MAP_MAX_COUNT = 3;
// Polling interval used by the destructor while async tasks are still in flight.
constexpr int ASYNC_TASK_SLEEP_MS = 1;
}
43 
44 namespace ffrt {
// Thread-specific-data key whose value marks the delayed-worker thread.
pthread_key_t g_ffrtDelayWorkerFlagKey;
// Once-control passed to pthread_once together with FFRTDelayWorkeEnvKeyCreate.
pthread_once_t g_ffrtDelayWorkerThreadKeyOnce = PTHREAD_ONCE_INIT;

/**
 * @brief Creates the thread-specific-data key g_ffrtDelayWorkerFlagKey.
 *
 * The key is created without a destructor. The result of pthread_key_create
 * is intentionally discarded, exactly as before.
 */
void FFRTDelayWorkeEnvKeyCreate()
{
    static_cast<void>(pthread_key_create(&g_ffrtDelayWorkerFlagKey, nullptr));
}
51 
ThreadEnvCreate()52 void DelayedWorker::ThreadEnvCreate()
53 {
54     pthread_once(&g_ffrtDelayWorkerThreadKeyOnce, FFRTDelayWorkeEnvKeyCreate);
55 }
56 
IsDelayerWorkerThread()57 bool DelayedWorker::IsDelayerWorkerThread()
58 {
59     bool isDelayerWorkerFlag = false;
60     void* flag = pthread_getspecific(g_ffrtDelayWorkerFlagKey);
61     if ((flag != nullptr) && (reinterpret_cast<uintptr_t>(flag) == FFRT_DELAY_WORKER_MAGICNUM)) {
62         isDelayerWorkerFlag = true;
63     }
64     return isDelayerWorkerFlag;
65 }
66 
IsDelayedWorkerPreserved()67 bool IsDelayedWorkerPreserved()
68 {
69     std::unordered_set<std::string> whitelist = { "foundation", "com.ohos.sceneboard" };
70     if (whitelist.find(GetCurrentProcessName()) != whitelist.end()) {
71         return true;
72     }
73 
74     return false;
75 }
76 
DumpMap()77 void DelayedWorker::DumpMap()
78 {
79     lock.lock();
80     if (map.empty()) {
81         lock.unlock();
82         return;
83     }
84 
85     TimePoint now = std::chrono::steady_clock::now();
86     if (now < map.begin()->first) {
87         lock.unlock();
88         return;
89     }
90 
91     int count = 0;
92     std::stringstream ss;
93     int printCount = map.size() < DUMP_MAP_MAX_COUNT ? map.size() : DUMP_MAP_MAX_COUNT;
94     for (auto it = map.begin(); it != map.end() && count < DUMP_MAP_MAX_COUNT; ++it, ++count) {
95         ss << it->first.time_since_epoch().count();
96         if (count < printCount - 1) {
97             ss << ",";
98         }
99     }
100     lock.unlock();
101     FFRT_LOGW("DumpMap:now=%lu,%s", now.time_since_epoch().count(), ss.str().c_str());
102 }
103 
ThreadInit()104 void DelayedWorker::ThreadInit()
105 {
106     if (delayWorker != nullptr && delayWorker->joinable()) {
107         delayWorker->join();
108     }
109     delayWorker = std::make_unique<std::thread>([this]() {
110         struct sched_param param;
111         param.sched_priority = 1;
112         int ret = pthread_setschedparam(pthread_self(), SCHED_RR, &param);
113         if (ret != 0) {
114             FFRT_LOGW("[%d] set priority warn ret[%d] eno[%d]\n", pthread_self(), ret, errno);
115         } else {
116             FFRT_LOGW("delayedWorker init");
117         }
118         prctl(PR_SET_NAME, DELAYED_WORKER_NAME);
119         pthread_setspecific(g_ffrtDelayWorkerFlagKey, reinterpret_cast<void*>(FFRT_DELAY_WORKER_MAGICNUM));
120         std::array<epoll_event, WAIT_EVENT_SIZE> waitedEvents;
121         static bool preserved = IsDelayedWorkerPreserved();
122         for (;;) {
123             std::unique_lock lk(lock);
124             if (toExit) {
125                 exited_ = true;
126                 FFRT_LOGW("delayedWorker exit");
127                 break;
128             }
129             int result = HandleWork();
130             if (toExit) {
131                 exited_ = true;
132                 FFRT_LOGW("delayedWorker exit");
133                 break;
134             }
135             if (result == 0) {
136                 uint64_t ns = map.begin()->first.time_since_epoch().count();
137                 itimerspec its = { {0, 0}, {static_cast<long>(ns / NS_PER_SEC), static_cast<long>(ns % NS_PER_SEC)} };
138                 ret = timerfd_settime(timerfd_, TFD_TIMER_ABSTIME, &its, nullptr);
139                 if (ret != 0) {
140                     FFRT_LOGE("timerfd_settime error,ns=%lu,ret= %d.", ns, ret);
141                 }
142             } else if ((result == 1) && (!preserved)) {
143                 if (++noTaskDelayCount_ > 1 && ffrt::FFRTFacade::GetEUInstance().GetWorkerNum() == 0) {
144                     exited_ = true;
145                     FFRT_LOGW("delayedWorker exit");
146                     break;
147                 }
148                 itimerspec its = { {0, 0}, {FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS, 0} };
149                 ret = timerfd_settime(timerfd_, 0, &its, nullptr);
150                 if (ret != 0) {
151                     FFRT_LOGE("timerfd_settime error, ret= %d.", ret);
152                 }
153             }
154             lk.unlock();
155 
156             int nfds = epoll_wait(epollfd_, waitedEvents.data(), waitedEvents.size(),
157                 EPOLL_WAIT_TIMEOUT__MILISECONDS);
158             if (nfds == 0) {
159                 DumpMap();
160             }
161 
162             if (nfds < 0) {
163                 if (errno != FAKE_WAKE_UP_ERROR) {
164                     FFRT_LOGW("epoll_wait error, errorno= %d.", errno);
165                 }
166                 continue;
167             }
168 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
169             for (int i = 0; i < nfds; i++) {
170                 if (waitedEvents[i].data.fd == monitorfd_) {
171                     char buffer;
172                     size_t n = ::read(monitorfd_, &buffer, sizeof buffer);
173                     if (n == 1) {
174                         monitor->MonitorMain();
175                     } else {
176                         FFRT_LOGE("monitor read fail:%d, %s", n, errno);
177                     }
178                     break;
179                 }
180             }
181 #endif
182         }
183     });
184 }
185 
DelayedWorker()186 DelayedWorker::DelayedWorker(): epollfd_ { ::epoll_create1(EPOLL_CLOEXEC) },
187     timerfd_ { ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC) }
188 {
189     FFRT_ASSERT(epollfd_ >= 0);
190     FFRT_ASSERT(timerfd_ >= 0);
191 
192     epoll_event timer_event { .events = EPOLLIN | EPOLLET, .data = { .fd = timerfd_ } };
193     if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, timerfd_, &timer_event) < 0) {
194         FFRT_LOGE("epoll_ctl add tfd error: efd=%d, fd=%d, errorno=%d", epollfd_, timerfd_, errno);
195         std::terminate();
196     }
197 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
198     monitor = ExecuteUnit::Instance().GetCPUMonitor();
199     monitorfd_ = BlockawareMonitorfd(-1, monitor->WakeupCond());
200     FFRT_ASSERT(monitorfd_ >= 0);
201     FFRT_LOGI("timerfd:%d, monitorfd:%d", timerfd_, monitorfd_);
202     /* monitorfd does not support 'CLOEXEC', and current kernel does not inherit monitorfd after 'fork'.
203      * 1. if user calls 'exec' directly after 'fork' and does not use ffrt, it's ok.
204      * 2. if user calls 'exec' directly, the original process cannot close monitorfd automatically, and
205      * it will be fail when new program use ffrt to create monitorfd.
206      */
207     epoll_event monitor_event {.events = EPOLLIN, .data = {.fd = monitorfd_}};
208     int ret = epoll_ctl(epollfd_, EPOLL_CTL_ADD, monitorfd_, &monitor_event);
209     if (ret < 0) {
210         FFRT_LOGE("monitor:%d add fail, ret:%d, errno:%d, %s", monitorfd_, ret, errno, strerror(errno));
211     }
212 #endif
213 }
214 
~DelayedWorker()215 DelayedWorker::~DelayedWorker()
216 {
217     lock.lock();
218     toExit = true;
219     lock.unlock();
220     itimerspec its = { {0, 0}, {0, 1} };
221     timerfd_settime(timerfd_, 0, &its, nullptr);
222     if (delayWorker != nullptr && delayWorker->joinable()) {
223         delayWorker->join();
224     }
225     while (asyncTaskCnt_.load() > 0) {
226         std::this_thread::sleep_for(std::chrono::microseconds(ASYNC_TASK_SLEEP_MS));
227     }
228 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
229     ::close(monitorfd_);
230 #endif
231     ::close(timerfd_);
232 }
233 
GetInstance()234 DelayedWorker& DelayedWorker::GetInstance()
235 {
236     static DelayedWorker instance;
237     return instance;
238 }
239 
CheckTimeInterval(const TimePoint & startTp,const TimePoint & endTp)240 void CheckTimeInterval(const TimePoint& startTp, const TimePoint& endTp)
241 {
242     auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTp - startTp);
243     int64_t durationMs = duration.count();
244     if (durationMs > EXECUTION_TIMEOUT_MILISECONDS) {
245         FFRT_LOGW("handle work more than [%lld]ms", durationMs);
246     }
247 }
248 
HandleWork()249 int DelayedWorker::HandleWork()
250 {
251     if (!map.empty()) {
252         noTaskDelayCount_ = 0;
253         TimePoint startTp = std::chrono::steady_clock::now();
254         do {
255             auto cur = map.begin();
256             if (!toExit && cur != map.end() && cur->first <= startTp) {
257                 DelayedWork w = cur->second;
258                 map.erase(cur);
259                 lock.unlock();
260                 std::function<void(WaitEntry*)> workCb = *w.cb;
261                 (workCb)(w.we);
262                 lock.lock();
263                 FFRT_COND_DO_ERR(toExit, return -1, "HandleWork exit, map size:%d", map.size());
264                 TimePoint endTp = std::chrono::steady_clock::now();
265                 CheckTimeInterval(startTp, endTp);
266                 startTp = std::move(endTp);
267             } else {
268                 return 0;
269             }
270         } while (!map.empty());
271     }
272     return 1;
273 }
274 
275 // There is no requirement that to be less than now
dispatch(const TimePoint & to,WaitEntry * we,const std::function<void (WaitEntry *)> & wakeup)276 bool DelayedWorker::dispatch(const TimePoint& to, WaitEntry* we, const std::function<void(WaitEntry*)>& wakeup)
277 {
278     bool w = false;
279     lock.lock();
280     if (toExit) {
281         lock.unlock();
282         FFRT_LOGE("DelayedWorker destroy, dispatch failed\n");
283         return false;
284     }
285 
286     TimePoint now = std::chrono::steady_clock::now();
287     if (to <= now) {
288         lock.unlock();
289         return false;
290     }
291 
292     if (exited_) {
293         ThreadInit();
294         exited_ = false;
295     }
296 
297     if (map.empty() || to < map.begin()->first) {
298         w = true;
299     }
300     map.emplace(to, DelayedWork {we, &wakeup});
301     if (w) {
302         uint64_t ns = static_cast<uint64_t>(to.time_since_epoch().count());
303         itimerspec its = { {0, 0}, {static_cast<long>(ns / NS_PER_SEC), static_cast<long>(ns % NS_PER_SEC)} };
304         int ret = timerfd_settime(timerfd_, TFD_TIMER_ABSTIME, &its, nullptr);
305         if (ret != 0) {
306             FFRT_LOGE("timerfd_settime error, ns=%lu, ret= %d.", ns, ret);
307         }
308     }
309     lock.unlock();
310     return true;
311 }
312 
remove(const TimePoint & to,WaitEntry * we)313 bool DelayedWorker::remove(const TimePoint& to, WaitEntry* we)
314 {
315     std::lock_guard<decltype(lock)> l(lock);
316 
317     auto range = map.equal_range(to);
318     for (auto it = range.first; it != range.second; ++it) {
319         if (it->second.we == we) {
320             map.erase(it);
321             return true;
322         }
323     }
324 
325     return false;
326 }
327 
SubmitAsyncTask(std::function<void ()> && func)328 void DelayedWorker::SubmitAsyncTask(std::function<void()>&& func)
329 {
330     asyncTaskCnt_.fetch_add(1);
331     ffrt::submit([this, func = std::move(func)]() {
332         if (toExit) {
333             asyncTaskCnt_.fetch_sub(1);
334             return;
335         }
336 
337         func();
338         asyncTaskCnt_.fetch_sub(1);
339         }, {}, {this}, ffrt::task_attr().qos(qos_background));
340 }
341 } // namespace ffrt