1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "delayed_worker.h"
17
18 #include <unistd.h>
19 #include <sys/syscall.h>
20 #include <sys/prctl.h>
21 #include <thread>
22 #include <linux/futex.h>
23 namespace ffrt {
DelayedWorker()24 DelayedWorker::DelayedWorker() : futex(0)
25 {
26 std::thread t([this]() {
27 prctl(PR_SET_NAME, "delayed_worker");
28 for (;;) {
29 lock.lock();
30 if (futex < 0) {
31 lock.unlock();
32 exited = true;
33 break;
34 }
35 struct timespec ts;
36 struct timespec *p = &ts;
37 HandleWork(&p);
38 lock.unlock();
39 syscall(SYS_futex, &futex, FUTEX_WAIT_BITSET, 0, p, 0, -1);
40 }
41 });
42 t.detach();
43 }
44
~DelayedWorker()45 DelayedWorker::~DelayedWorker()
46 {
47 lock.lock();
48 futex = -1;
49 lock.unlock();
50
51 while (!exited) {
52 syscall(SYS_futex, &futex, FUTEX_WAKE, 1);
53 }
54 }
55
HandleWork(struct timespec ** p)56 void DelayedWorker::HandleWork(struct timespec** p)
57 {
58 const int NS_PER_SEC = 1000000000;
59
60 while (!map.empty()) {
61 time_point_t now = std::chrono::steady_clock::now();
62 auto cur = map.begin();
63 if (cur->first <= now) {
64 DelayedWork w = cur->second;
65 map.erase(cur);
66 lock.unlock();
67 (*w.cb)(w.we);
68 lock.lock();
69 if (futex < 0) {
70 return;
71 }
72 } else {
73 std::chrono::nanoseconds ns = cur->first.time_since_epoch();
74 (*p)->tv_sec = ns.count() / NS_PER_SEC;
75 (*p)->tv_nsec = ns.count() % NS_PER_SEC;
76 futex = 0;
77 return;
78 }
79 }
80
81 *p = nullptr;
82 futex = 0;
83 }
84
dispatch(const time_point_t & to,WaitEntry * we,const std::function<void (WaitEntry *)> & wakeup)85 bool DelayedWorker::dispatch(const time_point_t& to, WaitEntry* we, const std::function<void(WaitEntry*)>& wakeup)
86 {
87 bool w = false;
88 std::lock_guard<decltype(lock)> l(lock);
89
90 if (futex < 0) {
91 return false;
92 }
93
94 time_point_t now = std::chrono::steady_clock::now();
95 if (to <= now) {
96 return false;
97 }
98
99 if (map.empty() || to < map.begin()->first) {
100 w = true;
101 }
102 map.emplace(to, DelayedWork {we, &wakeup});
103 if (w) {
104 futex = 1;
105 syscall(SYS_futex, &futex, FUTEX_WAKE, 1);
106 }
107
108 return true;
109 }
110 } // namespace ffrt