1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_POOL_H 17 #define OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_POOL_H 18 #include <atomic> 19 #include <condition_variable> 20 #include <mutex> 21 #include <queue> 22 #include <thread> 23 24 #include "executor.h" 25 #include "pool.h" 26 #include "priority_queue.h" 27 namespace OHOS { 28 class ExecutorPool { 29 public: 30 using TaskId = Executor::TaskId; 31 using Task = Executor::Task; 32 using Duration = Executor::Duration; 33 using Time = Executor::Time; 34 using InnerTask = Executor::InnerTask; 35 using Status = Executor::Status; 36 using TaskQueue = PriorityQueue<InnerTask, Time, TaskId>; 37 static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>(); 38 static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0); 39 static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max(); 40 static constexpr Duration INVALID_DELAY = std::chrono::seconds(0); 41 static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l); 42 ExecutorPool(size_t max,size_t min)43 ExecutorPool(size_t max, size_t min) 44 : pool_(max, min), delayTasks_(InnerTask(), NextTimer), taskId_(INVALID_TASK_ID) 45 { 46 // When max equals 1, timer thread schedules and executes tasks. 47 if (max > 1) { 48 execs_ = new (std::nothrow) TaskQueue(InnerTask()); 49 } 50 } 51 ~ExecutorPool()52 ~ExecutorPool() 53 { 54 poolStatus = Status::IS_STOPPING; 55 if (execs_ != nullptr) { 56 execs_->Clean(); 57 } 58 delayTasks_.Clean(); 59 std::shared_ptr<Executor> scheduler; 60 { 61 std::lock_guard<decltype(mtx_)> scheduleLock(mtx_); 62 scheduler = std::move(scheduler_); 63 } 64 if (scheduler != nullptr) { 65 scheduler->Stop(true); 66 } 67 pool_.Clean([](std::shared_ptr<Executor> executor) { 68 executor->Stop(true); 69 }); 70 delete execs_; 71 poolStatus = Status::STOPPED; 72 } 73 Execute(Task task)74 TaskId Execute(Task task) 75 { 76 if (poolStatus != Status::RUNNING) { 77 return INVALID_TASK_ID; 78 } 79 80 if (execs_ == nullptr) { 81 return Schedule(std::move(task), INVALID_DELAY, INVALID_INTERVAL, UNLIMITED_TIMES); 82 } 83 84 return Execute(std::move(task), GenTaskId()); 85 } 86 Schedule(Duration delay,Task task)87 TaskId Schedule(Duration delay, Task task) 88 { 89 return Schedule(std::move(task), delay, INVALID_INTERVAL, 1); 90 } 91 Schedule(Task task,Duration interval)92 TaskId Schedule(Task task, Duration interval) 93 { 94 return Schedule(std::move(task), INVALID_DELAY, interval, UNLIMITED_TIMES); 95 } 96 Schedule(Task task,Duration delay,Duration interval)97 TaskId Schedule(Task task, Duration delay, Duration interval) 98 { 99 return Schedule(std::move(task), delay, interval, UNLIMITED_TIMES); 100 } 101 Schedule(Task task,Duration delay,Duration interval,uint64_t times)102 TaskId Schedule(Task task, Duration delay, Duration interval, uint64_t times) 103 { 104 InnerTask innerTask; 105 innerTask.exec = std::move(task); 106 innerTask.interval = interval; 107 innerTask.times = times; 108 innerTask.taskId = GenTaskId(); 109 return Schedule(std::move(innerTask), std::chrono::steady_clock::now() + delay); 110 } 111 112 bool Remove(TaskId taskId, bool wait = false) 113 { 114 bool res = true; 115 auto delay = delayTasks_.Find(taskId); 116 if (!delay.Valid()) { 117 res = false; 118 } 119 delayTasks_.Remove(taskId, wait); 120 if (execs_ != nullptr) { 121 execs_->Remove(taskId, wait); 122 } 123 return res; 124 } 125 Reset(TaskId taskId,Duration interval)126 TaskId Reset(TaskId taskId, Duration interval) 127 { 128 auto updated = delayTasks_.Update(taskId, [interval](InnerTask &task) -> std::pair<bool, Time> { 129 if (task.interval != INVALID_INTERVAL) { 130 task.interval = interval; 131 } 132 auto time = std::chrono::steady_clock::now() + interval; 133 return std::pair{ true, time }; 134 }); 135 return updated ? taskId : INVALID_TASK_ID; 136 } 137 138 private: Execute(Task task,TaskId taskId)139 TaskId Execute(Task task, TaskId taskId) 140 { 141 InnerTask innerTask; 142 innerTask.exec = task; 143 innerTask.taskId = taskId; 144 execs_->Push(std::move(innerTask), taskId, INVALID_TIME); 145 auto executor = pool_.Get(); 146 if (executor == nullptr) { 147 return taskId; 148 } 149 executor->Bind( 150 execs_, 151 [this](std::shared_ptr<Executor> exe) { 152 pool_.Idle(exe); 153 return true; 154 }, 155 [this](std::shared_ptr<Executor> exe, bool force) -> bool { 156 return pool_.Release(exe, force); 157 }); 158 return taskId; 159 } 160 Schedule(InnerTask innerTask,Time delay)161 TaskId Schedule(InnerTask innerTask, Time delay) 162 { 163 auto id = innerTask.taskId; 164 if (execs_ != nullptr) { 165 auto func = innerTask.exec; 166 auto run = [this, func, id]() { 167 Execute(func, id); 168 }; 169 innerTask.exec = run; 170 } 171 delayTasks_.Push(std::move(innerTask), id, delay); 172 std::lock_guard<decltype(mtx_)> scheduleLock(mtx_); 173 if (scheduler_ == nullptr) { 174 scheduler_ = pool_.Get(true); 175 scheduler_->Bind( 176 &delayTasks_, 177 [this](std::shared_ptr<Executor> exe) { 178 std::unique_lock<decltype(mtx_)> lock(mtx_); 179 if (delayTasks_.Size() != 0) { 180 return false; 181 } 182 scheduler_ = nullptr; 183 pool_.Idle(exe); 184 return true; 185 }, 186 [this](std::shared_ptr<Executor> exe, bool force) -> bool { 187 return pool_.Release(exe, force); 188 }); 189 } 190 return innerTask.taskId; 191 } 192 GenTaskId()193 TaskId GenTaskId() 194 { 195 auto taskId = ++taskId_; 196 if (taskId == INVALID_TASK_ID) { 197 taskId = ++taskId_; 198 } 199 return taskId; 200 } 201 NextTimer(InnerTask & task)202 static std::pair<bool, Time> NextTimer(InnerTask &task) 203 { 204 if (task.interval != INVALID_INTERVAL && --task.times > 0) { 205 auto time = std::chrono::steady_clock::now() + task.interval; 206 return { true, time }; 207 } 208 return { false, INVALID_TIME }; 209 } 210 211 Status poolStatus = Status::RUNNING; 212 std::mutex mtx_; 213 Pool<Executor> pool_; 214 TaskQueue delayTasks_; 215 std::shared_ptr<Executor> scheduler_ = nullptr; 216 TaskQueue *execs_ = nullptr; 217 std::atomic<TaskId> taskId_; 218 }; 219 } // namespace OHOS 220 221 #endif // OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_POOL_H 222