1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H 17 #define OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H 18 #include <condition_variable> 19 #include <mutex> 20 #include <queue> 21 #include <thread> 22 23 #include "priority_queue.h" 24 namespace OHOS { 25 26 class Executor : public std::enable_shared_from_this<Executor> { 27 public: 28 using TaskId = uint64_t; 29 using Task = std::function<void()>; 30 using Duration = std::chrono::steady_clock::duration; 31 using Time = std::chrono::steady_clock::time_point; 32 static constexpr Time INVALID_TIME = std::chrono::time_point<std::chrono::steady_clock, std::chrono::seconds>(); 33 static constexpr Duration INVALID_INTERVAL = std::chrono::milliseconds(0); 34 static constexpr uint64_t UNLIMITED_TIMES = std::numeric_limits<uint64_t>::max(); 35 static constexpr Duration INVALID_DELAY = std::chrono::seconds(0); 36 static constexpr TaskId INVALID_TASK_ID = static_cast<uint64_t>(0l); 37 38 enum Status { 39 RUNNING, 40 IS_STOPPING, 41 STOPPED 42 }; 43 struct InnerTask { 44 std::function<void()> exec = []() {}; 45 Duration interval = INVALID_INTERVAL; 46 uint64_t times = UNLIMITED_TIMES; 47 TaskId taskId = INVALID_TASK_ID; 48 InnerTask() = default; 49 ValidInnerTask50 bool Valid() const 51 { 52 return taskId != INVALID_TASK_ID; 53 } 54 }; 55 Executor()56 Executor() 57 : thread_([this] { 58 pthread_setname_np(pthread_self(), "TaskExecutor"); 59 Run(); 60 self_ = nullptr; 61 }) 62 { 63 thread_.detach(); 64 } 65 Bind(PriorityQueue<InnerTask,Time,TaskId> * queue,std::function<bool (std::shared_ptr<Executor>)> idle,std::function<bool (std::shared_ptr<Executor>,bool)> release)66 void Bind(PriorityQueue<InnerTask, Time, TaskId> *queue, std::function<bool(std::shared_ptr<Executor>)> idle, 67 std::function<bool(std::shared_ptr<Executor>, bool)> release) 68 { 69 std::unique_lock<decltype(mutex_)> lock(mutex_); 70 self_ = shared_from_this(); 71 waits_ = queue; 72 idle_ = std::move(idle); 73 release_ = std::move(release); 74 condition_.notify_one(); 75 } 76 77 void Stop(bool wait = false) noexcept 78 { 79 std::unique_lock<decltype(mutex_)> lock(mutex_); 80 running_ = IS_STOPPING; 81 condition_.notify_one(); 82 cond_.wait(lock, [this, wait]() { return !wait || running_ == STOPPED; }); 83 } 84 85 private: 86 static constexpr Duration TIME_OUT = std::chrono::seconds(2); Run()87 void Run() 88 { 89 std::unique_lock<decltype(mutex_)> lock(mutex_); 90 do { 91 do { 92 condition_.wait(lock, [this] { 93 return running_ == IS_STOPPING || waits_ != nullptr; 94 }); 95 while (running_ == RUNNING && waits_ != nullptr && waits_->Size() > 0) { 96 auto currentTask = waits_->Pop(); 97 lock.unlock(); 98 currentTask.exec(); 99 lock.lock(); 100 waits_->Finish(currentTask.taskId); 101 } 102 if (!idle_(self_) && running_ == RUNNING) { 103 continue; 104 } 105 waits_ = nullptr; 106 } while (running_ == RUNNING && 107 condition_.wait_until(lock, std::chrono::steady_clock::now() + TIME_OUT, [this]() { 108 return waits_ != nullptr; 109 })); 110 } while (!release_(self_, running_ == IS_STOPPING)); 111 running_ = STOPPED; 112 cond_.notify_all(); 113 } 114 115 Status running_ = RUNNING; 116 std::mutex mutex_; 117 std::condition_variable condition_; 118 std::condition_variable cond_; 119 std::shared_ptr<Executor> self_; 120 PriorityQueue<InnerTask, Time, TaskId> *waits_ = nullptr; 121 std::function<bool(std::shared_ptr<Executor>)> idle_; 122 std::function<bool(std::shared_ptr<Executor>, bool)> release_; 123 std::thread thread_; 124 }; 125 } // namespace OHOS 126 #endif // OHOS_DISTRIBUTED_DATA_KV_STORE_FRAMEWORKS_COMMON_EXECUTOR_H 127