• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "DelayNotify"
16 #include "delay_notify.h"
17 
18 #include "logger.h"
19 #include "task_executor.h"
20 namespace OHOS::NativeRdb {
21 using namespace OHOS::Rdb;
DelayNotify()22 DelayNotify::DelayNotify() : pauseCount_(0), task_(nullptr), pool_(nullptr)
23 {
24 }
25 
~DelayNotify()26 DelayNotify::~DelayNotify()
27 {
28     if (pool_ == nullptr) {
29         return;
30     }
31     if (delaySyncTaskId_ != TaskExecutor::INVALID_TASK_ID) {
32         pool_->Remove(delaySyncTaskId_, true);
33     }
34     if (task_ != nullptr && changedData_.tableData.size() > 0) {
35         DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
36         rdbNotifyConfig.delay_ = 0;
37         rdbNotifyConfig.isFull_ = isFull_;
38         auto errCode = task_(changedData_, rdbNotifyConfig);
39         if (errCode != 0) {
40             LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
41         }
42     }
43 }
44 
UpdateNotify(const DistributedRdb::RdbChangedData & changedData,bool isFull)45 void DelayNotify::UpdateNotify(const DistributedRdb::RdbChangedData &changedData, bool isFull)
46 {
47     LOG_DEBUG("Update changed data.");
48     {
49         std::lock_guard<std::mutex> lock(mutex_);
50         for (auto &[k, v] : changedData.tableData) {
51             if (!v.isTrackedDataChange && !v.isP2pSyncDataChange && !v.isKnowledgeDataChange) {
52                 continue;
53             }
54             auto it = changedData_.tableData.find(k);
55             if (it == changedData_.tableData.end()) {
56                 changedData_.tableData.insert_or_assign(k, v);
57             }
58         }
59         isFull_ |= isFull;
60     }
61     StartTimer();
62 }
63 
SetExecutorPool(std::shared_ptr<ExecutorPool> pool)64 void DelayNotify::SetExecutorPool(std::shared_ptr<ExecutorPool> pool)
65 {
66     if (pool_ != nullptr) {
67         return;
68     }
69     pool_ = pool;
70 }
71 
SetTask(Task task)72 void DelayNotify::SetTask(Task task)
73 {
74     task_ = std::move(task);
75 }
76 
StartTimer()77 void DelayNotify::StartTimer()
78 {
79     DistributedRdb::RdbChangedData changedData;
80     bool needExecTask = false;
81     bool isFull = false;
82     {
83         std::lock_guard<std::mutex> lock(mutex_);
84         changedData.tableData = changedData_.tableData;
85         isFull = isFull_;
86         if (pool_ == nullptr) {
87             return;
88         }
89 
90         if (delaySyncTaskId_ == TaskExecutor::INVALID_TASK_ID) {
91             std::weak_ptr<DelayNotify> delayNotify = shared_from_this();
92             delaySyncTaskId_ = pool_->Schedule(std::chrono::milliseconds(autoSyncInterval_),
93                 [delayNotify]() {
94                     auto lockedDelayNotify = delayNotify.lock();
95                     if (lockedDelayNotify != nullptr) {
96                         lockedDelayNotify->ExecuteTask();
97                     }
98                 });
99         } else {
100             delaySyncTaskId_ =
101                 pool_->Reset(delaySyncTaskId_, std::chrono::milliseconds(autoSyncInterval_));
102         }
103 
104         if (changedData.tableData.empty()) {
105             return;
106         }
107 
108         if (!isInitialized_) {
109             needExecTask = true;
110             lastTimePoint_ = std::chrono::steady_clock::now();
111             isInitialized_ = true;
112         } else {
113             Time curTime = std::chrono::steady_clock::now();
114             auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(curTime - lastTimePoint_);
115             if (duration >= std::chrono::milliseconds(MAX_NOTIFY_INTERVAL)) {
116                 needExecTask = true;
117                 lastTimePoint_ = std::chrono::steady_clock::now();
118             }
119         }
120     }
121 
122     if (needExecTask) {
123         DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
124         rdbNotifyConfig.delay_ = SERVICE_INTERVAL;
125         rdbNotifyConfig.isFull_ = isFull;
126         task_(changedData, rdbNotifyConfig);
127     }
128 }
129 
StopTimer()130 void DelayNotify::StopTimer()
131 {
132     if (pool_ != nullptr) {
133         pool_->Remove(delaySyncTaskId_);
134     }
135     delaySyncTaskId_ = TaskExecutor::INVALID_TASK_ID;
136 }
137 
ExecuteTask()138 void DelayNotify::ExecuteTask()
139 {
140     LOG_DEBUG("Notify data change.");
141     DistributedRdb::RdbChangedData changedData;
142     bool isFull = false;
143     {
144         std::lock_guard<std::mutex> lock(mutex_);
145         changedData.tableData = std::move(changedData_.tableData);
146         isFull = isFull_;
147         RestoreDefaultSyncInterval();
148         StopTimer();
149         isFull_ = false;
150     }
151     if (task_ != nullptr && (changedData.tableData.size() > 0 || isFull)) {
152         DistributedRdb::RdbNotifyConfig rdbNotifyConfig;
153         rdbNotifyConfig.delay_ = 0;
154         rdbNotifyConfig.isFull_ = isFull;
155         int errCode = task_(changedData, rdbNotifyConfig);
156         if (errCode != 0) {
157             LOG_ERROR("NotifyDataChange is failed, err is %{public}d.", errCode);
158             std::lock_guard<std::mutex> lock(mutex_);
159             for (auto &[k, v] : changedData.tableData) {
160                 changedData_.tableData.insert_or_assign(k, v);
161             }
162             return;
163         }
164     }
165 }
166 
SetAutoSyncInterval(uint32_t interval)167 void DelayNotify::SetAutoSyncInterval(uint32_t interval)
168 {
169     autoSyncInterval_ = interval;
170 }
171 
RestoreDefaultSyncInterval()172 void DelayNotify::RestoreDefaultSyncInterval()
173 {
174     autoSyncInterval_ = AUTO_SYNC_INTERVAL;
175 }
176 
Pause()177 void DelayNotify::Pause()
178 {
179     StopTimer();
180     pauseCount_.fetch_add(1, std::memory_order_relaxed);
181 }
182 
Resume()183 void DelayNotify::Resume()
184 {
185     pauseCount_.fetch_sub(1, std::memory_order_relaxed);
186     if (pauseCount_.load() == 0) {
187         StartTimer();
188     }
189 }
190 
PauseDelayNotify(std::shared_ptr<DelayNotify> delayNotifier)191 PauseDelayNotify::PauseDelayNotify(std::shared_ptr<DelayNotify> delayNotifier) : delayNotifier_(delayNotifier)
192 {
193     if (delayNotifier_ != nullptr) {
194         delayNotifier_->Pause();
195         delayNotifier_->SetAutoSyncInterval(AUTO_SYNC_MAX_INTERVAL);
196     }
197 }
198 
~PauseDelayNotify()199 PauseDelayNotify::~PauseDelayNotify()
200 {
201     if (delayNotifier_ != nullptr) {
202         delayNotifier_->Resume();
203     }
204 }
205 } // namespace OHOS::NativeRdb