• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H
17 #define OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H
#include <pthread.h>

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

#include "priority_queue.h"
24 namespace OHOS {
25 
26 class Executor : public std::enable_shared_from_this<Executor> {
27 public:
28     using TaskId = uint64_t;
29     using Task = std::function<void()>;
30     using Duration = std::chrono::steady_clock::duration;
31     using Time = std::chrono::steady_clock::time_point;
32     static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>();
33     static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0);
34     static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max();
35     static constexpr Duration INVALID_DELAY = std::chrono::seconds(0);
36     static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l);
37 
38     enum Status {
39         RUNNING,
40         IS_STOPPING,
41         STOPPED
42     };
43     struct InnerTask {
44         std::function<void()> exec = []() {};
45         Duration interval = INVALID_INTERVAL;
46         uint64_t times = UNLIMITED_TIMES;
47         TaskId taskId = INVALID_TASK_ID;
48         InnerTask() = default;
49 
ValidInnerTask50         bool Valid() const
51         {
52             return taskId != INVALID_TASK_ID;
53         }
54     };
55 
Executor()56     Executor()
57         : thread_([this] {
58               pthread_setname_np(pthread_self(), "TaskExecutor");
59               Run();
60               self_ = nullptr;
61           })
62     {
63         thread_.detach();
64     }
65 
Bind(PriorityQueue<InnerTask,Time,TaskId> * queue,std::function<bool (std::shared_ptr<Executor>)> idle,std::function<bool (std::shared_ptr<Executor>,bool)> release)66     void Bind(PriorityQueue<InnerTask, Time, TaskId> *queue, std::function<bool(std::shared_ptr<Executor>)> idle,
67         std::function<bool(std::shared_ptr<Executor>, bool)> release)
68     {
69         std::unique_lock<decltype(mutex_)> lock(mutex_);
70         self_ = shared_from_this();
71         waits_ = queue;
72         idle_ = std::move(idle);
73         release_ = std::move(release);
74         condition_.notify_one();
75     }
76 
77     void Stop(bool wait = false) noexcept
78     {
79         std::unique_lock<decltype(mutex_)> lock(mutex_);
80         running_ = IS_STOPPING;
81         condition_.notify_one();
82         cond_.wait(lock, [this, wait]() { return !wait || running_ == STOPPED; });
83     }
84 
85 private:
86     static constexpr Duration TIME_OUT = std::chrono::seconds(2);
Run()87     void Run()
88     {
89         std::unique_lock<decltype(mutex_)> lock(mutex_);
90         do {
91             do {
92                 condition_.wait(lock, [this] {
93                     return running_ == IS_STOPPING || waits_ != nullptr;
94                 });
95                 while (running_ == RUNNING && waits_ != nullptr && waits_->Size() > 0) {
96                     auto currentTask = waits_->Pop();
97                     lock.unlock();
98                     currentTask.exec();
99                     lock.lock();
100                     waits_->Finish(currentTask.taskId);
101                 }
102                 if (!idle_(self_) && running_ == RUNNING) {
103                     continue;
104                 }
105                 waits_ = nullptr;
106             } while (running_ == RUNNING &&
107                      condition_.wait_until(lock, std::chrono::steady_clock::now() + TIME_OUT, [this]() {
108                          return waits_ != nullptr;
109                      }));
110         } while (!release_(self_, running_ == IS_STOPPING));
111         running_ = STOPPED;
112         cond_.notify_all();
113     }
114 
115     Status running_ = RUNNING;
116     std::mutex mutex_;
117     std::condition_variable condition_;
118     std::condition_variable cond_;
119     std::shared_ptr<Executor> self_;
120     PriorityQueue<InnerTask, Time, TaskId> *waits_ = nullptr;
121     std::function<bool(std::shared_ptr<Executor>)> idle_;
122     std::function<bool(std::shared_ptr<Executor>, bool)> release_;
123     std::thread thread_;
124 };
125 } // namespace OHOS
126 #endif // OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H
127