1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "DelayNotify"
16 #include "delay_notify.h"
17
18 #include "logger.h"
19 #include "task_executor.h"
20 namespace OHOS::NativeRdb {
21 using namespace OHOS::Rdb;
DelayNotify()22 DelayNotify::DelayNotify() : pauseCount_(0), task_(nullptr), pool_(nullptr)
23 {
24 }
25
~DelayNotify()26 DelayNotify::~DelayNotify()
27 {
28 if (pool_ == nullptr) {
29 return;
30 }
31 if (delaySyncTaskId_ != TaskExecutor::INVALID_TASK_ID) {
32 pool_->Remove(delaySyncTaskId_);
33 }
34 if (task_ != nullptr && changedData_.tableData.size() > 0) {
35 DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
36 rdbNotifyConfig.delay_ = 0;
37 rdbNotifyConfig.isFull_ = isFull_;
38 auto errCode = task_(changedData_, rdbNotifyConfig);
39 if (errCode != 0) {
40 LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
41 }
42 }
43 }
44
UpdateNotify(const DistributedRdb::RdbChangedData & changedData,bool isFull)45 void DelayNotify::UpdateNotify(const DistributedRdb::RdbChangedData &changedData, bool isFull)
46 {
47 LOG_DEBUG("Update changed data.");
48 {
49 std::lock_guard<std::mutex> lock(mutex_);
50 for (auto &[k, v] : changedData.tableData) {
51 if (!v.isTrackedDataChange && !v.isP2pSyncDataChange) {
52 continue;
53 }
54 auto it = changedData_.tableData.find(k);
55 if (it == changedData_.tableData.end()) {
56 changedData_.tableData.insert_or_assign(k, v);
57 }
58 }
59 isFull_ |= isFull;
60 }
61 StartTimer();
62 }
63
SetExecutorPool(std::shared_ptr<ExecutorPool> pool)64 void DelayNotify::SetExecutorPool(std::shared_ptr<ExecutorPool> pool)
65 {
66 if (pool_ != nullptr) {
67 return;
68 }
69 pool_ = pool;
70 }
71
SetTask(Task task)72 void DelayNotify::SetTask(Task task)
73 {
74 task_ = std::move(task);
75 }
76
StartTimer()77 void DelayNotify::StartTimer()
78 {
79 DistributedRdb::RdbChangedData changedData;
80 bool needExecTask = false;
81 bool isFull = false;
82 {
83 std::lock_guard<std::mutex> lock(mutex_);
84 changedData.tableData = changedData_.tableData;
85 isFull = isFull_;
86 if (pool_ == nullptr) {
87 return;
88 }
89
90 if (delaySyncTaskId_ == TaskExecutor::INVALID_TASK_ID) {
91 delaySyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(autoSyncInterval_),
92 [this]() { ExecuteTask(); });
93 } else {
94 delaySyncTaskId_ =
95 pool_->Reset(delaySyncTaskId_, std::chrono::milliseconds(autoSyncInterval_));
96 }
97
98 if (changedData.tableData.empty()) {
99 return;
100 }
101
102 if (!isInitialized_) {
103 needExecTask = true;
104 lastTimePoint_ = std::chrono::steady_clock::now();
105 isInitialized_ = true;
106 } else {
107 Time curTime = std::chrono::steady_clock::now();
108 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(curTime - lastTimePoint_);
109 if (duration >= std::chrono::milliseconds(MAX_NOTIFY_INTERVAL)) {
110 needExecTask = true;
111 lastTimePoint_ = std::chrono::steady_clock::now();
112 }
113 }
114 }
115
116 if (needExecTask) {
117 DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
118 rdbNotifyConfig.delay_ = SERVICE_INTERVAL;
119 rdbNotifyConfig.isFull_ = isFull;
120 task_(changedData, rdbNotifyConfig);
121 }
122 }
123
StopTimer()124 void DelayNotify::StopTimer()
125 {
126 if (pool_ != nullptr) {
127 pool_->Remove(delaySyncTaskId_);
128 }
129 delaySyncTaskId_ = TaskExecutor::INVALID_TASK_ID;
130 }
131
ExecuteTask()132 void DelayNotify::ExecuteTask()
133 {
134 LOG_DEBUG("Notify data change.");
135 DistributedRdb::RdbChangedData changedData;
136 bool isFull = false;
137 {
138 std::lock_guard<std::mutex> lock(mutex_);
139 changedData.tableData = std::move(changedData_.tableData);
140 isFull = isFull_;
141 RestoreDefaultSyncInterval();
142 StopTimer();
143 isFull_ = false;
144 }
145 if (task_ != nullptr && (changedData.tableData.size() > 0 || isFull)) {
146 DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
147 rdbNotifyConfig.delay_ = 0;
148 rdbNotifyConfig.isFull_ = isFull;
149 int errCode = task_(changedData, rdbNotifyConfig);
150 if (errCode != 0) {
151 LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
152 std::lock_guard<std::mutex> lock(mutex_);
153 for (auto &[k, v] : changedData.tableData) {
154 changedData_.tableData.insert_or_assign(k, v);
155 }
156 return;
157 }
158 }
159 }
160
SetAutoSyncInterval(uint32_t interval)161 void DelayNotify::SetAutoSyncInterval(uint32_t interval)
162 {
163 autoSyncInterval_ = interval;
164 }
165
RestoreDefaultSyncInterval()166 void DelayNotify::RestoreDefaultSyncInterval()
167 {
168 autoSyncInterval_ = AUTO_SYNC_INTERVAL;
169 }
170
Pause()171 void DelayNotify::Pause()
172 {
173 StopTimer();
174 pauseCount_.fetch_add(1, std::memory_order_relaxed);
175 }
176
Resume()177 void DelayNotify::Resume()
178 {
179 pauseCount_.fetch_sub(1, std::memory_order_relaxed);
180 if (pauseCount_.load() == 0) {
181 StartTimer();
182 }
183 }
184
PauseDelayNotify(std::shared_ptr<DelayNotify> delayNotifier)185 PauseDelayNotify::PauseDelayNotify(std::shared_ptr<DelayNotify> delayNotifier) : delayNotifier_(delayNotifier)
186 {
187 if (delayNotifier_ != nullptr) {
188 delayNotifier_->Pause();
189 delayNotifier_->SetAutoSyncInterval(AUTO_SYNC_MAX_INTERVAL);
190 }
191 }
192
~PauseDelayNotify()193 PauseDelayNotify::~PauseDelayNotify()
194 {
195 if (delayNotifier_ != nullptr) {
196 delayNotifier_->Resume();
197 }
198 }
199 } // namespace OHOS::NativeRdb