1 /* 2 * Copyright (c) 2025 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMON_RDB_INCLUDE_DELAY_ACTUATOR_H 17 #define COMMON_RDB_INCLUDE_DELAY_ACTUATOR_H 18 #include <functional> 19 #include <memory> 20 #include "executor_pool.h" 21 namespace OHOS { 22 class ActuatorBase : public std::enable_shared_from_this<ActuatorBase> { 23 public: 24 static constexpr uint32_t DEFAULT_FIRST_DELAY_INTERVAL = 0; 25 static constexpr uint32_t DEFAULT_MIN_EXECUTE_INTERVAL = 200; 26 static constexpr uint32_t DEFAULT_MAX_EXECUTE_INTERVAL = 500; 27 static const uint32_t INVALID_INTERVAL = UINT32_MAX; SetExecutorPool(std::shared_ptr<ExecutorPool> pool)28 void SetExecutorPool(std::shared_ptr<ExecutorPool> pool) 29 { 30 pool_ = std::move(pool); 31 } Suspend()32 void Suspend() 33 { 34 isSuspend_ = true; 35 } Active()36 void Active() 37 { 38 isSuspend_ = false; 39 } 40 41 class Defer final { 42 public: Defer(std::vector<std::shared_ptr<ActuatorBase>> actuators)43 Defer(std::vector<std::shared_ptr<ActuatorBase>> actuators) : actuators_(std::move(actuators)) 44 { 45 for (auto actuator : actuators_) { 46 if (actuator) { 47 actuator->Suspend(); 48 actuator->StopTimer(false); 49 } 50 } 51 } Defer(std::shared_ptr<ActuatorBase> actuator)52 Defer(std::shared_ptr<ActuatorBase> actuator) : Defer(std::vector<std::shared_ptr<ActuatorBase>>({actuator})) 53 {} ~Defer()54 ~Defer() 55 { 56 for (auto actuator : actuators_) { 57 if (actuator) { 58 actuator->Active(); 59 actuator->StartTimer(); 60 } 61 } 62 } 63 Defer(const Defer &defer) = delete; 64 Defer(Defer &&defer) = delete; 65 Defer &operator=(const Defer &defer) = delete; 66 Defer &operator=(Defer &&defer) = delete; 67 void *operator new(size_t size) = delete; 68 void *operator new[](size_t size) = delete; 69 void operator delete(void *) = delete; 70 void operator delete[](void *) = delete; 71 72 private: 73 std::vector<std::shared_ptr<ActuatorBase>> actuators_; 74 }; 75 protected: 76 ActuatorBase(uint32_t firstInterval = DEFAULT_FIRST_DELAY_INTERVAL, 77 uint32_t minInterval = DEFAULT_MIN_EXECUTE_INTERVAL, uint32_t maxInterval = DEFAULT_MAX_EXECUTE_INTERVAL) firstDelayInterval_(firstInterval)78 : firstDelayInterval_(firstInterval), minExecuteInterval_(minInterval), maxExecuteInterval_(maxInterval) 79 { 80 } ~ActuatorBase()81 virtual ~ActuatorBase() 82 { 83 StopTimer(true); 84 } 85 StartTimer()86 void StartTimer() 87 { 88 std::lock_guard<std::mutex> lock(mutex_); 89 if (pool_ == nullptr || isSuspend_) { 90 return; 91 } 92 auto weakThis = weak_from_this(); 93 if (forceTaskId_ == Executor::INVALID_TASK_ID && maxExecuteInterval_ != INVALID_INTERVAL) { 94 forceTaskId_ = pool_->Schedule(std::chrono::milliseconds(maxExecuteInterval_), [weakThis]() { 95 auto self = weakThis.lock(); 96 if (self != nullptr) { 97 self->ExecuteTask(); 98 self->lastExecuteTimePoint_ = GetTimeStamp(); 99 } 100 }); 101 } 102 if (firstDelayInterval_ == INVALID_INTERVAL || minExecuteInterval_ == INVALID_INTERVAL) { 103 return; 104 } 105 if (delayTaskId_ == Executor::INVALID_TASK_ID) { 106 delayTaskId_ = 107 pool_->Schedule(std::chrono::milliseconds(lastExecuteTimePoint_ <= GetTimeStamp() - minExecuteInterval_ 108 ? firstDelayInterval_ 109 : minExecuteInterval_), 110 [weakThis]() { 111 auto self = weakThis.lock(); 112 if (self != nullptr) { 113 self->ExecuteTask(); 114 self->lastExecuteTimePoint_ = GetTimeStamp(); 115 } 116 }); 117 } else { 118 delayTaskId_ = pool_->Reset(delayTaskId_, std::chrono::milliseconds(minExecuteInterval_)); 119 } 120 } StopTimer(bool wait)121 void StopTimer(bool wait) 122 { 123 std::lock_guard<std::mutex> lock(mutex_); 124 if (pool_ != nullptr) { 125 pool_->Remove(forceTaskId_, wait); 126 pool_->Remove(delayTaskId_, wait); 127 } 128 forceTaskId_ = Executor::INVALID_TASK_ID; 129 delayTaskId_ = Executor::INVALID_TASK_ID; 130 } 131 ExecuteTask()132 virtual void ExecuteTask() {}; GetTimeStamp()133 static inline uint64_t GetTimeStamp() 134 { 135 return std::chrono::duration_cast<std::chrono::milliseconds>( 136 std::chrono::steady_clock::now().time_since_epoch()) 137 .count(); 138 } 139 private: 140 ExecutorPool::TaskId delayTaskId_ = ExecutorPool::INVALID_TASK_ID; 141 ExecutorPool::TaskId forceTaskId_ = ExecutorPool::INVALID_TASK_ID; 142 std::shared_ptr<ExecutorPool> pool_ = nullptr; 143 bool isSuspend_ = false; 144 std::atomic_uint64_t lastExecuteTimePoint_ = 0; 145 uint32_t const firstDelayInterval_ = DEFAULT_FIRST_DELAY_INTERVAL; 146 uint32_t const minExecuteInterval_ = DEFAULT_MIN_EXECUTE_INTERVAL; 147 uint32_t const maxExecuteInterval_ = DEFAULT_MAX_EXECUTE_INTERVAL; 148 mutable std::mutex mutex_; 149 }; 150 151 template<class T, class MergeFunc = std::function<void(T &out, T &&input)>, class Task = std::function<int32_t(T &&)>> 152 class DelayActuator : public ActuatorBase { 153 public: 154 DelayActuator(MergeFunc mergeFunc = nullptr, uint32_t firstInterval = DEFAULT_FIRST_DELAY_INTERVAL, 155 uint32_t minInterval = DEFAULT_MIN_EXECUTE_INTERVAL, uint32_t maxInterval = DEFAULT_MAX_EXECUTE_INTERVAL) ActuatorBase(firstInterval,minInterval,maxInterval)156 : ActuatorBase(firstInterval, minInterval, maxInterval), data_(T()), mergeFunc_(mergeFunc) 157 { 158 } ~DelayActuator()159 ~DelayActuator() {} 160 SetTask(Task task)161 void SetTask(Task task) 162 { 163 task_ = std::move(task); 164 } 165 template<class D = T> Execute(D && value)166 void Execute(D &&value) 167 { 168 { 169 std::lock_guard<std::mutex> lock(mutex_); 170 if (mergeFunc_) { 171 mergeFunc_(data_, std::forward<D>(value)); 172 } else { 173 data_ = T{std::move(value)}; 174 } 175 } 176 StartTimer(); 177 } 178 179 private: 180 Task task_; 181 T data_; 182 MergeFunc mergeFunc_; 183 std::mutex mutex_; 184 ExecuteTask()185 void ExecuteTask() override 186 { 187 StopTimer(false); 188 T data; 189 { 190 std::lock_guard<std::mutex> lock(mutex_); 191 data = std::move(data_); 192 data_ = T(); 193 } 194 if (task_) { 195 task_(std::move(data)); 196 } 197 } 198 }; 199 } // namespace OHOS 200 #endif // COMMON_RDB_INCLUDE_DELAY_ACTUATOR_H