• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "schedule_task_manager.h"
17 
18 #include <ctime>
19 #include <iostream>
20 #include <mutex>
21 #include <pthread.h>
22 
23 #include "logging.h"
24 #include "securec.h"
25 
namespace {
// Longest single wait of the scheduler thread for a queued task before it re-checks its run flag.
constexpr std::chrono::milliseconds POLL_INTERVAL{5000};
// Smallest non-zero repeat interval; NormalizeInterval also rounds down to a multiple of it.
constexpr std::chrono::milliseconds MIN_REPEAT_INTERVAL{10};
// Repeat interval of zero, i.e. the task is not re-queued after it has run.
constexpr std::chrono::milliseconds ZERO_INTERVAL{0};
} // namespace
31 
GetInstance()32 ScheduleTaskManager& ScheduleTaskManager::GetInstance()
33 {
34     static ScheduleTaskManager instance;
35     return instance;
36 }
37 
ScheduleTaskManager()38 ScheduleTaskManager::ScheduleTaskManager()
39 {
40     runScheduleThread_ = true;
41     scheduleThread_ = std::thread(&ScheduleTaskManager::ScheduleThread, this);
42 }
43 
~ScheduleTaskManager()44 ScheduleTaskManager::~ScheduleTaskManager()
45 {
46     Shutdown();
47 }
48 
Shutdown()49 void ScheduleTaskManager::Shutdown()
50 {
51     bool expect = true;
52     if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
53         return;
54     }
55     taskCv_.notify_one();
56     if (scheduleThread_.joinable()) {
57         scheduleThread_.join();
58     }
59     taskMap_.clear();
60     timeMap_.clear();
61 }
62 
NormalizeInterval(std::chrono::milliseconds interval)63 std::chrono::milliseconds ScheduleTaskManager::NormalizeInterval(std::chrono::milliseconds interval)
64 {
65     if (interval <= ZERO_INTERVAL) {
66         return ZERO_INTERVAL;
67     }
68     if (interval < MIN_REPEAT_INTERVAL) {
69         return MIN_REPEAT_INTERVAL;
70     }
71     return interval / MIN_REPEAT_INTERVAL * MIN_REPEAT_INTERVAL;
72 }
73 
ScheduleTask(const std::string & name,const std::function<void (void)> & callback,const std::chrono::milliseconds & repeatInterval)74 bool ScheduleTaskManager::ScheduleTask(const std::string& name,
75                                        const std::function<void(void)>& callback,
76                                        const std::chrono::milliseconds& repeatInterval)
77 {
78     return ScheduleTask(name, callback, repeatInterval, repeatInterval);
79 }
80 
ScheduleTask(const std::string & name,const std::function<void (void)> & callback,const std::chrono::milliseconds & repeatInterval,std::chrono::milliseconds initialDelay)81 bool ScheduleTaskManager::ScheduleTask(const std::string& name,
82                                        const std::function<void(void)>& callback,
83                                        const std::chrono::milliseconds& repeatInterval,
84                                        std::chrono::milliseconds initialDelay)
85 {
86     auto currentTime = Clock::now();
87     auto task = std::make_shared<Task>();
88 
89     task->name = name;
90     task->callback = callback;
91     task->initialDelay = initialDelay;
92     task->repeatInterval = NormalizeInterval(repeatInterval);
93     task->nextRunTime = currentTime + initialDelay;
94 
95     std::lock_guard<std::mutex> guard(taskMutex_);
96     if (taskMap_.count(name) > 0) {
97         HILOG_WARN(LOG_CORE, "task name %s already exists!", name.c_str());
98         return false;
99     }
100 
101     taskMap_[name] = task;
102     timeMap_.insert(std::make_pair(task->nextRunTime, task));
103     taskCv_.notify_one();
104 
105     DumpTask(task);
106     HILOG_DEBUG(LOG_CORE, "add schedule %s done, total: %zu", name.c_str(), taskMap_.size());
107     return true;
108 }
109 
UnscheduleTask(const std::string & name)110 bool ScheduleTaskManager::UnscheduleTask(const std::string& name)
111 {
112     std::unique_lock<std::mutex> lck(taskMutex_);
113     HILOG_DEBUG(LOG_CORE, "del schedule %s start, total: %zu", name.c_str(), taskMap_.size());
114     auto it = taskMap_.find(name);
115     if (it != taskMap_.end()) {
116         taskMap_.erase(it);
117         HILOG_DEBUG(LOG_CORE, "del schedule %s done, remain: %zu", name.c_str(), taskMap_.size());
118         return true;
119     }
120     HILOG_DEBUG(LOG_CORE, "del schedule %s pass, total: %zu", name.c_str(), taskMap_.size());
121     return false;
122 }
123 
TakeFront()124 ScheduleTaskManager::WeakTask ScheduleTaskManager::TakeFront()
125 {
126     std::unique_lock<std::mutex> lck(taskMutex_);
127 
128     // thread wait until task insert or shutdown
129     while (timeMap_.empty() && runScheduleThread_) {
130         taskCv_.wait_for(lck, POLL_INTERVAL);
131     }
132 
133     if (!runScheduleThread_) {
134         return {};
135     }
136 
137     auto task = timeMap_.begin()->second;
138     timeMap_.erase(timeMap_.begin());
139     return task;
140 }
141 
DumpTask(const SharedTask & task)142 void ScheduleTaskManager::DumpTask(const SharedTask& task) {}
143 
ScheduleThread()144 void ScheduleTaskManager::ScheduleThread()
145 {
146     pthread_setname_np(pthread_self(), "SchedTaskMgr");
147     while (runScheduleThread_) {
148         // take front task from task queue
149         WeakTask weakTask = TakeFront();
150         if (!runScheduleThread_) {
151             break;
152         }
153 
154         TimePoint targetTime;
155         {
156             auto taskTime = weakTask.lock(); // promote to shared_ptr
157             if (!taskTime) {
158                 // task cancelled with UnschduleTask or not a repeat task
159                 HILOG_INFO(LOG_CORE, "front task cacelled or not repeat task");
160                 continue;
161             }
162             targetTime = taskTime->nextRunTime;
163         }
164 
165         // delay to target time
166         auto currentTime = Clock::now();
167         if (targetTime >= currentTime) {
168             std::this_thread::sleep_for(targetTime - currentTime);
169         }
170 
171         auto taskRepeat = weakTask.lock();
172         if (!taskRepeat) {
173             // task cancelled with UnschduleTask
174             HILOG_INFO(LOG_CORE, "front task cacelled");
175             continue;
176         }
177 
178         // call task callback
179         DumpTask(taskRepeat);
180         taskRepeat->callback();
181         taskRepeat->nextRunTime = targetTime + taskRepeat->repeatInterval;
182 
183         if (taskRepeat->repeatInterval.count() != 0) {
184             // repeat task, re-insert task to timeMap
185             std::unique_lock<std::mutex> guard(taskMutex_);
186             timeMap_.insert(std::make_pair(taskRepeat->nextRunTime, taskRepeat));
187         } else {
188             // not a repeat task.
189             std::unique_lock<std::mutex> guard(taskMutex_);
190             taskMap_.erase(taskRepeat->name);
191         }
192     }
193 }
194