1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_PREFERENCES_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 17 #define OHOS_PREFERENCES_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 18 #include <pthread.h> 19 20 #include <atomic> 21 #include <chrono> 22 #include <condition_variable> 23 #include <functional> 24 #include <limits> 25 #include <map> 26 #include <memory> 27 #include <mutex> 28 #include <thread> 29 30 #include "visibility.h" 31 namespace OHOS { 32 class API_LOCAL TaskScheduler { 33 public: 34 using TaskId = uint64_t; 35 using Time = std::chrono::steady_clock::time_point; 36 using Task = std::function<void()>; 37 inline static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0ULL); TaskScheduler(const std::string & name)38 TaskScheduler(const std::string &name) 39 { 40 capacity_ = std::numeric_limits<size_t>::max(); 41 isRunning_ = true; 42 taskId_ = INVALID_TASK_ID; 43 thread_ = std::make_unique<std::thread>([this, name]() { 44 auto realName = std::string("task_queue_") + name; 45 #if defined(MAC_PLATFORM) || defined(IOS_PLATFORM) 46 pthread_setname_np(realName.c_str()); 47 #else 48 pthread_setname_np(pthread_self(), realName.c_str()); 49 #endif 50 Loop(); 51 }); 52 } 53 ~TaskScheduler()54 ~TaskScheduler() 55 { 56 isRunning_ = false; 57 { 58 std::unique_lock<std::mutex> lock(mutex_); 59 indexes_.clear(); 60 tasks_.clear(); 61 } 62 At(std::chrono::steady_clock::now(), []() {}); 63 thread_->join(); 64 } 65 66 // execute task at specific time At(const Time & time,Task task)67 TaskId At(const Time &time, Task task) 68 { 69 std::unique_lock<std::mutex> lock(mutex_); 70 if (tasks_.size() >= capacity_) { 71 return INVALID_TASK_ID; 72 } 73 auto taskId = GenTaskId(); 74 auto it = tasks_.insert({ time, std::pair{ task, taskId } }); 75 if (it == tasks_.begin()) { 76 condition_.notify_one(); 77 } 78 indexes_[taskId] = it; 79 return taskId; 80 } 81 Execute(Task task)82 TaskId Execute(Task task) 83 { 84 return At(std::chrono::steady_clock::now(), std::move(task)); 85 } 86 87 private: 88 using InnerTask = std::pair<std::function<void()>, uint64_t>; Loop()89 void Loop() 90 { 91 while (isRunning_) { 92 std::function<void()> exec; 93 { 94 std::unique_lock<std::mutex> lock(mutex_); 95 condition_.wait(lock, [this] { return !tasks_.empty(); }); 96 auto it = tasks_.begin(); 97 exec = it->second.first; 98 indexes_.erase(it->second.second); 99 tasks_.erase(it); 100 } 101 102 if (exec) { 103 exec(); 104 } 105 } 106 } 107 GenTaskId()108 TaskId GenTaskId() 109 { 110 auto taskId = ++taskId_; 111 if (taskId == INVALID_TASK_ID) { 112 return ++taskId_; 113 } 114 return taskId; 115 } 116 117 volatile bool isRunning_; 118 size_t capacity_; 119 std::multimap<Time, InnerTask> tasks_; 120 std::map<TaskId, decltype(tasks_)::iterator> indexes_; 121 std::mutex mutex_; 122 std::unique_ptr<std::thread> thread_; 123 std::condition_variable condition_; 124 std::atomic<uint64_t> taskId_; 125 }; 126 } // namespace OHOS 127 #endif // OHOS_PREFERENCES_FRAMEWORKS_COMMON_TASK_SCHEDULER_H 128