• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "schedule_task_manager.h"
17 
18 #include <ctime>
19 #include <iostream>
20 #include <mutex>
21 #include <pthread.h>
22 
23 #include "logging.h"
24 #include "securec.h"
25 
namespace {
// Upper bound on how long TakeFront() blocks on an empty queue before re-checking its state.
constexpr std::chrono::milliseconds POLL_INTERVAL{5000};
// Smallest non-zero repeat interval; NormalizeInterval() also rounds down to a multiple of it.
constexpr std::chrono::milliseconds MIN_REPEAT_INTERVAL{10};
// A repeat interval of zero marks a task that runs only once.
constexpr std::chrono::milliseconds ZERO_INTERVAL{0};
} // namespace
31 
GetInstance()32 ScheduleTaskManager& ScheduleTaskManager::GetInstance()
33 {
34     static ScheduleTaskManager instance;
35     return instance;
36 }
37 
ScheduleTaskManager()38 ScheduleTaskManager::ScheduleTaskManager()
39 {
40     runScheduleThread_ = true;
41     scheduleThread_ = std::thread(&ScheduleTaskManager::ScheduleThread, this);
42 }
43 
~ScheduleTaskManager()44 ScheduleTaskManager::~ScheduleTaskManager()
45 {
46     Shutdown();
47 }
48 
Shutdown()49 void ScheduleTaskManager::Shutdown()
50 {
51     bool expect = true;
52     if (!runScheduleThread_.compare_exchange_strong(expect, false)) {
53         return;
54     }
55     taskCv_.notify_one();
56     if (scheduleThread_.joinable()) {
57         scheduleThread_.join();
58     }
59     taskMap_.clear();
60     timeMap_.clear();
61 }
62 
NormalizeInterval(std::chrono::milliseconds interval)63 std::chrono::milliseconds ScheduleTaskManager::NormalizeInterval(std::chrono::milliseconds interval)
64 {
65     if (interval <= ZERO_INTERVAL) {
66         return ZERO_INTERVAL;
67     }
68     if (interval < MIN_REPEAT_INTERVAL) {
69         return MIN_REPEAT_INTERVAL;
70     }
71     return interval / MIN_REPEAT_INTERVAL * MIN_REPEAT_INTERVAL;
72 }
73 
ScheduleTask(const std::string & name,const std::function<void (void)> & callback,const std::chrono::milliseconds & repeatInterval)74 bool ScheduleTaskManager::ScheduleTask(const std::string& name,
75                                        const std::function<void(void)>& callback,
76                                        const std::chrono::milliseconds& repeatInterval)
77 {
78     return ScheduleTask(name, callback, repeatInterval, repeatInterval);
79 }
80 
ScheduleTask(const std::string & name,const std::function<void (void)> & callback,const std::chrono::milliseconds & repeatInterval,std::chrono::milliseconds initialDelay)81 bool ScheduleTaskManager::ScheduleTask(const std::string& name,
82                                        const std::function<void(void)>& callback,
83                                        const std::chrono::milliseconds& repeatInterval,
84                                        std::chrono::milliseconds initialDelay)
85 {
86     auto currentTime = Clock::now();
87     auto task = std::make_shared<Task>();
88 
89     task->name = name;
90     task->callback = callback;
91     task->initialDelay = initialDelay;
92     task->repeatInterval = NormalizeInterval(repeatInterval);
93     task->nextRunTime = currentTime + initialDelay;
94 
95     std::lock_guard<std::mutex> guard(taskMutex_);
96     CHECK_TRUE(taskMap_.count(name) <= 0, false, "task name %s already exists!", name.c_str());
97 
98     taskMap_[name] = task;
99     timeMap_.insert(std::make_pair(task->nextRunTime, task));
100     taskCv_.notify_one();
101 
102     DumpTask(task);
103     HILOG_DEBUG(LOG_CORE, "add schedule %s done, total: %zu", name.c_str(), taskMap_.size());
104     return true;
105 }
106 
UnscheduleTask(const std::string & name)107 bool ScheduleTaskManager::UnscheduleTask(const std::string& name)
108 {
109     std::unique_lock<std::mutex> lck(taskMutex_);
110     HILOG_DEBUG(LOG_CORE, "del schedule %s start, total: %zu", name.c_str(), taskMap_.size());
111     auto it = taskMap_.find(name);
112     if (it != taskMap_.end()) {
113         taskMap_.erase(it);
114         HILOG_DEBUG(LOG_CORE, "del schedule %s done, remain: %zu", name.c_str(), taskMap_.size());
115         return true;
116     }
117     HILOG_DEBUG(LOG_CORE, "del schedule %s pass, total: %zu", name.c_str(), taskMap_.size());
118     return false;
119 }
120 
TakeFront()121 ScheduleTaskManager::WeakTask ScheduleTaskManager::TakeFront()
122 {
123     std::unique_lock<std::mutex> lck(taskMutex_);
124 
125     // thread wait until task insert or shutdown
126     while (timeMap_.empty() && runScheduleThread_) {
127         taskCv_.wait_for(lck, POLL_INTERVAL);
128     }
129 
130     if (!runScheduleThread_) {
131         return {};
132     }
133 
134     auto task = timeMap_.begin()->second;
135     timeMap_.erase(timeMap_.begin());
136     return task;
137 }
138 
DumpTask(const SharedTask & task)139 void ScheduleTaskManager::DumpTask(const SharedTask& task) {}
140 
ScheduleThread()141 void ScheduleTaskManager::ScheduleThread()
142 {
143     pthread_setname_np(pthread_self(), "SchedTaskMgr");
144     while (runScheduleThread_) {
145         // take front task from task queue
146         WeakTask weakTask = TakeFront();
147         if (!runScheduleThread_) {
148             break;
149         }
150 
151         TimePoint targetTime;
152         {
153             auto taskTime = weakTask.lock(); // promote to shared_ptr
154             if (!taskTime) {
155                 // task cancelled with UnschduleTask or not a repeat task
156                 HILOG_INFO(LOG_CORE, "front task cacelled or not repeat task");
157                 continue;
158             }
159             targetTime = taskTime->nextRunTime;
160         }
161 
162         // delay to target time
163         auto currentTime = Clock::now();
164         if (targetTime >= currentTime) {
165             std::this_thread::sleep_for(targetTime - currentTime);
166         }
167 
168         auto taskRepeat = weakTask.lock();
169         if (!taskRepeat) {
170             // task cancelled with UnschduleTask
171             HILOG_INFO(LOG_CORE, "front task cacelled");
172             continue;
173         }
174 
175         // call task callback
176         DumpTask(taskRepeat);
177         taskRepeat->callback();
178         taskRepeat->nextRunTime = targetTime + taskRepeat->repeatInterval;
179 
180         if (taskRepeat->repeatInterval.count() != 0) {
181             // repeat task, re-insert task to timeMap
182             std::unique_lock<std::mutex> guard(taskMutex_);
183             timeMap_.insert(std::make_pair(taskRepeat->nextRunTime, taskRepeat));
184         } else {
185             // not a repeat task.
186             std::unique_lock<std::mutex> guard(taskMutex_);
187             taskMap_.erase(taskRepeat->name);
188         }
189     }
190 }
191