• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
#include "sync/delayed_worker.h"

#include <pthread.h>
#include <sys/prctl.h>
#include <sys/timerfd.h>
#include <unistd.h>

#include <array>
#include <cerrno>
#include <cstring>
#include <sstream>
#include <thread>

#include "eu/blockaware.h"
#include "eu/execute_unit.h"
#include "dfx/log/ffrt_log_api.h"
#include "util/name_manager.h"
#include "sched/scheduler.h"
#include "util/ffrt_facade.h"
#include "util/white_list.h"
32 
namespace {
// Sentinel stored in the worker thread's TLS slot so IsDelayerWorkerThread() can recognise that thread.
constexpr uintptr_t FFRT_DELAY_WORKER_MAGICNUM = 0x5aa5;
// Relative timeout armed on timerfd_ when the delay map is empty (3 minutes).
constexpr int FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS = 3 * 60;
// Upper bound for one epoll_wait() in the worker loop; a timeout triggers DumpMap().
constexpr int EPOLL_WAIT_TIMEOUT_MILLISECONDS = 3 * 60 * 1000;
constexpr int NS_PER_SEC = 1000 * 1000 * 1000;
// errno of an interrupted epoll_wait(); such wake-ups are not logged.
// Spelled as EINTR instead of the platform-specific magic number 4.
constexpr int FAKE_WAKE_UP_ERROR = EINTR;
// Capacity of the epoll event buffer used for each wait.
constexpr int WAIT_EVENT_SIZE = 5;
// Callback run time above which CheckTimeInterval() logs a warning.
constexpr int64_t EXECUTION_TIMEOUT_MILLISECONDS = 500;
// Maximum number of pending deadlines printed by DumpMap().
constexpr int DUMP_MAP_MAX_COUNT = 3;
// Poll interval used by ~DelayedWorker() while waiting for outstanding async tasks.
// NOTE(review): the destructor wraps this in std::chrono::microseconds, so the effective unit is
// microseconds despite the _MS suffix -- confirm which unit is intended.
constexpr int ASYNC_TASK_SLEEP_MS = 1;
}
44 
45 namespace ffrt {
// One-shot guard so the TLS key below is created only once (consumed by DelayedWorker::ThreadEnvCreate).
pthread_once_t g_ffrtDelayWorkerThreadKeyOnce = PTHREAD_ONCE_INIT;
// Thread-specific key; the delayed-worker thread stores a magic tag under it.
pthread_key_t g_ffrtDelayWorkerFlagKey{};
FFRTDelayedWorkerEnvKeyCreate()48 void FFRTDelayedWorkerEnvKeyCreate()
49 {
50     pthread_key_create(&g_ffrtDelayWorkerFlagKey, nullptr);
51 }
52 
ThreadEnvCreate()53 void DelayedWorker::ThreadEnvCreate()
54 {
55     pthread_once(&g_ffrtDelayWorkerThreadKeyOnce, FFRTDelayedWorkerEnvKeyCreate);
56 }
57 
IsDelayerWorkerThread()58 bool DelayedWorker::IsDelayerWorkerThread()
59 {
60     bool isDelayerWorkerFlag = false;
61     void* flag = pthread_getspecific(g_ffrtDelayWorkerFlagKey);
62     if ((flag != nullptr) && (reinterpret_cast<uintptr_t>(flag) == FFRT_DELAY_WORKER_MAGICNUM)) {
63         isDelayerWorkerFlag = true;
64     }
65     return isDelayerWorkerFlag;
66 }
67 
IsDelayedWorkerPreserved()68 bool IsDelayedWorkerPreserved()
69 {
70     return WhiteList::GetInstance().IsEnabled("IsDelayedWorkerPreserved", false);
71 }
72 
DumpMap()73 void DelayedWorker::DumpMap()
74 {
75     std::lock_guard lg(lock);
76     if (map.empty()) {
77         return;
78     }
79 
80     TimePoint now = std::chrono::steady_clock::now();
81     if (now < map.begin()->first) {
82         return;
83     }
84 
85     int count = 0;
86     std::stringstream ss;
87     int printCount = map.size() < DUMP_MAP_MAX_COUNT ? map.size() : DUMP_MAP_MAX_COUNT;
88     for (auto it = map.begin(); it != map.end() && count < DUMP_MAP_MAX_COUNT; ++it, ++count) {
89         ss << it->first.time_since_epoch().count();
90         if (count < printCount - 1) {
91             ss << ",";
92         }
93     }
94     FFRT_SYSEVENT_LOGW("DumpMap:now=%lu,%s", now.time_since_epoch().count(), ss.str().c_str());
95 }
96 
ThreadInit()97 void DelayedWorker::ThreadInit()
98 {
99     if (delayedWorker != nullptr && delayedWorker->joinable()) {
100         delayedWorker->join();
101     }
102     delayedWorker = std::make_unique<std::thread>([this]() {
103         struct sched_param param;
104         param.sched_priority = 1;
105         int ret = pthread_setschedparam(pthread_self(), SCHED_RR, &param);
106         if (ret != 0) {
107             FFRT_LOGW("[%d] set priority warn ret[%d] eno[%d]\n", pthread_self(), ret, errno);
108         } else {
109             FFRT_LOGW("delayedWorker init");
110         }
111         prctl(PR_SET_NAME, DELAYED_WORKER_NAME);
112         pthread_setspecific(g_ffrtDelayWorkerFlagKey, reinterpret_cast<void*>(FFRT_DELAY_WORKER_MAGICNUM));
113         std::array<epoll_event, WAIT_EVENT_SIZE> waitedEvents;
114         static bool preserved = IsDelayedWorkerPreserved();
115         for (;;) {
116             std::unique_lock lk(lock);
117             if (toExit) {
118                 exited_ = true;
119                 FFRT_LOGW("delayedWorker exit");
120                 break;
121             }
122             int result = HandleWork();
123             if (toExit) {
124                 exited_ = true;
125                 FFRT_LOGW("delayedWorker exit");
126                 break;
127             }
128             if (result == 0) {
129                 uint64_t ns = map.begin()->first.time_since_epoch().count();
130                 itimerspec its = { {0, 0}, {static_cast<long>(ns / NS_PER_SEC), static_cast<long>(ns % NS_PER_SEC)} };
131                 ret = timerfd_settime(timerfd_, TFD_TIMER_ABSTIME, &its, nullptr);
132                 if (ret != 0) {
133                     FFRT_SYSEVENT_LOGE("timerfd_settime error,ns=%lu,ret= %d.", ns, ret);
134                 }
135             } else if ((result == 1) && (!preserved)) {
136                 if (++noTaskDelayCount_ > 1 && ffrt::FFRTFacade::GetEUInstance().GetWorkerNum() == 0) {
137                     exited_ = true;
138                     FFRT_LOGW("delayedWorker exit");
139                     break;
140                 }
141                 itimerspec its = { {0, 0}, {FFRT_DELAY_WORKER_IDLE_TIMEOUT_SECONDS, 0} };
142                 ret = timerfd_settime(timerfd_, 0, &its, nullptr);
143                 if (ret != 0) {
144                     FFRT_SYSEVENT_LOGE("timerfd_settime error, ret= %d.", ret);
145                 }
146             }
147             lk.unlock();
148 
149             int nfds = epoll_wait(epollfd_, waitedEvents.data(), waitedEvents.size(),
150                 EPOLL_WAIT_TIMEOUT_MILLISECONDS);
151             if (nfds == 0) {
152                 DumpMap();
153             }
154 
155             if (nfds < 0) {
156                 if (errno != FAKE_WAKE_UP_ERROR) {
157                     FFRT_SYSEVENT_LOGW("epoll_wait error, errorno= %d.", errno);
158                 }
159                 continue;
160             }
161 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
162             for (int i = 0; i < nfds; i++) {
163                 if (waitedEvents[i].data.fd == monitorfd_) {
164                     char buffer;
165                     size_t n = ::read(monitorfd_, &buffer, sizeof buffer);
166                     if (n == 1) {
167                         ExecuteUnit::Instance().MonitorMain();
168                     } else {
169                         FFRT_SYSEVENT_LOGE("monitor read fail:%d, %s", n, errno);
170                     }
171                     break;
172                 }
173             }
174 #endif
175         }
176     });
177 }
178 
DelayedWorker()179 DelayedWorker::DelayedWorker()
180 {
181     epollfd_ = ::epoll_create1(EPOLL_CLOEXEC);
182     if (epollfd_ < 0) {
183         FFRT_LOGE("epoll_create1 failed: errno=%d", errno);
184     }
185     timerfd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
186     if (timerfd_ < 0) {
187         FFRT_LOGE("timerfd_create failed: errno=%d", errno);
188     }
189 
190     epoll_event timer_event { .events = EPOLLIN | EPOLLET, .data = { .fd = timerfd_ } };
191     if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, timerfd_, &timer_event) < 0) {
192         FFRT_COND_TERMINATE((epoll_ctl(epollfd_, EPOLL_CTL_ADD, timerfd_, &timer_event) < 0),
193             "epoll_ctl add tfd error: efd=%d, fd=%d, errorno=%d", epollfd_, timerfd_, errno);
194     }
195     DelayedWorker::ThreadEnvCreate();
196 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
197     monitorfd_ = BlockawareMonitorfd(-1, ExecuteUnit::Instance().WakeupCond());
198     FFRT_LOGI("timerfd:%d, monitorfd:%d", timerfd_, monitorfd_);
199     /* monitorfd does not support 'CLOEXEC', and current kernel does not inherit monitorfd after 'fork'.
200      * 1. if user calls 'exec' directly after 'fork' and does not use ffrt, it's ok.
201      * 2. if user calls 'exec' directly, the original process cannot close monitorfd automatically, and
202      * it will be fail when new program use ffrt to create monitorfd.
203      */
204     epoll_event monitor_event {.events = EPOLLIN, .data = {.fd = monitorfd_}};
205     int ret = epoll_ctl(epollfd_, EPOLL_CTL_ADD, monitorfd_, &monitor_event);
206     if (ret < 0) {
207         FFRT_SYSEVENT_LOGE("monitor:%d add fail, ret:%d, errno:%d, %s", monitorfd_, ret, errno, strerror(errno));
208     }
209 #endif
210     FFRT_LOGD("Construction completed.");
211 }
212 
Terminate()213 void DelayedWorker::Terminate()
214 {
215     if (delayedWorker != nullptr && delayedWorker->joinable()) {
216         FFRT_LOGD("Terminating Delayed worker");
217         toExit = true;
218         // Reduce the timeout to zero, in order to
219         // wakeup epoll_wait immediately.
220         itimerspec its = { {0, 0}, {0, 1} };
221         timerfd_settime(timerfd_, 0, &its, nullptr);
222         delayedWorker->join();
223         delayedWorker  = nullptr;
224     }
225 }
226 
~DelayedWorker()227 DelayedWorker::~DelayedWorker()
228 {
229     Terminate();
230     SetDelayedWorkerExitFlag();
231     while (asyncTaskCnt_.load() > 0) {
232         std::this_thread::sleep_for(std::chrono::microseconds(ASYNC_TASK_SLEEP_MS));
233     }
234 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
235     ::close(monitorfd_);
236 #endif
237     ::close(timerfd_);
238     FFRT_LOGD("Destruction completed.");
239 }
240 
GetInstance()241 DelayedWorker& DelayedWorker::GetInstance()
242 {
243     static DelayedWorker instance;
244     return instance;
245 }
246 
CheckTimeInterval(const TimePoint & startTp,const TimePoint & endTp)247 void CheckTimeInterval(const TimePoint& startTp, const TimePoint& endTp)
248 {
249     auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(endTp - startTp);
250     int64_t durationMs = duration.count();
251     if (durationMs > EXECUTION_TIMEOUT_MILLISECONDS) {
252         FFRT_LOGW("handle work more than [%lld]ms", durationMs);
253     }
254 }
255 
HandleWork()256 int DelayedWorker::HandleWork()
257 {
258     if (!map.empty()) {
259         noTaskDelayCount_ = 0;
260         TimePoint startTp = std::chrono::steady_clock::now();
261         do {
262             auto cur = map.begin();
263             if (!toExit && cur != map.end() && cur->first <= startTp) {
264                 DelayedWork w = cur->second;
265                 map.erase(cur);
266                 lock.unlock();
267                 std::function<void(WaitEntry*)> workCb = *w.cb;
268                 (workCb)(w.we);
269                 lock.lock();
270                 FFRT_COND_DO_ERR(toExit, return -1, "HandleWork exit, map size:%d", map.size());
271                 TimePoint endTp = std::chrono::steady_clock::now();
272                 CheckTimeInterval(startTp, endTp);
273                 startTp = std::move(endTp);
274             } else {
275                 return 0;
276             }
277         } while (!map.empty());
278     }
279     return 1;
280 }
281 
282 // There is no requirement that to be less than now
dispatch(const TimePoint & to,WaitEntry * we,const std::function<void (WaitEntry *)> & wakeup,bool skipTimeCheck)283 bool DelayedWorker::dispatch(const TimePoint& to, WaitEntry* we, const std::function<void(WaitEntry*)>& wakeup,
284     bool skipTimeCheck)
285 {
286     bool w = false;
287     if (toExit) {
288         FFRT_SYSEVENT_LOGE("DelayedWorker destroy, dispatch failed\n");
289         return false;
290     }
291 
292     if (!skipTimeCheck && to <= std::chrono::steady_clock::now()) {
293         return false;
294     }
295     std::lock_guard lg(lock);
296     if (exited_) {
297         ThreadInit();
298         exited_ = false;
299     }
300 
301     if (map.empty() || to < map.begin()->first) {
302         w = true;
303     }
304     map.emplace(to, DelayedWork {we, &wakeup});
305     if (w) {
306         uint64_t ns = static_cast<uint64_t>(to.time_since_epoch().count());
307         itimerspec its = { {0, 0}, {static_cast<long>(ns / NS_PER_SEC), static_cast<long>(ns % NS_PER_SEC)} };
308         int ret = timerfd_settime(timerfd_, TFD_TIMER_ABSTIME, &its, nullptr);
309         if (ret != 0) {
310             FFRT_SYSEVENT_LOGE("timerfd_settime error, ns=%lu, ret= %d.", ns, ret);
311         }
312     }
313     return true;
314 }
315 
remove(const TimePoint & to,WaitEntry * we)316 bool DelayedWorker::remove(const TimePoint& to, WaitEntry* we)
317 {
318     std::lock_guard lg(lock);
319 
320     auto range = map.equal_range(to);
321     for (auto it = range.first; it != range.second; ++it) {
322         if (it->second.we == we) {
323             map.erase(it);
324             return true;
325         }
326     }
327 
328     return false;
329 }
330 
SubmitAsyncTask(std::function<void ()> && func)331 void DelayedWorker::SubmitAsyncTask(std::function<void()>&& func)
332 {
333     asyncTaskCnt_.fetch_add(1);
334     ffrt::submit([this, func = std::move(func)]() {
335         if (toExit) {
336             asyncTaskCnt_.fetch_sub(1);
337             return;
338         }
339 
340         func();
341         asyncTaskCnt_.fetch_sub(1);
342         }, {}, {this}, ffrt::task_attr().qos(qos_background));
343 }
344 } // namespace ffrt
345