1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "schedule_task_manager.h"
17
18 #include <ctime>
19 #include <fcntl.h>
20 #include <mutex>
21 #include <pthread.h>
22 #include <cstring>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/timerfd.h>
26 #include <unistd.h>
27
namespace {
// Divisor that turns an interval into whole seconds; the remainder is multiplied by it
// twice to obtain nanoseconds (i.e. intervals are treated as milliseconds).
constexpr int32_t TIME_BASE{1000};
// Initial expiration, in nanoseconds, used for repeating tasks.
constexpr int32_t FIRST_TIME{10};
// Capacity of the event buffer handed to a single epoll_wait() call.
constexpr int32_t EPOLL_EVENT_MAX{1024};
} // namespace
33
ScheduleTaskManager()34 ScheduleTaskManager::ScheduleTaskManager()
35 {
36 StartThread();
37 }
38
~ScheduleTaskManager()39 ScheduleTaskManager::~ScheduleTaskManager()
40 {
41 Shutdown();
42 }
43
Shutdown()44 void ScheduleTaskManager::Shutdown()
45 {
46 bool expect = true;
47 if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
48 return;
49 }
50 uint64_t value = 1;
51 write(stopFd_, &value, sizeof(value));
52 if (scheduleThread_.joinable()) {
53 scheduleThread_.join();
54 }
55 std::lock_guard<std::mutex> guard(mtx_);
56 for (const auto& [timerFd, func] : tasks_) {
57 close(timerFd);
58 }
59 close(epollFd_);
60 close(stopFd_);
61 }
62
ScheduleTask(const std::function<void (void)> & callback,const uint64_t interval,bool once)63 int32_t ScheduleTaskManager::ScheduleTask(const std::function<void(void)>& callback, const uint64_t interval, bool once)
64 {
65 int32_t timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
66 if (timerFd == -1) {
67 PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd create failed");
68 return -1;
69 }
70
71 std::function<void(void)> func;
72 struct itimerspec time;
73 if (once) {
74 if (interval == 0) {
75 PROFILER_LOG_ERROR(LOG_CORE, "the interval parameters of a single execution cannot be 0");
76 return -1;
77 }
78 time.it_value.tv_sec = interval / TIME_BASE;
79 time.it_value.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
80 time.it_interval.tv_sec = 0;
81 time.it_interval.tv_nsec = 0;
82 func = std::bind(&ScheduleTaskManager::HandleSingleTask, this, timerFd, callback);
83 } else {
84 time.it_value.tv_sec = 0;
85 time.it_value.tv_nsec = FIRST_TIME;
86 time.it_interval.tv_sec = interval / TIME_BASE;
87 time.it_interval.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
88 func = callback;
89 }
90
91 int32_t ret = timerfd_settime(timerFd, 0, &time, NULL);
92 if (ret == -1) {
93 PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd settime failed");
94 return -1;
95 }
96
97 struct epoll_event evt;
98 evt.data.fd = timerFd;
99 evt.events = EPOLLIN;
100 epoll_ctl(epollFd_, EPOLL_CTL_ADD, timerFd, &evt);
101 std::lock_guard<std::mutex> guard(mtx_);
102 tasks_[timerFd] = std::move(func);
103 return timerFd;
104 }
105
UnscheduleTask(const int32_t timerFd)106 bool ScheduleTaskManager::UnscheduleTask(const int32_t timerFd)
107 {
108 std::lock_guard<std::mutex> guard(mtx_);
109 return DeleteTask(timerFd);
110 }
111
UnscheduleTaskLockless(const int32_t timerFd)112 bool ScheduleTaskManager::UnscheduleTaskLockless(const int32_t timerFd)
113 {
114 return DeleteTask(timerFd);
115 }
116
DeleteTask(const int32_t timerFd)117 bool ScheduleTaskManager::DeleteTask(const int32_t timerFd)
118 {
119 if (auto iter = tasks_.find(timerFd); iter != tasks_.end()) {
120 close(timerFd);
121 epoll_ctl(epollFd_, EPOLL_CTL_DEL, timerFd, NULL);
122 tasks_.erase(timerFd);
123 return true;
124 }
125 return false;
126 }
127
ScheduleThread()128 void ScheduleTaskManager::ScheduleThread()
129 {
130 pthread_setname_np(pthread_self(), "SchedTaskMgr");
131 uint64_t exp;
132 while (runScheduleThread_) {
133 struct epoll_event events[EPOLL_EVENT_MAX];
134 int32_t nfd = epoll_wait(epollFd_, events, EPOLL_EVENT_MAX, -1);
135 if (nfd > 0) {
136 for (int32_t i = 0; i < nfd; ++i) {
137 if (events[i].data.fd == stopFd_) {
138 return;
139 }
140
141 int32_t ret = read(events[i].data.fd, &exp, sizeof(uint64_t));
142 if (ret == sizeof(uint64_t)) {
143 std::lock_guard<std::mutex> guard(mtx_);
144 if (tasks_.find(events[i].data.fd) != tasks_.end()) {
145 tasks_[events[i].data.fd]();
146 }
147 }
148 }
149 }
150 }
151 }
152
HandleSingleTask(int32_t fd,std::function<void (void)> callback)153 void ScheduleTaskManager::HandleSingleTask(int32_t fd, std::function<void(void)> callback)
154 {
155 callback();
156 UnscheduleTaskLockless(fd);
157 }
158
StartThread()159 void ScheduleTaskManager::StartThread()
160 {
161 epollFd_ = epoll_create(0);
162 stopFd_ = eventfd(0, EFD_NONBLOCK); // Specifically designed for stopping epoll_wait.
163 struct epoll_event evt;
164 evt.data.fd = stopFd_;
165 evt.events = EPOLLIN;
166 epoll_ctl(epollFd_, EPOLL_CTL_ADD, stopFd_, &evt);
167
168 scheduleThread_ = std::thread(&ScheduleTaskManager::ScheduleThread, this);
169 }