• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_POOL_H
17 #define OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_POOL_H
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <queue>
22 #include <thread>
23 
24 #include "executor.h"
25 #include "pool.h"
26 #include "priority_queue.h"
27 namespace OHOS {
28 class ExecutorPool {
29 public:
30     using TaskId = Executor::TaskId;
31     using Task = Executor::Task;
32     using Duration = Executor::Duration;
33     using Time = Executor::Time;
34     using InnerTask = Executor::InnerTask;
35     using Status = Executor::Status;
36     using TaskQueue = PriorityQueue<InnerTask, Time, TaskId>;
37     static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>();
38     static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0);
39     static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max();
40     static constexpr Duration INVALID_DELAY = std::chrono::seconds(0);
41     static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l);
42 
ExecutorPool(size_t max,size_t min)43     ExecutorPool(size_t max, size_t min)
44         : pool_(max, min), delayTasks_(InnerTask(), NextTimer), taskId_(INVALID_TASK_ID)
45     {
46         // When max equals 1, timer thread schedules and executes tasks.
47         if (max > 1) {
48             execs_ = new (std::nothrow) TaskQueue(InnerTask());
49         }
50     }
51 
~ExecutorPool()52     ~ExecutorPool()
53     {
54         poolStatus = Status::IS_STOPPING;
55         if (execs_ != nullptr) {
56             execs_->Clean();
57         }
58         delayTasks_.Clean();
59         std::shared_ptr<Executor> scheduler;
60         {
61             std::lock_guard<decltype(mtx_)> scheduleLock(mtx_);
62             scheduler = std::move(scheduler_);
63         }
64         if (scheduler != nullptr) {
65             scheduler->Stop(true);
66         }
67         pool_.Clean([](std::shared_ptr<Executor> executor) {
68             executor->Stop(true);
69         });
70         delete execs_;
71         poolStatus = Status::STOPPED;
72     }
73 
Execute(Task task)74     TaskId Execute(Task task)
75     {
76         if (poolStatus != Status::RUNNING) {
77             return INVALID_TASK_ID;
78         }
79 
80         if (execs_ == nullptr) {
81             return Schedule(std::move(task), INVALID_DELAY, INVALID_INTERVAL, UNLIMITED_TIMES);
82         }
83 
84         return Execute(std::move(task), GenTaskId());
85     }
86 
Schedule(Duration delay,Task task)87     TaskId Schedule(Duration delay, Task task)
88     {
89         return Schedule(std::move(task), delay, INVALID_INTERVAL, 1);
90     }
91 
Schedule(Task task,Duration interval)92     TaskId Schedule(Task task, Duration interval)
93     {
94         return Schedule(std::move(task), INVALID_DELAY, interval, UNLIMITED_TIMES);
95     }
96 
Schedule(Task task,Duration delay,Duration interval)97     TaskId Schedule(Task task, Duration delay, Duration interval)
98     {
99         return Schedule(std::move(task), delay, interval, UNLIMITED_TIMES);
100     }
101 
Schedule(Task task,Duration delay,Duration interval,uint64_t times)102     TaskId Schedule(Task task, Duration delay, Duration interval, uint64_t times)
103     {
104         InnerTask innerTask;
105         innerTask.exec = std::move(task);
106         innerTask.interval = interval;
107         innerTask.times = times;
108         innerTask.taskId = GenTaskId();
109         return Schedule(std::move(innerTask), std::chrono::steady_clock::now() + delay);
110     }
111 
112     bool Remove(TaskId taskId, bool wait = false)
113     {
114         bool res = true;
115         auto delay = delayTasks_.Find(taskId);
116         if (!delay.Valid()) {
117             res = false;
118         }
119         delayTasks_.Remove(taskId, wait);
120         if (execs_ != nullptr) {
121             execs_->Remove(taskId, wait);
122         }
123         return res;
124     }
125 
Reset(TaskId taskId,Duration interval)126     TaskId Reset(TaskId taskId, Duration interval)
127     {
128         auto updated = delayTasks_.Update(taskId, [interval](InnerTask &task) -> std::pair<bool, Time> {
129             if (task.interval != INVALID_INTERVAL) {
130                 task.interval = interval;
131             }
132             auto time = std::chrono::steady_clock::now() + interval;
133             return std::pair{ true, time };
134         });
135         return updated ? taskId : INVALID_TASK_ID;
136     }
137 
138 private:
Execute(Task task,TaskId taskId)139     TaskId Execute(Task task, TaskId taskId)
140     {
141         InnerTask innerTask;
142         innerTask.exec = task;
143         innerTask.taskId = taskId;
144         execs_->Push(std::move(innerTask), taskId, INVALID_TIME);
145         auto executor = pool_.Get();
146         if (executor == nullptr) {
147             return taskId;
148         }
149         executor->Bind(
150             execs_,
151             [this](std::shared_ptr<Executor> exe) {
152                 pool_.Idle(exe);
153                 return true;
154             },
155             [this](std::shared_ptr<Executor> exe, bool force) -> bool {
156                 return pool_.Release(exe, force);
157             });
158         return taskId;
159     }
160 
Schedule(InnerTask innerTask,Time delay)161     TaskId Schedule(InnerTask innerTask, Time delay)
162     {
163         auto id = innerTask.taskId;
164         if (execs_ != nullptr) {
165             auto func = innerTask.exec;
166             auto run = [this, func, id]() {
167                 Execute(func, id);
168             };
169             innerTask.exec = run;
170         }
171         delayTasks_.Push(std::move(innerTask), id, delay);
172         std::lock_guard<decltype(mtx_)> scheduleLock(mtx_);
173         if (scheduler_ == nullptr) {
174             scheduler_ = pool_.Get(true);
175             scheduler_->Bind(
176                 &delayTasks_,
177                 [this](std::shared_ptr<Executor> exe) {
178                     std::unique_lock<decltype(mtx_)> lock(mtx_);
179                     if (delayTasks_.Size() != 0) {
180                         return false;
181                     }
182                     scheduler_ = nullptr;
183                     pool_.Idle(exe);
184                     return true;
185                 },
186                 [this](std::shared_ptr<Executor> exe, bool force) -> bool {
187                     return pool_.Release(exe, force);
188                 });
189         }
190         return innerTask.taskId;
191     }
192 
GenTaskId()193     TaskId GenTaskId()
194     {
195         auto taskId = ++taskId_;
196         if (taskId == INVALID_TASK_ID) {
197             taskId = ++taskId_;
198         }
199         return taskId;
200     }
201 
NextTimer(InnerTask & task)202     static std::pair<bool, Time> NextTimer(InnerTask &task)
203     {
204         if (task.interval != INVALID_INTERVAL && --task.times > 0) {
205             auto time = std::chrono::steady_clock::now() + task.interval;
206             return { true, time };
207         }
208         return { false, INVALID_TIME };
209     }
210 
211     Status poolStatus = Status::RUNNING;
212     std::mutex mtx_;
213     Pool<Executor> pool_;
214     TaskQueue delayTasks_;
215     std::shared_ptr<Executor> scheduler_ = nullptr;
216     TaskQueue *execs_ = nullptr;
217     std::atomic<TaskId> taskId_;
218 };
219 } // namespace OHOS
220 
221 #endif // OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_POOL_H
222