1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "schedule_task_manager.h"
17
18 #include <ctime>
19 #include <iostream>
20 #include <mutex>
21 #include <pthread.h>
22
23 #include "logging.h"
24 #include "securec.h"
25
26 namespace {
27 constexpr std::chrono::milliseconds POLL_INTERVAL = std::chrono::milliseconds(5000);
28 constexpr std::chrono::milliseconds MIN_REPEAT_INTERVAL = std::chrono::milliseconds(10);
29 constexpr std::chrono::milliseconds ZERO_INTERVAL = std::chrono::milliseconds(0);
30 } // namespace
31
GetInstance()32 ScheduleTaskManager& ScheduleTaskManager::GetInstance()
33 {
34 static ScheduleTaskManager instance;
35 return instance;
36 }
37
ScheduleTaskManager()38 ScheduleTaskManager::ScheduleTaskManager()
39 {
40 runScheduleThread_ = true;
41 scheduleThread_ = std::thread(&ScheduleTaskManager::ScheduleThread, this);
42 }
43
~ScheduleTaskManager()44 ScheduleTaskManager::~ScheduleTaskManager()
45 {
46 Shutdown();
47 }
48
Shutdown()49 void ScheduleTaskManager::Shutdown()
50 {
51 bool expect = true;
52 if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
53 return;
54 }
55 taskCv_.notify_one();
56 if (scheduleThread_.joinable()) {
57 scheduleThread_.join();
58 }
59 taskMap_.clear();
60 timeMap_.clear();
61 }
62
NormalizeInterval(std::chrono::milliseconds interval)63 std::chrono::milliseconds ScheduleTaskManager::NormalizeInterval(std::chrono::milliseconds interval)
64 {
65 if (interval <= ZERO_INTERVAL) {
66 return ZERO_INTERVAL;
67 }
68 if (interval < MIN_REPEAT_INTERVAL) {
69 return MIN_REPEAT_INTERVAL;
70 }
71 return interval / MIN_REPEAT_INTERVAL * MIN_REPEAT_INTERVAL;
72 }
73
ScheduleTask(const std::string & name,const std::function<void (void)> & callback,const std::chrono::milliseconds & repeatInterval)74 bool ScheduleTaskManager::ScheduleTask(const std::string& name,
75 const std::function<void(void)>& callback,
76 const std::chrono::milliseconds& repeatInterval)
77 {
78 return ScheduleTask(name, callback, repeatInterval, repeatInterval);
79 }
80
ScheduleTask(const std::string & name,const std::function<void (void)> & callback,const std::chrono::milliseconds & repeatInterval,std::chrono::milliseconds initialDelay)81 bool ScheduleTaskManager::ScheduleTask(const std::string& name,
82 const std::function<void(void)>& callback,
83 const std::chrono::milliseconds& repeatInterval,
84 std::chrono::milliseconds initialDelay)
85 {
86 auto currentTime = Clock::now();
87 auto task = std::make_shared<Task>();
88
89 task->name = name;
90 task->callback = callback;
91 task->initialDelay = initialDelay;
92 task->repeatInterval = NormalizeInterval(repeatInterval);
93 task->nextRunTime = currentTime + initialDelay;
94
95 std::lock_guard<std::mutex> guard(taskMutex_);
96 CHECK_TRUE(taskMap_.count(name) <= 0, false, "task name %s already exists!", name.c_str());
97
98 taskMap_[name] = task;
99 timeMap_.insert(std::make_pair(task->nextRunTime, task));
100 taskCv_.notify_one();
101
102 DumpTask(task);
103 HILOG_DEBUG(LOG_CORE, "add schedule %s done, total: %zu", name.c_str(), taskMap_.size());
104 return true;
105 }
106
UnscheduleTask(const std::string & name)107 bool ScheduleTaskManager::UnscheduleTask(const std::string& name)
108 {
109 std::unique_lock<std::mutex> lck(taskMutex_);
110 HILOG_DEBUG(LOG_CORE, "del schedule %s start, total: %zu", name.c_str(), taskMap_.size());
111 auto it = taskMap_.find(name);
112 if (it != taskMap_.end()) {
113 taskMap_.erase(it);
114 HILOG_DEBUG(LOG_CORE, "del schedule %s done, remain: %zu", name.c_str(), taskMap_.size());
115 return true;
116 }
117 HILOG_DEBUG(LOG_CORE, "del schedule %s pass, total: %zu", name.c_str(), taskMap_.size());
118 return false;
119 }
120
TakeFront()121 ScheduleTaskManager::WeakTask ScheduleTaskManager::TakeFront()
122 {
123 std::unique_lock<std::mutex> lck(taskMutex_);
124
125 // thread wait until task insert or shutdown
126 while (timeMap_.empty() && runScheduleThread_) {
127 taskCv_.wait_for(lck, POLL_INTERVAL);
128 }
129
130 if (!runScheduleThread_) {
131 return {};
132 }
133
134 auto task = timeMap_.begin()->second;
135 timeMap_.erase(timeMap_.begin());
136 return task;
137 }
138
DumpTask(const SharedTask & task)139 void ScheduleTaskManager::DumpTask(const SharedTask& task) {}
140
ScheduleThread()141 void ScheduleTaskManager::ScheduleThread()
142 {
143 pthread_setname_np(pthread_self(), "SchedTaskMgr");
144 while (runScheduleThread_) {
145 // take front task from task queue
146 WeakTask weakTask = TakeFront();
147 if (!runScheduleThread_) {
148 break;
149 }
150
151 TimePoint targetTime;
152 {
153 auto taskTime = weakTask.lock(); // promote to shared_ptr
154 if (!taskTime) {
155 // task cancelled with UnschduleTask or not a repeat task
156 HILOG_INFO(LOG_CORE, "front task cacelled or not repeat task");
157 continue;
158 }
159 targetTime = taskTime->nextRunTime;
160 }
161
162 // delay to target time
163 auto currentTime = Clock::now();
164 if (targetTime >= currentTime) {
165 std::this_thread::sleep_for(targetTime - currentTime);
166 }
167
168 auto taskRepeat = weakTask.lock();
169 if (!taskRepeat) {
170 // task cancelled with UnschduleTask
171 HILOG_INFO(LOG_CORE, "front task cacelled");
172 continue;
173 }
174
175 // call task callback
176 DumpTask(taskRepeat);
177 taskRepeat->callback();
178 taskRepeat->nextRunTime = targetTime + taskRepeat->repeatInterval;
179
180 if (taskRepeat->repeatInterval.count() != 0) {
181 // repeat task, re-insert task to timeMap
182 std::unique_lock<std::mutex> guard(taskMutex_);
183 timeMap_.insert(std::make_pair(taskRepeat->nextRunTime, taskRepeat));
184 } else {
185 // not a repeat task.
186 std::unique_lock<std::mutex> guard(taskMutex_);
187 taskMap_.erase(taskRepeat->name);
188 }
189 }
190 }
191