1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 17 #define OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 18 #include <atomic> 19 #include <chrono> 20 #include <condition_variable> 21 #include <functional> 22 #include <limits> 23 #include <map> 24 #include <memory> 25 #include <mutex> 26 #include <thread> 27 28 #include "visibility.h" 29 namespace OHOS { 30 class API_LOCAL TaskScheduler { 31 public: 32 using TaskId = uint64_t; 33 using Time = std::chrono::steady_clock::time_point; 34 using Duration = std::chrono::steady_clock::duration; 35 using Clock = std::chrono::steady_clock; 36 using Task = std::function<void()>; 37 inline static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l); TaskScheduler(size_t capacity,const std::string & name)38 TaskScheduler(size_t capacity, const std::string &name) 39 { 40 capacity_ = capacity; 41 isRunning_ = true; 42 taskId_ = INVALID_TASK_ID; 43 thread_ = std::make_unique<std::thread>([this, name]() { 44 auto realName = std::string("scheduler_") + name; 45 pthread_setname_np(pthread_self(), realName.c_str()); 46 Loop(); 47 }); 48 } TaskScheduler(const std::string & name)49 TaskScheduler(const std::string &name) : TaskScheduler(std::numeric_limits<size_t>::max(), name) {} 50 TaskScheduler(size_t capacity = std::numeric_limits<size_t>::max()) : TaskScheduler(capacity, "") {} 51 ~TaskScheduler()52 ~TaskScheduler() 53 { 54 isRunning_ = false; 55 Clean(); 56 At(std::chrono::steady_clock::now(), []() {}); 57 thread_->join(); 58 } 59 60 // execute task at specific time At(const Time & time,Task task)61 TaskId At(const Time &time, Task task) 62 { 63 std::unique_lock<std::mutex> lock(mutex_); 64 if (tasks_.size() >= capacity_) { 65 return INVALID_TASK_ID; 66 } 67 auto taskId = GenTaskId(); 68 auto it = tasks_.insert({ time, std::pair{ task, taskId } }); 69 if (it == tasks_.begin()) { 70 condition_.notify_one(); 71 } 72 indexes_[taskId] = it; 73 return taskId; 74 } 75 Reset(TaskId taskId,const Duration & interval)76 TaskId Reset(TaskId taskId, const Duration &interval) 77 { 78 std::unique_lock<std::mutex> lock(mutex_); 79 auto index = indexes_.find(taskId); 80 if (index == indexes_.end()) { 81 return INVALID_TASK_ID; 82 } 83 84 auto it = tasks_.insert({ std::chrono::steady_clock::now() + interval, std::move(index->second->second) }); 85 if (it == tasks_.begin() || index->second == tasks_.begin()) { 86 condition_.notify_one(); 87 } 88 tasks_.erase(index->second); 89 indexes_[taskId] = it; 90 return taskId; 91 } 92 Clean()93 void Clean() 94 { 95 std::unique_lock<std::mutex> lock(mutex_); 96 indexes_.clear(); 97 tasks_.clear(); 98 } 99 100 // execute task periodically with duration Every(Duration interval,Task task)101 void Every(Duration interval, Task task) 102 { 103 std::function<void()> waitFunc = [this, interval, task]() { 104 task(); 105 this->Every(interval, task); 106 }; 107 At(std::chrono::steady_clock::now() + interval, waitFunc); 108 } 109 110 // remove task in SchedulerTask Remove(TaskId taskId)111 void Remove(TaskId taskId) 112 { 113 std::unique_lock<std::mutex> lock(mutex_); 114 auto index = indexes_.find(taskId); 115 if (index == indexes_.end()) { 116 return; 117 } 118 tasks_.erase(index->second); 119 indexes_.erase(index); 120 condition_.notify_one(); 121 } 122 123 // execute task periodically with duration after delay Every(Duration delay,Duration interval,Task task)124 void Every(Duration delay, Duration interval, Task task) 125 { 126 std::function<void()> waitFunc = [this, interval, task]() { 127 task(); 128 Every(interval, task); 129 }; 130 At(std::chrono::steady_clock::now() + delay, waitFunc); 131 } 132 133 // execute task for some times periodically with duration after delay Every(int32_t times,Duration delay,Duration interval,Task task)134 void Every(int32_t times, Duration delay, Duration interval, Task task) 135 { 136 std::function<void()> waitFunc = [this, times, interval, task]() { 137 task(); 138 int count = times; 139 count--; 140 if (times > 1) { 141 Every(count, interval, interval, task); 142 } 143 }; 144 145 At(std::chrono::steady_clock::now() + delay, waitFunc); 146 } 147 Execute(Task task)148 TaskId Execute(Task task) 149 { 150 return At(std::chrono::steady_clock::now(), std::move(task)); 151 } 152 153 private: 154 using InnerTask = std::pair<std::function<void()>, uint64_t>; Loop()155 void Loop() 156 { 157 while (isRunning_) { 158 std::function<void()> exec; 159 { 160 std::unique_lock<std::mutex> lock(mutex_); 161 condition_.wait(lock, [this] { 162 return !tasks_.empty(); 163 }); 164 if (tasks_.begin()->first > std::chrono::steady_clock::now()) { 165 auto time = tasks_.begin()->first; 166 condition_.wait_until(lock, time); 167 continue; 168 } 169 auto it = tasks_.begin(); 170 exec = it->second.first; 171 indexes_.erase(it->second.second); 172 tasks_.erase(it); 173 } 174 175 if (exec) { 176 exec(); 177 } 178 } 179 } 180 GenTaskId()181 TaskId GenTaskId() 182 { 183 auto taskId = ++taskId_; 184 if (taskId == INVALID_TASK_ID) { 185 return ++taskId_; 186 } 187 return taskId; 188 } 189 190 volatile bool isRunning_; 191 size_t capacity_; 192 std::multimap<Time, InnerTask> tasks_; 193 std::map<TaskId, decltype(tasks_)::iterator> indexes_; 194 std::mutex mutex_; 195 std::unique_ptr<std::thread> thread_; 196 std::condition_variable condition_; 197 std::atomic<uint64_t> taskId_; 198 }; 199 } // namespace OHOS 200 #endif // OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 201