• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 
16 #ifndef COMMON_RDB_INCLUDE_DELAY_ACTUATOR_H
17 #define COMMON_RDB_INCLUDE_DELAY_ACTUATOR_H
18 #include <functional>
19 #include <memory>
20 #include "executor_pool.h"
21 namespace OHOS {
22 class ActuatorBase : public std::enable_shared_from_this<ActuatorBase> {
23 public:
24     static constexpr uint32_t DEFAULT_FIRST_DELAY_INTERVAL = 0;
25     static constexpr uint32_t DEFAULT_MIN_EXECUTE_INTERVAL = 200;
26     static constexpr uint32_t DEFAULT_MAX_EXECUTE_INTERVAL = 500;
27     static const uint32_t INVALID_INTERVAL = UINT32_MAX;
SetExecutorPool(std::shared_ptr<ExecutorPool> pool)28     void SetExecutorPool(std::shared_ptr<ExecutorPool> pool)
29     {
30         pool_ = std::move(pool);
31     }
Suspend()32     void Suspend()
33     {
34         isSuspend_ = true;
35     }
Active()36     void Active()
37     {
38         isSuspend_ = false;
39     }
40 
41     class Defer final {
42     public:
Defer(std::vector<std::shared_ptr<ActuatorBase>> actuators)43         Defer(std::vector<std::shared_ptr<ActuatorBase>> actuators) : actuators_(std::move(actuators))
44         {
45             for (auto actuator : actuators_) {
46                 if (actuator) {
47                     actuator->Suspend();
48                     actuator->StopTimer(false);
49                 }
50             }
51         }
Defer(std::shared_ptr<ActuatorBase> actuator)52         Defer(std::shared_ptr<ActuatorBase> actuator) : Defer(std::vector<std::shared_ptr<ActuatorBase>>({actuator}))
53         {}
~Defer()54         ~Defer()
55         {
56             for (auto actuator : actuators_) {
57                 if (actuator) {
58                     actuator->Active();
59                     actuator->StartTimer();
60                 }
61             }
62         }
63         Defer(const Defer &defer) = delete;
64         Defer(Defer &&defer) = delete;
65         Defer &operator=(const Defer &defer) = delete;
66         Defer &operator=(Defer &&defer) = delete;
67         void *operator new(size_t size) = delete;
68         void *operator new[](size_t size) = delete;
69         void operator delete(void *) = delete;
70         void operator delete[](void *) = delete;
71 
72     private:
73         std::vector<std::shared_ptr<ActuatorBase>> actuators_;
74     };
75 protected:
76     ActuatorBase(uint32_t firstInterval = DEFAULT_FIRST_DELAY_INTERVAL,
77         uint32_t minInterval = DEFAULT_MIN_EXECUTE_INTERVAL, uint32_t maxInterval = DEFAULT_MAX_EXECUTE_INTERVAL)
firstDelayInterval_(firstInterval)78         : firstDelayInterval_(firstInterval), minExecuteInterval_(minInterval), maxExecuteInterval_(maxInterval)
79     {
80     }
~ActuatorBase()81     virtual ~ActuatorBase()
82     {
83         StopTimer(true);
84     }
85 
StartTimer()86     void StartTimer()
87     {
88         std::lock_guard<std::mutex> lock(mutex_);
89         if (pool_ == nullptr || isSuspend_) {
90             return;
91         }
92         auto weakThis = weak_from_this();
93         if (forceTaskId_ == Executor::INVALID_TASK_ID && maxExecuteInterval_ != INVALID_INTERVAL) {
94             forceTaskId_ = pool_->Schedule(std::chrono::milliseconds(maxExecuteInterval_), [weakThis]() {
95                 auto self = weakThis.lock();
96                 if (self != nullptr) {
97                     self->ExecuteTask();
98                     self->lastExecuteTimePoint_ = GetTimeStamp();
99                 }
100             });
101         }
102         if (firstDelayInterval_ == INVALID_INTERVAL || minExecuteInterval_ == INVALID_INTERVAL) {
103             return;
104         }
105         if (delayTaskId_ == Executor::INVALID_TASK_ID) {
106             delayTaskId_ =
107                 pool_->Schedule(std::chrono::milliseconds(lastExecuteTimePoint_ <= GetTimeStamp() - minExecuteInterval_
108                                                               ? firstDelayInterval_
109                                                               : minExecuteInterval_),
110                     [weakThis]() {
111                         auto self = weakThis.lock();
112                         if (self != nullptr) {
113                             self->ExecuteTask();
114                             self->lastExecuteTimePoint_ = GetTimeStamp();
115                         }
116                     });
117         } else {
118             delayTaskId_ = pool_->Reset(delayTaskId_, std::chrono::milliseconds(minExecuteInterval_));
119         }
120     }
StopTimer(bool wait)121     void StopTimer(bool wait)
122     {
123         std::lock_guard<std::mutex> lock(mutex_);
124         if (pool_ != nullptr) {
125             pool_->Remove(forceTaskId_, wait);
126             pool_->Remove(delayTaskId_, wait);
127         }
128         forceTaskId_ = Executor::INVALID_TASK_ID;
129         delayTaskId_ = Executor::INVALID_TASK_ID;
130     }
131 
ExecuteTask()132     virtual void ExecuteTask() {};
GetTimeStamp()133     static inline uint64_t GetTimeStamp()
134     {
135         return std::chrono::duration_cast<std::chrono::milliseconds>(
136             std::chrono::steady_clock::now().time_since_epoch())
137             .count();
138     }
139 private:
140     ExecutorPool::TaskId delayTaskId_ = ExecutorPool::INVALID_TASK_ID;
141     ExecutorPool::TaskId forceTaskId_ = ExecutorPool::INVALID_TASK_ID;
142     std::shared_ptr<ExecutorPool> pool_ = nullptr;
143     bool isSuspend_ = false;
144     std::atomic_uint64_t lastExecuteTimePoint_ = 0;
145     uint32_t const firstDelayInterval_ = DEFAULT_FIRST_DELAY_INTERVAL;
146     uint32_t const minExecuteInterval_ = DEFAULT_MIN_EXECUTE_INTERVAL;
147     uint32_t const maxExecuteInterval_ = DEFAULT_MAX_EXECUTE_INTERVAL;
148     mutable std::mutex mutex_;
149 };
150 
151 template<class T, class MergeFunc = std::function<void(T &out, T &&input)>, class Task = std::function<int32_t(T &&)>>
152 class DelayActuator : public ActuatorBase {
153 public:
154     DelayActuator(MergeFunc mergeFunc = nullptr, uint32_t firstInterval = DEFAULT_FIRST_DELAY_INTERVAL,
155         uint32_t minInterval = DEFAULT_MIN_EXECUTE_INTERVAL, uint32_t maxInterval = DEFAULT_MAX_EXECUTE_INTERVAL)
ActuatorBase(firstInterval,minInterval,maxInterval)156         : ActuatorBase(firstInterval, minInterval, maxInterval), data_(T()), mergeFunc_(mergeFunc)
157     {
158     }
~DelayActuator()159     ~DelayActuator() {}
160 
SetTask(Task task)161     void SetTask(Task task)
162     {
163         task_ = std::move(task);
164     }
165     template<class D = T>
Execute(D && value)166     void Execute(D &&value)
167     {
168         {
169             std::lock_guard<std::mutex> lock(mutex_);
170             if (mergeFunc_) {
171                 mergeFunc_(data_, std::forward<D>(value));
172             } else {
173                 data_ = T{std::move(value)};
174             }
175         }
176         StartTimer();
177     }
178 
179 private:
180     Task task_;
181     T data_;
182     MergeFunc mergeFunc_;
183     std::mutex mutex_;
184 
ExecuteTask()185     void ExecuteTask() override
186     {
187         StopTimer(false);
188         T data;
189         {
190             std::lock_guard<std::mutex> lock(mutex_);
191             data = std::move(data_);
192             data_ = T();
193         }
194         if (task_) {
195             task_(std::move(data));
196         }
197     }
198 };
199 } // namespace OHOS
200 #endif // COMMON_RDB_INCLUDE_DELAY_ACTUATOR_H