• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "delayed_worker.h"
17 
18 #include <unistd.h>
19 #include <sys/syscall.h>
20 #include <sys/prctl.h>
21 #include <thread>
22 #include <linux/futex.h>
23 #include "dfx/log/ffrt_log_api.h"
24 #include "util/name_manager.h"
25 namespace ffrt {
DelayedWorker()26 DelayedWorker::DelayedWorker() : futex(0)
27 {
28     struct sched_param param;
29     param.sched_priority = 1;
30     int ret = pthread_setschedparam(pthread_self(), SCHED_RR, &param);
31     if (ret != 0) {
32         FFRT_LOGE("[%d] set priority failed ret[%d] errno[%d]\n", pthread_self(), ret, errno);
33     }
34     std::thread t([this]() {
35         prctl(PR_SET_NAME, DELAYED_WORKER_NAME);
36         for (;;) {
37             lock.lock();
38             if (futex < 0) {
39                 lock.unlock();
40                 exited = true;
41                 break;
42             }
43             struct timespec ts;
44             struct timespec *p = &ts;
45             HandleWork(&p);
46             lock.unlock();
47             syscall(SYS_futex, &futex, FUTEX_WAIT_BITSET, 0, p, 0, -1);
48         }
49     });
50     t.detach();
51 }
52 
~DelayedWorker()53 DelayedWorker::~DelayedWorker()
54 {
55     lock.lock();
56     futex = -1;
57     lock.unlock();
58 
59     while (!exited) {
60         syscall(SYS_futex, &futex, FUTEX_WAKE, 1);
61     }
62 }
63 
HandleWork(struct timespec ** p)64 void DelayedWorker::HandleWork(struct timespec** p)
65 {
66     const int NS_PER_SEC = 1000000000;
67 
68     while (!map.empty()) {
69         time_point_t now = std::chrono::steady_clock::now();
70         auto cur = map.begin();
71         if (cur->first <= now) {
72             DelayedWork w = cur->second;
73             map.erase(cur);
74             lock.unlock();
75             (*w.cb)(w.we);
76             lock.lock();
77             if (futex < 0) {
78                 return;
79             }
80         } else {
81             std::chrono::nanoseconds ns = cur->first.time_since_epoch();
82             (*p)->tv_sec = ns.count() / NS_PER_SEC;
83             (*p)->tv_nsec = ns.count() % NS_PER_SEC;
84             futex = 0;
85             return;
86         }
87     }
88 
89     *p = nullptr;
90     futex = 0;
91 }
92 
dispatch(const time_point_t & to,WaitEntry * we,const std::function<void (WaitEntry *)> & wakeup)93 bool DelayedWorker::dispatch(const time_point_t& to, WaitEntry* we, const std::function<void(WaitEntry*)>& wakeup)
94 {
95     bool w = false;
96     std::lock_guard<decltype(lock)> l(lock);
97 
98     if (futex < 0) {
99         return false;
100     }
101 
102     time_point_t now = std::chrono::steady_clock::now();
103     if (to <= now) {
104         return false;
105     }
106 
107     if (map.empty() || to < map.begin()->first) {
108         w = true;
109     }
110     map.emplace(to, DelayedWork {we, &wakeup});
111     if (w) {
112         futex = 1;
113         syscall(SYS_futex, &futex, FUTEX_WAKE, 1);
114     }
115 
116     return true;
117 }
118 } // namespace ffrt