1 /* 2 * Copyright (c) 2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 17 #define OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 18 #include <atomic> 19 #include <chrono> 20 #include <condition_variable> 21 #include <functional> 22 #include <limits> 23 #include <map> 24 #include <memory> 25 #include <mutex> 26 #include <set> 27 #include <thread> 28 29 namespace OHOS { 30 class TaskScheduler { 31 public: 32 using TaskId = uint64_t; 33 using Time = std::chrono::steady_clock::time_point; 34 using Duration = std::chrono::steady_clock::duration; 35 using Clock = std::chrono::steady_clock; 36 using Task = std::function<void()>; 37 inline static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l); 38 inline static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0); 39 inline static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max(); TaskScheduler(size_t capacity,const std::string & name)40 TaskScheduler(size_t capacity, const std::string &name) 41 { 42 capacity_ = capacity; 43 isRunning_ = true; 44 taskId_ = INVALID_TASK_ID; 45 running_ = InnerTask(); 46 thread_ = std::make_unique<std::thread>([this, name]() { 47 auto realName = std::string("scheduler_") + name; 48 pthread_setname_np(pthread_self(), realName.c_str()); 49 Loop(); 50 }); 51 } TaskScheduler(const std::string & name)52 TaskScheduler(const std::string &name) : TaskScheduler(std::numeric_limits<size_t>::max(), name) {} 53 TaskScheduler(size_t capacity = std::numeric_limits<size_t>::max()) : TaskScheduler(capacity, "") {} ~TaskScheduler()54 ~TaskScheduler() 55 { 56 isRunning_ = false; 57 Clean(); 58 Execute([]() {}); 59 thread_->join(); 60 } 61 // execute task at specific time 62 TaskId At(const Time &begin, Task task, Duration interval = INVALID_INTERVAL, uint64_t times = UNLIMITED_TIMES) 63 { 64 std::unique_lock<decltype(mutex_)> lock(mutex_); 65 if (tasks_.size() >= capacity_) { 66 return INVALID_TASK_ID; 67 } 68 InnerTask innerTask; 69 innerTask.times = times; 70 innerTask.taskId = GenTaskId(); 71 innerTask.interval = interval; 72 innerTask.exec = std::move(task); 73 auto it = tasks_.insert({ begin, innerTask}); 74 if (it == tasks_.begin()) { 75 condition_.notify_one(); 76 } 77 indexes_[innerTask.taskId] = it; 78 return innerTask.taskId; 79 } Reset(TaskId taskId,const Duration & interval)80 TaskId Reset(TaskId taskId, const Duration &interval) 81 { 82 std::unique_lock<decltype(mutex_)> lock(mutex_); 83 if (running_.taskId == taskId && running_.interval != INVALID_INTERVAL) { 84 running_.interval = interval; 85 return running_.taskId; 86 } 87 auto index = indexes_.find(taskId); 88 if (index == indexes_.end()) { 89 return INVALID_TASK_ID; 90 } 91 auto &innerTask = index->second->second; 92 if (innerTask.interval != INVALID_INTERVAL) { 93 innerTask.interval = interval; 94 } 95 auto it = tasks_.insert({ std::chrono::steady_clock::now() + interval, std::move(innerTask) }); 96 if (it == tasks_.begin() || index->second == tasks_.begin()) { 97 condition_.notify_one(); 98 } 99 tasks_.erase(index->second); 100 indexes_[taskId] = it; 101 return taskId; 102 } Clean()103 void Clean() 104 { 105 std::unique_lock<decltype(mutex_)> lock(mutex_); 106 indexes_.clear(); 107 tasks_.clear(); 108 } 109 // execute task periodically with duration Every(Duration interval,Task task)110 TaskId Every(Duration interval, Task task) 111 { 112 return At(std::chrono::steady_clock::now() + interval, task, interval); 113 } 114 // remove task in SchedulerTask 115 void Remove(TaskId taskId, bool wait = false) 116 { 117 std::unique_lock<decltype(mutex_)> lock(mutex_); 118 cond_.wait(lock, [this, taskId, wait]() { 119 return (!wait || running_.taskId != taskId); 120 }); 121 auto index = indexes_.find(taskId); 122 if (index == indexes_.end()) { 123 return; 124 } 125 tasks_.erase(index->second); 126 indexes_.erase(index); 127 condition_.notify_one(); 128 } 129 // execute task periodically with duration after delay Every(Duration delay,Duration interval,Task task)130 TaskId Every(Duration delay, Duration interval, Task task) 131 { 132 return At(std::chrono::steady_clock::now() + delay, task, interval); 133 } 134 // execute task for some times periodically with duration after delay Every(int32_t times,Duration delay,Duration interval,Task task)135 TaskId Every(int32_t times, Duration delay, Duration interval, Task task) 136 { 137 return At(std::chrono::steady_clock::now() + delay, task, interval, times); 138 } Execute(Task task)139 TaskId Execute(Task task) 140 { 141 return At(std::chrono::steady_clock::now(), std::move(task)); 142 } 143 private: 144 struct InnerTask { 145 TaskId taskId = INVALID_TASK_ID; 146 Duration interval = INVALID_INTERVAL; 147 uint64_t times = UNLIMITED_TIMES; 148 std::function<void()> exec; 149 }; Loop()150 void Loop() 151 { 152 while (isRunning_) { 153 std::function<void()> exec; 154 { 155 std::unique_lock<decltype(mutex_)> lock(mutex_); 156 condition_.wait(lock, [this] { 157 return !tasks_.empty(); 158 }); 159 if (tasks_.begin()->first > std::chrono::steady_clock::now()) { 160 auto time = tasks_.begin()->first; 161 condition_.wait_until(lock, time); 162 continue; 163 } 164 auto it = tasks_.begin(); 165 running_ = it->second; 166 exec = running_.exec; 167 indexes_.erase(running_.taskId); 168 tasks_.erase(it); 169 running_.times--; 170 } 171 if (exec) { 172 exec(); 173 } 174 { 175 std::unique_lock<decltype(mutex_)> lock(mutex_); 176 if (running_.interval != INVALID_INTERVAL && running_.times > 0) { 177 auto it = tasks_.insert({ std::chrono::steady_clock::now() + running_.interval, running_ }); 178 indexes_[running_.taskId] = it; 179 } 180 running_ = InnerTask(); 181 cond_.notify_all(); 182 } 183 } 184 } 185 GenTaskId()186 TaskId GenTaskId() 187 { 188 auto taskId = ++taskId_; 189 if (taskId == INVALID_TASK_ID) { 190 return ++taskId_; 191 } 192 return taskId; 193 } 194 195 volatile bool isRunning_; 196 size_t capacity_; 197 std::multimap<Time, InnerTask> tasks_; 198 std::map<TaskId, decltype(tasks_)::iterator> indexes_; 199 InnerTask running_; 200 std::mutex mutex_; 201 std::unique_ptr<std::thread> thread_; 202 std::condition_variable condition_; 203 std::condition_variable cond_; 204 std::atomic<uint64_t> taskId_; 205 }; 206 } // namespace OHOS 207 #endif // OHOS_DISTRIBUTED_DATA_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 208