1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "delayed_worker.h"
17
18 #include <unistd.h>
19 #include <sys/syscall.h>
20 #include <sys/prctl.h>
21 #include <thread>
22 #include <linux/futex.h>
23 #include "dfx/log/ffrt_log_api.h"
24 #include "util/name_manager.h"
25 namespace ffrt {
DelayedWorker()26 DelayedWorker::DelayedWorker() : futex(0)
27 {
28 struct sched_param param;
29 param.sched_priority = 1;
30 int ret = pthread_setschedparam(pthread_self(), SCHED_RR, ¶m);
31 if (ret != 0) {
32 FFRT_LOGE("[%d] set priority failed ret[%d] errno[%d]\n", pthread_self(), ret, errno);
33 }
34 std::thread t([this]() {
35 prctl(PR_SET_NAME, DELAYED_WORKER_NAME);
36 for (;;) {
37 lock.lock();
38 if (futex < 0) {
39 lock.unlock();
40 exited = true;
41 break;
42 }
43 struct timespec ts;
44 struct timespec *p = &ts;
45 HandleWork(&p);
46 lock.unlock();
47 syscall(SYS_futex, &futex, FUTEX_WAIT_BITSET, 0, p, 0, -1);
48 }
49 });
50 t.detach();
51 }
52
~DelayedWorker()53 DelayedWorker::~DelayedWorker()
54 {
55 lock.lock();
56 futex = -1;
57 lock.unlock();
58
59 while (!exited) {
60 syscall(SYS_futex, &futex, FUTEX_WAKE, 1);
61 }
62 }
63
HandleWork(struct timespec ** p)64 void DelayedWorker::HandleWork(struct timespec** p)
65 {
66 const int NS_PER_SEC = 1000000000;
67
68 while (!map.empty()) {
69 time_point_t now = std::chrono::steady_clock::now();
70 auto cur = map.begin();
71 if (cur->first <= now) {
72 DelayedWork w = cur->second;
73 map.erase(cur);
74 lock.unlock();
75 (*w.cb)(w.we);
76 lock.lock();
77 if (futex < 0) {
78 return;
79 }
80 } else {
81 std::chrono::nanoseconds ns = cur->first.time_since_epoch();
82 (*p)->tv_sec = ns.count() / NS_PER_SEC;
83 (*p)->tv_nsec = ns.count() % NS_PER_SEC;
84 futex = 0;
85 return;
86 }
87 }
88
89 *p = nullptr;
90 futex = 0;
91 }
92
dispatch(const time_point_t & to,WaitEntry * we,const std::function<void (WaitEntry *)> & wakeup)93 bool DelayedWorker::dispatch(const time_point_t& to, WaitEntry* we, const std::function<void(WaitEntry*)>& wakeup)
94 {
95 bool w = false;
96 std::lock_guard<decltype(lock)> l(lock);
97
98 if (futex < 0) {
99 return false;
100 }
101
102 time_point_t now = std::chrono::steady_clock::now();
103 if (to <= now) {
104 return false;
105 }
106
107 if (map.empty() || to < map.begin()->first) {
108 w = true;
109 }
110 map.emplace(to, DelayedWork {we, &wakeup});
111 if (w) {
112 futex = 1;
113 syscall(SYS_futex, &futex, FUTEX_WAKE, 1);
114 }
115
116 return true;
117 }
118 } // namespace ffrt