1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sync_able_kvdb.h"
17
18 #include "db_dump_helper.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "parcel.h"
22 #include "runtime_context.h"
23 #include "user_change_monitor.h"
24
25 namespace DistributedDB {
26 const EventType SyncAbleKvDB::REMOTE_PUSH_FINISHED = 1;
27
SyncAbleKvDB()28 SyncAbleKvDB::SyncAbleKvDB()
29 : started_(false),
30 closed_(false),
31 isSyncModuleActiveCheck_(false),
32 isSyncNeedActive_(true),
33 notifyChain_(nullptr),
34 userChangeListener_(nullptr)
35 {}
36
~SyncAbleKvDB()37 SyncAbleKvDB::~SyncAbleKvDB()
38 {
39 if (notifyChain_ != nullptr) {
40 (void)notifyChain_->UnRegisterEventType(REMOTE_PUSH_FINISHED);
41 KillAndDecObjRef(notifyChain_);
42 notifyChain_ = nullptr;
43 }
44 if (userChangeListener_ != nullptr) {
45 userChangeListener_->Drop(true);
46 userChangeListener_ = nullptr;
47 }
48 }
49
DelConnection(GenericKvDBConnection * connection)50 void SyncAbleKvDB::DelConnection(GenericKvDBConnection *connection)
51 {
52 auto realConnection = static_cast<SyncAbleKvDBConnection *>(connection);
53 if (realConnection != nullptr) {
54 KillAndDecObjRef(realConnection);
55 realConnection = nullptr;
56 }
57 }
58
TriggerSync(int notifyEvent)59 void SyncAbleKvDB::TriggerSync(int notifyEvent)
60 {
61 if (!started_) {
62 StartSyncer();
63 }
64 if (started_) {
65 syncer_.LocalDataChanged(notifyEvent);
66 }
67 }
68
CommitNotify(int notifyEvent,KvDBCommitNotifyFilterAbleData * data)69 void SyncAbleKvDB::CommitNotify(int notifyEvent, KvDBCommitNotifyFilterAbleData *data)
70 {
71 SyncAbleKvDB::TriggerSync(notifyEvent);
72
73 GenericKvDB::CommitNotify(notifyEvent, data);
74 }
75
Close()76 void SyncAbleKvDB::Close()
77 {
78 StopSyncer(true);
79 }
80
81 // Start a sync action.
Sync(const ISyncer::SyncParma & parma,uint64_t connectionId)82 int SyncAbleKvDB::Sync(const ISyncer::SyncParma &parma, uint64_t connectionId)
83 {
84 if (!started_) {
85 int errCode = StartSyncer();
86 if (!started_) {
87 return errCode;
88 }
89 }
90 return syncer_.Sync(parma, connectionId);
91 }
92
EnableAutoSync(bool enable)93 void SyncAbleKvDB::EnableAutoSync(bool enable)
94 {
95 if (!started_) {
96 StartSyncer();
97 }
98 return syncer_.EnableAutoSync(enable);
99 }
100
WakeUpSyncer()101 void SyncAbleKvDB::WakeUpSyncer()
102 {
103 if (!started_) {
104 StartSyncer();
105 }
106 }
107
108 // Stop a sync action in progress.
StopSync(uint64_t connectionId)109 void SyncAbleKvDB::StopSync(uint64_t connectionId)
110 {
111 if (started_) {
112 syncer_.StopSync(connectionId);
113 }
114 }
115
SetSyncModuleActive()116 void SyncAbleKvDB::SetSyncModuleActive()
117 {
118 if (isSyncModuleActiveCheck_) {
119 return;
120 }
121 IKvDBSyncInterface *syncInterface = GetSyncInterface();
122 if (syncInterface == nullptr) {
123 LOGF("KvDB got null sync interface.");
124 return;
125 }
126 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
127 false);
128 if (!isSyncDualTupleMode) {
129 isSyncNeedActive_ = true;
130 isSyncModuleActiveCheck_ = true;
131 return;
132 }
133 isSyncNeedActive_ = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
134 if (!isSyncNeedActive_) {
135 LOGI("syncer no need to active");
136 }
137 isSyncModuleActiveCheck_ = true;
138 }
139
GetSyncModuleActive()140 bool SyncAbleKvDB::GetSyncModuleActive()
141 {
142 return isSyncNeedActive_;
143 }
144
ReSetSyncModuleActive()145 void SyncAbleKvDB::ReSetSyncModuleActive()
146 {
147 isSyncModuleActiveCheck_ = false;
148 isSyncNeedActive_ = true;
149 }
150
151 // Start syncer
StartSyncer(bool isCheckSyncActive,bool isNeedActive)152 int SyncAbleKvDB::StartSyncer(bool isCheckSyncActive, bool isNeedActive)
153 {
154 int errCode = E_OK;
155 {
156 std::unique_lock<std::mutex> lock(syncerOperateLock_);
157 errCode = StartSyncerWithNoLock(isCheckSyncActive, isNeedActive);
158 closed_ = false;
159 }
160 UserChangeHandle();
161 return errCode;
162 }
163
StartSyncerWithNoLock(bool isCheckSyncActive,bool isNeedActive)164 int SyncAbleKvDB::StartSyncerWithNoLock(bool isCheckSyncActive, bool isNeedActive)
165 {
166 IKvDBSyncInterface *syncInterface = GetSyncInterface();
167 if (syncInterface == nullptr) {
168 LOGF("KvDB got null sync interface.");
169 return -E_INVALID_ARGS;
170 }
171 if (!isCheckSyncActive) {
172 SetSyncModuleActive();
173 isNeedActive = GetSyncModuleActive();
174 }
175 int errCode = syncer_.Initialize(syncInterface, isNeedActive);
176 if (errCode == E_OK) {
177 started_ = true;
178 } else {
179 LOGW("KvDB start syncer failed, err:'%d'.", errCode);
180 }
181 bool isSyncDualTupleMode = syncInterface->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE,
182 false);
183 if (isSyncDualTupleMode && isCheckSyncActive && !isNeedActive && (userChangeListener_ == nullptr)) {
184 // active to non_active
185 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
186 std::bind(&SyncAbleKvDB::ChangeUserListerner, this), UserChangeMonitor::USER_ACTIVE_TO_NON_ACTIVE_EVENT);
187 } else if (isSyncDualTupleMode && (userChangeListener_ == nullptr)) {
188 EventType event = isNeedActive ?
189 UserChangeMonitor::USER_ACTIVE_EVENT : UserChangeMonitor::USER_NON_ACTIVE_EVENT;
190 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
191 std::bind(&SyncAbleKvDB::UserChangeHandle, this), event);
192 }
193 return errCode;
194 }
195
196 // Stop syncer
StopSyncer(bool isClosedOperation)197 void SyncAbleKvDB::StopSyncer(bool isClosedOperation)
198 {
199 NotificationChain::Listener *userChangeListener = nullptr;
200 {
201 std::unique_lock<std::mutex> lock(syncerOperateLock_);
202 StopSyncerWithNoLock(isClosedOperation);
203 userChangeListener = userChangeListener_;
204 userChangeListener_ = nullptr;
205 }
206 if (userChangeListener != nullptr) {
207 userChangeListener->Drop(true);
208 userChangeListener = nullptr;
209 }
210 }
211
StopSyncerWithNoLock(bool isClosedOperation)212 void SyncAbleKvDB::StopSyncerWithNoLock(bool isClosedOperation)
213 {
214 ReSetSyncModuleActive();
215 syncer_.Close(isClosedOperation);
216 if (started_) {
217 started_ = false;
218 }
219 closed_ = isClosedOperation;
220 if (!isClosedOperation && userChangeListener_ != nullptr) {
221 userChangeListener_->Drop(false);
222 userChangeListener_ = nullptr;
223 }
224 }
225
UserChangeHandle()226 void SyncAbleKvDB::UserChangeHandle()
227 {
228 bool isNeedChange;
229 bool isNeedActive = true;
230 IKvDBSyncInterface *syncInterface = GetSyncInterface();
231 if (syncInterface == nullptr) {
232 LOGF("KvDB got null sync interface.");
233 return;
234 }
235 std::unique_lock<std::mutex> lock(syncerOperateLock_);
236 if (closed_) {
237 LOGI("kvDB is already closed");
238 return;
239 }
240 isNeedActive = RuntimeContext::GetInstance()->IsSyncerNeedActive(syncInterface->GetDbProperties());
241 isNeedChange = (isNeedActive != isSyncNeedActive_) ? true : false;
242 // non_active to active or active to non_active
243 if (isNeedChange) {
244 StopSyncerWithNoLock(); // will drop userChangeListener
245 isSyncModuleActiveCheck_ = true;
246 isSyncNeedActive_ = isNeedActive;
247 StartSyncerWithNoLock(true, isNeedActive);
248 }
249 }
250
ChangeUserListerner()251 void SyncAbleKvDB::ChangeUserListerner()
252 {
253 // only active to non_active call, put into USER_NON_ACTIVE_EVENT listener from USER_ACTIVE_TO_NON_ACTIVE_EVENT
254 if (userChangeListener_ != nullptr) {
255 userChangeListener_->Drop(false);
256 userChangeListener_ = nullptr;
257 }
258 if (userChangeListener_ == nullptr) {
259 userChangeListener_ = RuntimeContext::GetInstance()->RegisterUserChangedListener(
260 std::bind(&SyncAbleKvDB::UserChangeHandle, this), UserChangeMonitor::USER_NON_ACTIVE_EVENT);
261 }
262 }
263
264 // Get The current virtual timestamp
GetTimestamp()265 uint64_t SyncAbleKvDB::GetTimestamp()
266 {
267 if (!started_ && !isSyncModuleActiveCheck_) {
268 StartSyncer();
269 }
270 return syncer_.GetTimestamp();
271 }
272
273 // Get the dataItem's append length
GetAppendedLen() const274 uint32_t SyncAbleKvDB::GetAppendedLen() const
275 {
276 return Parcel::GetAppendedLen();
277 }
278
EraseDeviceWaterMark(const std::string & deviceId,bool isNeedHash)279 int SyncAbleKvDB::EraseDeviceWaterMark(const std::string &deviceId, bool isNeedHash)
280 {
281 if (!started_) {
282 StartSyncer();
283 }
284 return syncer_.EraseDeviceWaterMark(deviceId, isNeedHash);
285 }
286
GetQueuedSyncSize(int * queuedSyncSize) const287 int SyncAbleKvDB::GetQueuedSyncSize(int *queuedSyncSize) const
288 {
289 return syncer_.GetQueuedSyncSize(queuedSyncSize);
290 }
291
SetQueuedSyncLimit(const int * queuedSyncLimit)292 int SyncAbleKvDB::SetQueuedSyncLimit(const int *queuedSyncLimit)
293 {
294 return syncer_.SetQueuedSyncLimit(queuedSyncLimit);
295 }
296
GetQueuedSyncLimit(int * queuedSyncLimit) const297 int SyncAbleKvDB::GetQueuedSyncLimit(int *queuedSyncLimit) const
298 {
299 return syncer_.GetQueuedSyncLimit(queuedSyncLimit);
300 }
301
DisableManualSync(void)302 int SyncAbleKvDB::DisableManualSync(void)
303 {
304 return syncer_.DisableManualSync();
305 }
306
EnableManualSync(void)307 int SyncAbleKvDB::EnableManualSync(void)
308 {
309 return syncer_.EnableManualSync();
310 }
311
GetLocalIdentity(std::string & outTarget)312 int SyncAbleKvDB::GetLocalIdentity(std::string &outTarget)
313 {
314 return syncer_.GetLocalIdentity(outTarget);
315 }
316
SetStaleDataWipePolicy(WipePolicy policy)317 int SyncAbleKvDB::SetStaleDataWipePolicy(WipePolicy policy)
318 {
319 return syncer_.SetStaleDataWipePolicy(policy);
320 }
321
RegisterEventType(EventType type)322 int SyncAbleKvDB::RegisterEventType(EventType type)
323 {
324 if (notifyChain_ == nullptr) {
325 notifyChain_ = new (std::nothrow) NotificationChain;
326 if (notifyChain_ == nullptr) {
327 return -E_OUT_OF_MEMORY;
328 }
329 }
330
331 int errCode = notifyChain_->RegisterEventType(type);
332 if (errCode == -E_ALREADY_REGISTER) {
333 return E_OK;
334 }
335 if (errCode != E_OK) {
336 LOGE("[SyncAbleKvDB] Register event type %u failed! err %d", type, errCode);
337 KillAndDecObjRef(notifyChain_);
338 notifyChain_ = nullptr;
339 }
340 return errCode;
341 }
342
AddRemotePushFinishedNotify(const RemotePushFinishedNotifier & notifier,int & errCode)343 NotificationChain::Listener *SyncAbleKvDB::AddRemotePushFinishedNotify(const RemotePushFinishedNotifier ¬ifier,
344 int &errCode)
345 {
346 std::unique_lock<std::shared_mutex> lock(notifyChainLock_);
347 errCode = RegisterEventType(REMOTE_PUSH_FINISHED);
348 if (errCode != E_OK) {
349 return nullptr;
350 }
351
352 auto listener = notifyChain_->RegisterListener(REMOTE_PUSH_FINISHED,
353 [notifier](void *arg) {
354 if (arg == nullptr) {
355 LOGE("PragmaRemotePushNotify is null.");
356 return;
357 }
358 notifier(*static_cast<RemotePushNotifyInfo *>(arg));
359 }, nullptr, errCode);
360 if (errCode != E_OK) {
361 LOGE("[SyncAbleKvDB] Add remote push finished notifier failed! err %d", errCode);
362 }
363 return listener;
364 }
365
NotifyRemotePushFinishedInner(const std::string & targetId) const366 void SyncAbleKvDB::NotifyRemotePushFinishedInner(const std::string &targetId) const
367 {
368 {
369 std::shared_lock<std::shared_mutex> lock(notifyChainLock_);
370 if (notifyChain_ == nullptr) {
371 return;
372 }
373 }
374 RemotePushNotifyInfo info;
375 info.deviceId = targetId;
376 notifyChain_->NotifyEvent(REMOTE_PUSH_FINISHED, static_cast<void *>(&info));
377 }
378
SetSyncRetry(bool isRetry)379 int SyncAbleKvDB::SetSyncRetry(bool isRetry)
380 {
381 return syncer_.SetSyncRetry(isRetry);
382 }
383
SetEqualIdentifier(const std::string & identifier,const std::vector<std::string> & targets)384 int SyncAbleKvDB::SetEqualIdentifier(const std::string &identifier, const std::vector<std::string> &targets)
385 {
386 return syncer_.SetEqualIdentifier(identifier, targets);
387 }
388
Dump(int fd)389 void SyncAbleKvDB::Dump(int fd)
390 {
391 SyncerBasicInfo basicInfo = syncer_.DumpSyncerBasicInfo();
392 DBDumpHelper::Dump(fd, "\tisSyncActive = %d, isAutoSync = %d\n\n", basicInfo.isSyncActive,
393 basicInfo.isAutoSync);
394 if (basicInfo.isSyncActive) {
395 DBDumpHelper::Dump(fd, "\tDistributedDB Database Sync Module Message Info:\n");
396 syncer_.Dump(fd);
397 }
398 }
399
GetHashDeviceId(const std::string & clientId,std::string & hashDevId)400 int SyncAbleKvDB::GetHashDeviceId(const std::string &clientId, std::string &hashDevId)
401 {
402 if (!started_) {
403 StartSyncer();
404 }
405 return syncer_.GetHashDeviceId(clientId, hashDevId);
406 }
407 }
408