1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "schedule_task_manager.h"
17
18 #include <ctime>
19 #include <iostream>
20 #include <mutex>
21 #include <pthread.h>
22
23 #include "logging.h"
24 #include "securec.h"
25
namespace {
// Longest time TakeFront() waits on an empty queue before re-checking its state.
constexpr std::chrono::milliseconds POLL_INTERVAL{5000};
// Repeat intervals are clamped up to, and rounded down to multiples of, this value.
constexpr std::chrono::milliseconds MIN_REPEAT_INTERVAL{10};
// A repeat interval of zero marks a task that runs once and is then removed.
constexpr std::chrono::milliseconds ZERO_INTERVAL{0};
} // namespace
31
GetInstance()32 ScheduleTaskManager& ScheduleTaskManager::GetInstance()
33 {
34 static ScheduleTaskManager instance;
35 return instance;
36 }
37
ScheduleTaskManager()38 ScheduleTaskManager::ScheduleTaskManager()
39 {
40 runScheduleThread_ = true;
41 scheduleThread_ = std::thread(&ScheduleTaskManager::ScheduleThread, this);
42 }
43
~ScheduleTaskManager()44 ScheduleTaskManager::~ScheduleTaskManager()
45 {
46 Shutdown();
47 }
48
Shutdown()49 void ScheduleTaskManager::Shutdown()
50 {
51 bool expect = true;
52 if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
53 return;
54 }
55 taskCv_.notify_one();
56 if (scheduleThread_.joinable()) {
57 scheduleThread_.join();
58 }
59 taskMap_.clear();
60 timeMap_.clear();
61 }
62
NormalizeInterval(std::chrono::milliseconds interval)63 std::chrono::milliseconds ScheduleTaskManager::NormalizeInterval(std::chrono::milliseconds interval)
64 {
65 if (interval <= ZERO_INTERVAL) {
66 return ZERO_INTERVAL;
67 }
68 if (interval < MIN_REPEAT_INTERVAL) {
69 return MIN_REPEAT_INTERVAL;
70 }
71 return interval / MIN_REPEAT_INTERVAL * MIN_REPEAT_INTERVAL;
72 }
73
ScheduleTask(const std::string & name,const std::function<void (void)> & callback,const std::chrono::milliseconds & repeatInterval)74 bool ScheduleTaskManager::ScheduleTask(const std::string& name,
75 const std::function<void(void)>& callback,
76 const std::chrono::milliseconds& repeatInterval)
77 {
78 return ScheduleTask(name, callback, repeatInterval, repeatInterval);
79 }
80
ScheduleTask(const std::string & name,const std::function<void (void)> & callback,const std::chrono::milliseconds & repeatInterval,std::chrono::milliseconds initialDelay)81 bool ScheduleTaskManager::ScheduleTask(const std::string& name,
82 const std::function<void(void)>& callback,
83 const std::chrono::milliseconds& repeatInterval,
84 std::chrono::milliseconds initialDelay)
85 {
86 auto currentTime = Clock::now();
87 auto task = std::make_shared<Task>();
88
89 task->name = name;
90 task->callback = callback;
91 task->initialDelay = initialDelay;
92 task->repeatInterval = NormalizeInterval(repeatInterval);
93 task->nextRunTime = currentTime + initialDelay;
94
95 std::lock_guard<std::mutex> guard(taskMutex_);
96 if (taskMap_.count(name) > 0) {
97 HILOG_WARN(LOG_CORE, "task name %s already exists!", name.c_str());
98 return false;
99 }
100
101 taskMap_[name] = task;
102 timeMap_.insert(std::make_pair(task->nextRunTime, task));
103 taskCv_.notify_one();
104
105 DumpTask(task);
106 HILOG_DEBUG(LOG_CORE, "add schedule %s done, total: %zu", name.c_str(), taskMap_.size());
107 return true;
108 }
109
UnscheduleTask(const std::string & name)110 bool ScheduleTaskManager::UnscheduleTask(const std::string& name)
111 {
112 std::unique_lock<std::mutex> lck(taskMutex_);
113 HILOG_DEBUG(LOG_CORE, "del schedule %s start, total: %zu", name.c_str(), taskMap_.size());
114 auto it = taskMap_.find(name);
115 if (it != taskMap_.end()) {
116 taskMap_.erase(it);
117 HILOG_DEBUG(LOG_CORE, "del schedule %s done, remain: %zu", name.c_str(), taskMap_.size());
118 return true;
119 }
120 HILOG_DEBUG(LOG_CORE, "del schedule %s pass, total: %zu", name.c_str(), taskMap_.size());
121 return false;
122 }
123
TakeFront()124 ScheduleTaskManager::WeakTask ScheduleTaskManager::TakeFront()
125 {
126 std::unique_lock<std::mutex> lck(taskMutex_);
127
128 // thread wait until task insert or shutdown
129 while (timeMap_.empty() && runScheduleThread_) {
130 taskCv_.wait_for(lck, POLL_INTERVAL);
131 }
132
133 if (!runScheduleThread_) {
134 return {};
135 }
136
137 auto task = timeMap_.begin()->second;
138 timeMap_.erase(timeMap_.begin());
139 return task;
140 }
141
DumpTask(const SharedTask & task)142 void ScheduleTaskManager::DumpTask(const SharedTask& task) {}
143
ScheduleThread()144 void ScheduleTaskManager::ScheduleThread()
145 {
146 pthread_setname_np(pthread_self(), "SchedTaskMgr");
147 while (runScheduleThread_) {
148 // take front task from task queue
149 WeakTask weakTask = TakeFront();
150 if (!runScheduleThread_) {
151 break;
152 }
153
154 TimePoint targetTime;
155 {
156 auto taskTime = weakTask.lock(); // promote to shared_ptr
157 if (!taskTime) {
158 // task cancelled with UnschduleTask or not a repeat task
159 HILOG_INFO(LOG_CORE, "front task cacelled or not repeat task");
160 continue;
161 }
162 targetTime = taskTime->nextRunTime;
163 }
164
165 // delay to target time
166 auto currentTime = Clock::now();
167 if (targetTime >= currentTime) {
168 std::this_thread::sleep_for(targetTime - currentTime);
169 }
170
171 auto taskRepeat = weakTask.lock();
172 if (!taskRepeat) {
173 // task cancelled with UnschduleTask
174 HILOG_INFO(LOG_CORE, "front task cacelled");
175 continue;
176 }
177
178 // call task callback
179 DumpTask(taskRepeat);
180 taskRepeat->callback();
181 taskRepeat->nextRunTime = targetTime + taskRepeat->repeatInterval;
182
183 if (taskRepeat->repeatInterval.count() != 0) {
184 // repeat task, re-insert task to timeMap
185 std::unique_lock<std::mutex> guard(taskMutex_);
186 timeMap_.insert(std::make_pair(taskRepeat->nextRunTime, taskRepeat));
187 } else {
188 // not a repeat task.
189 std::unique_lock<std::mutex> guard(taskMutex_);
190 taskMap_.erase(taskRepeat->name);
191 }
192 }
193 }
194