• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "schedule_task_manager.h"
17 
18 #include <ctime>
19 #include <fcntl.h>
20 #include <mutex>
21 #include <pthread.h>
22 #include <cstring>
23 #include <sys/epoll.h>
24 #include <sys/eventfd.h>
25 #include <sys/timerfd.h>
26 #include <unistd.h>
27 
28 namespace {
29 constexpr int32_t TIME_BASE = 1000; // Time progression rate.
30 constexpr int32_t FIRST_TIME = 10; // The start time of the first task is 10 nanoseconds.
31 constexpr int32_t EPOLL_EVENT_MAX = 1024;
32 } // namespace
33 
ScheduleTaskManager()34 ScheduleTaskManager::ScheduleTaskManager()
35 {
36     StartThread();
37 }
38 
~ScheduleTaskManager()39 ScheduleTaskManager::~ScheduleTaskManager()
40 {
41     Shutdown();
42 }
43 
Shutdown()44 void ScheduleTaskManager::Shutdown()
45 {
46     bool expect = true;
47     if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
48         return;
49     }
50     uint64_t value = 1;
51     write(stopFd_, &value, sizeof(value));
52     if (scheduleThread_.joinable()) {
53         scheduleThread_.join();
54     }
55     std::lock_guard<std::mutex> guard(mtx_);
56     for (const auto& [timerFd, func] : tasks_) {
57         close(timerFd);
58     }
59     close(epollFd_);
60     close(stopFd_);
61 }
62 
ScheduleTask(const std::function<void (void)> & callback,const uint64_t interval,bool once)63 int32_t ScheduleTaskManager::ScheduleTask(const std::function<void(void)>& callback, const uint64_t interval, bool once)
64 {
65     int32_t timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
66     if (timerFd == -1) {
67         PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd create failed");
68         return -1;
69     }
70 
71     std::function<void(void)> func;
72     struct itimerspec time;
73     if (once) {
74         if (interval == 0) {
75             PROFILER_LOG_ERROR(LOG_CORE, "the interval parameters of a single execution cannot be 0");
76             return -1;
77         }
78         time.it_value.tv_sec = interval / TIME_BASE;
79         time.it_value.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
80         time.it_interval.tv_sec = 0;
81         time.it_interval.tv_nsec = 0;
82         func = std::bind(&ScheduleTaskManager::HandleSingleTask, this, timerFd, callback);
83     } else {
84         time.it_value.tv_sec = 0;
85         time.it_value.tv_nsec = FIRST_TIME;
86         time.it_interval.tv_sec = interval / TIME_BASE;
87         time.it_interval.tv_nsec = (interval % TIME_BASE) * TIME_BASE * TIME_BASE;
88         func = callback;
89     }
90 
91     int32_t ret = timerfd_settime(timerFd, 0, &time, NULL);
92     if (ret == -1) {
93         PROFILER_LOG_ERROR(LOG_CORE, "ScheduleTaskManager timerfd settime failed");
94         return -1;
95     }
96 
97     struct epoll_event evt;
98     evt.data.fd = timerFd;
99     evt.events = EPOLLIN;
100     epoll_ctl(epollFd_, EPOLL_CTL_ADD, timerFd, &evt);
101     std::lock_guard<std::mutex> guard(mtx_);
102     tasks_[timerFd] = std::move(func);
103     return timerFd;
104 }
105 
UnscheduleTask(const int32_t timerFd)106 bool ScheduleTaskManager::UnscheduleTask(const int32_t timerFd)
107 {
108     std::lock_guard<std::mutex> guard(mtx_);
109     return DeleteTask(timerFd);
110 }
111 
UnscheduleTaskLockless(const int32_t timerFd)112 bool ScheduleTaskManager::UnscheduleTaskLockless(const int32_t timerFd)
113 {
114     return DeleteTask(timerFd);
115 }
116 
DeleteTask(const int32_t timerFd)117 bool ScheduleTaskManager::DeleteTask(const int32_t timerFd)
118 {
119     if (auto iter = tasks_.find(timerFd); iter != tasks_.end()) {
120         close(timerFd);
121         epoll_ctl(epollFd_, EPOLL_CTL_DEL, timerFd, NULL);
122         tasks_.erase(timerFd);
123         return true;
124     }
125     return false;
126 }
127 
ScheduleThread()128 void ScheduleTaskManager::ScheduleThread()
129 {
130     pthread_setname_np(pthread_self(), "SchedTaskMgr");
131     uint64_t exp;
132     while (runScheduleThread_) {
133         struct epoll_event events[EPOLL_EVENT_MAX];
134         int32_t nfd = epoll_wait(epollFd_, events, EPOLL_EVENT_MAX, -1);
135         if (nfd > 0) {
136             for (int32_t i = 0; i < nfd; ++i) {
137                 if (events[i].data.fd == stopFd_) {
138                     return;
139                 }
140 
141                 int32_t ret = read(events[i].data.fd, &exp, sizeof(uint64_t));
142                 if (ret == sizeof(uint64_t)) {
143                     std::lock_guard<std::mutex> guard(mtx_);
144                     if (tasks_.find(events[i].data.fd) != tasks_.end()) {
145                         tasks_[events[i].data.fd]();
146                     }
147                 }
148             }
149         }
150     }
151 }
152 
HandleSingleTask(int32_t fd,std::function<void (void)> callback)153 void ScheduleTaskManager::HandleSingleTask(int32_t fd, std::function<void(void)> callback)
154 {
155     callback();
156     UnscheduleTaskLockless(fd);
157 }
158 
StartThread()159 void ScheduleTaskManager::StartThread()
160 {
161     epollFd_ = epoll_create(0);
162     stopFd_ = eventfd(0, EFD_NONBLOCK); // Specifically designed for stopping epoll_wait.
163     struct epoll_event evt;
164     evt.data.fd = stopFd_;
165     evt.events = EPOLLIN;
166     epoll_ctl(epollFd_, EPOLL_CTL_ADD, stopFd_, &evt);
167 
168     scheduleThread_ = std::thread(&ScheduleTaskManager::ScheduleThread, this);
169 }