• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_natural_store.h"
17 
18 #include <algorithm>
19 #include <thread>
20 #include <chrono>
21 
22 #include "data_compression.h"
23 #include "db_common.h"
24 #include "db_constant.h"
25 #include "db_dump_helper.h"
26 #include "db_dfx_adapter.h"
27 #include "db_errno.h"
28 #include "generic_single_ver_kv_entry.h"
29 #include "intercepted_data_impl.h"
30 #include "kvdb_utils.h"
31 #include "log_print.h"
32 #include "platform_specific.h"
33 #include "schema_object.h"
34 #include "single_ver_database_oper.h"
35 #include "single_ver_utils.h"
36 #include "storage_engine_manager.h"
37 #include "sqlite_single_ver_natural_store_connection.h"
38 #include "value_hash_calc.h"
39 
40 namespace DistributedDB {
GetDbPropertyForUpdate()41 KvDBProperties &SQLiteSingleVerNaturalStore::GetDbPropertyForUpdate()
42 {
43     return MyProp();
44 }
45 
HeartBeatForLifeCycle() const46 void SQLiteSingleVerNaturalStore::HeartBeatForLifeCycle() const
47 {
48     std::lock_guard<std::mutex> lock(lifeCycleMutex_);
49     int errCode = ResetLifeCycleTimer();
50     if (errCode != E_OK) {
51         LOGE("Heart beat for life cycle failed:%d", errCode);
52     }
53 }
54 
StartLifeCycleTimer(const DatabaseLifeCycleNotifier & notifier) const55 int SQLiteSingleVerNaturalStore::StartLifeCycleTimer(const DatabaseLifeCycleNotifier &notifier) const
56 {
57     auto runtimeCxt = RuntimeContext::GetInstance();
58     if (runtimeCxt == nullptr) {
59         return -E_INVALID_ARGS;
60     }
61     RefObject::IncObjRef(this);
62     TimerId timerId = 0;
63     int errCode = runtimeCxt->SetTimer(autoLifeTime_,
64         [this](TimerId id) -> int {
65             std::lock_guard<std::mutex> lock(lifeCycleMutex_);
66             if (lifeCycleNotifier_) {
67                 std::string identifier;
68                 if (GetMyProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false)) {
69                     identifier = GetMyProperties().GetStringProp(KvDBProperties::DUAL_TUPLE_IDENTIFIER_DATA, "");
70                 } else {
71                     identifier = GetMyProperties().GetStringProp(KvDBProperties::IDENTIFIER_DATA, "");
72                 }
73                 auto userId = GetMyProperties().GetStringProp(DBProperties::USER_ID, "");
74                 lifeCycleNotifier_(identifier, userId);
75             }
76             return 0;
77         },
78         [this]() {
79             int ret = RuntimeContext::GetInstance()->ScheduleTask([this]() {
80                 RefObject::DecObjRef(this);
81             });
82             if (ret != E_OK) {
83                 LOGE("SQLiteSingleVerNaturalStore timer finalizer ScheduleTask, errCode %d", ret);
84             }
85         },
86         timerId);
87     if (errCode != E_OK) {
88         lifeTimerId_ = 0;
89         LOGE("SetTimer failed:%d", errCode);
90         RefObject::DecObjRef(this);
91         return errCode;
92     }
93 
94     lifeCycleNotifier_ = notifier;
95     lifeTimerId_ = timerId;
96     return E_OK;
97 }
98 
ResetLifeCycleTimer() const99 int SQLiteSingleVerNaturalStore::ResetLifeCycleTimer() const
100 {
101     if (lifeTimerId_ == 0) {
102         return E_OK;
103     }
104     auto lifeNotifier = lifeCycleNotifier_;
105     lifeCycleNotifier_ = nullptr;
106     int errCode = StopLifeCycleTimer();
107     if (errCode != E_OK) {
108         LOGE("[Reset timer]Stop the life cycle timer failed:%d", errCode);
109     }
110     return StartLifeCycleTimer(lifeNotifier);
111 }
112 
StopLifeCycleTimer() const113 int SQLiteSingleVerNaturalStore::StopLifeCycleTimer() const
114 {
115     auto runtimeCxt = RuntimeContext::GetInstance();
116     if (runtimeCxt == nullptr) {
117         return -E_INVALID_ARGS;
118     }
119     if (lifeTimerId_ != 0) {
120         TimerId timerId = lifeTimerId_;
121         lifeTimerId_ = 0;
122         runtimeCxt->RemoveTimer(timerId, false);
123     }
124     return E_OK;
125 }
126 
IsDataMigrating() const127 bool SQLiteSingleVerNaturalStore::IsDataMigrating() const
128 {
129     if (storageEngine_ == nullptr) {
130         return false;
131     }
132 
133     if (storageEngine_->IsMigrating()) {
134         LOGD("Migrating now.");
135         return true;
136     }
137     return false;
138 }
139 
SetConnectionFlag(bool isExisted) const140 void SQLiteSingleVerNaturalStore::SetConnectionFlag(bool isExisted) const
141 {
142     if (storageEngine_ != nullptr) {
143         storageEngine_->SetConnectionFlag(isExisted);
144     }
145 }
146 
TriggerToMigrateData() const147 int SQLiteSingleVerNaturalStore::TriggerToMigrateData() const
148 {
149     SQLiteSingleVerStorageEngine *storageEngine = nullptr;
150     {
151         std::lock_guard<std::shared_mutex> autoLock(engineMutex_);
152         if (storageEngine_ == nullptr) {
153             return E_OK;
154         }
155         storageEngine = storageEngine_;
156         RefObject::IncObjRef(storageEngine);
157     }
158     RefObject::IncObjRef(this);
159     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, storageEngine]() {
160         AsyncDataMigration(storageEngine);
161     });
162     if (errCode != E_OK) {
163         RefObject::DecObjRef(this);
164         RefObject::DecObjRef(storageEngine);
165         LOGE("[SingleVerNStore] Trigger to migrate data failed : %d.", errCode);
166     }
167     return errCode;
168 }
169 
IsCacheDBMode() const170 bool SQLiteSingleVerNaturalStore::IsCacheDBMode() const
171 {
172     if (storageEngine_ == nullptr) {
173         LOGE("[SingleVerNStore] IsCacheDBMode storage engine is invalid.");
174         return false;
175     }
176     EngineState engineState = storageEngine_->GetEngineState();
177     return (engineState == EngineState::CACHEDB);
178 }
179 
IsExtendedCacheDBMode() const180 bool SQLiteSingleVerNaturalStore::IsExtendedCacheDBMode() const
181 {
182     if (storageEngine_ == nullptr) {
183         LOGE("[SingleVerNStore] storage engine is invalid.");
184         return false;
185     }
186     EngineState engineState = storageEngine_->GetEngineState();
187     return (engineState == EngineState::CACHEDB || engineState == EngineState::MIGRATING ||
188         engineState == EngineState::ATTACHING);
189 }
190 
CheckReadDataControlled() const191 int SQLiteSingleVerNaturalStore::CheckReadDataControlled() const
192 {
193     if (IsExtendedCacheDBMode()) {
194         int err = IsCacheDBMode() ? -E_EKEYREVOKED : -E_BUSY;
195         LOGE("Existed cache database can not read data, errCode = [%d]!", err);
196         return err;
197     }
198     return E_OK;
199 }
200 
IncreaseCacheRecordVersion() const201 void SQLiteSingleVerNaturalStore::IncreaseCacheRecordVersion() const
202 {
203     if (storageEngine_ == nullptr) {
204         LOGE("[SingleVerNStore] Increase cache version storage engine is invalid.");
205         return;
206     }
207     storageEngine_->IncreaseCacheRecordVersion();
208 }
209 
GetCacheRecordVersion() const210 uint64_t SQLiteSingleVerNaturalStore::GetCacheRecordVersion() const
211 {
212     if (storageEngine_ == nullptr) {
213         LOGE("[SingleVerNStore] Get cache version storage engine is invalid.");
214         return 0;
215     }
216     return storageEngine_->GetCacheRecordVersion();
217 }
218 
GetAndIncreaseCacheRecordVersion() const219 uint64_t SQLiteSingleVerNaturalStore::GetAndIncreaseCacheRecordVersion() const
220 {
221     if (storageEngine_ == nullptr) {
222         LOGE("[SingleVerNStore] Get and increase cache version storage engine is invalid.");
223         return 0;
224     }
225     return storageEngine_->GetAndIncreaseCacheRecordVersion();
226 }
227 
CheckAmendValueContentForSyncProcedure(std::vector<DataItem> & dataItems) const228 void SQLiteSingleVerNaturalStore::CheckAmendValueContentForSyncProcedure(std::vector<DataItem> &dataItems) const
229 {
230     const SchemaObject &schemaObjRef = MyProp().GetSchemaConstRef();
231     if (!schemaObjRef.IsSchemaValid()) {
232         // Not a schema database, do not need to check more
233         return;
234     }
235     uint32_t deleteCount = 0;
236     uint32_t amendCount = 0;
237     uint32_t neglectCount = 0;
238     for (auto &eachItem : dataItems) {
239         if ((eachItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG ||
240             (eachItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) {
241             // Delete record not concerned
242             deleteCount++;
243             continue;
244         }
245         bool useAmendValue = false;
246         int errCode = CheckValueAndAmendIfNeed(ValueSource::FROM_SYNC, eachItem.value, eachItem.value, useAmendValue);
247         if (errCode != E_OK) {
248             eachItem.neglect = true;
249             neglectCount++;
250             continue;
251         }
252         if (useAmendValue) {
253             amendCount++;
254         }
255     }
256     LOGI("[SqlSinStore][CheckAmendForSync] OriCount=%zu, DeleteCount=%u, AmendCount=%u, NeglectCount=%u",
257         dataItems.size(), deleteCount, amendCount, neglectCount);
258 }
259 
NotifyRemotePushFinished(const std::string & targetId) const260 void SQLiteSingleVerNaturalStore::NotifyRemotePushFinished(const std::string &targetId) const
261 {
262     std::string identifier = DBCommon::VectorToHexString(GetIdentifier());
263     LOGI("label:%.6s sourceTarget: %s{private} push finished", identifier.c_str(), targetId.c_str());
264     NotifyRemotePushFinishedInner(targetId);
265 }
266 
CheckIntegrity() const267 int SQLiteSingleVerNaturalStore::CheckIntegrity() const
268 {
269     int errCode = E_OK;
270     auto handle = GetHandle(true, errCode);
271     if (handle == nullptr) {
272         return errCode;
273     }
274 
275     errCode = handle->CheckIntegrity();
276     ReleaseHandle(handle);
277     return errCode;
278 }
279 
SaveCreateDBTimeIfNotExisted()280 int SQLiteSingleVerNaturalStore::SaveCreateDBTimeIfNotExisted()
281 {
282     Timestamp createDBTime = 0;
283     int errCode = GetDatabaseCreateTimestamp(createDBTime);
284     if (errCode == -E_NOT_FOUND) {
285         errCode = SaveCreateDBTime();
286     }
287     if (errCode != E_OK) {
288         LOGE("SaveCreateDBTimeIfNotExisted failed, errCode=%d.", errCode);
289     }
290     return errCode;
291 }
292 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const293 int SQLiteSingleVerNaturalStore::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
294 {
295     if (keyPrefix.empty() || keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
296         return -E_INVALID_ARGS;
297     }
298 
299     int errCode = E_OK;
300     auto handle = GetHandle(true, errCode);
301     if (handle == nullptr) {
302         return errCode;
303     }
304 
305     errCode = handle->DeleteMetaDataByPrefixKey(keyPrefix);
306     if (errCode != E_OK) {
307         LOGE("[SinStore] DeleteMetaData by prefix key failed, errCode = %d", errCode);
308     }
309 
310     ReleaseHandle(handle);
311     HeartBeatForLifeCycle();
312     return errCode;
313 }
314 
GetCompressionOption(bool & needCompressOnSync,uint8_t & compressionRate) const315 int SQLiteSingleVerNaturalStore::GetCompressionOption(bool &needCompressOnSync, uint8_t &compressionRate) const
316 {
317     needCompressOnSync = GetDbProperties().GetBoolProp(DBProperties::COMPRESS_ON_SYNC, false);
318     compressionRate = GetDbProperties().GetIntProp(DBProperties::COMPRESSION_RATE,
319         DBConstant::DEFAULT_COMPTRESS_RATE);
320     return E_OK;
321 }
322 
GetCompressionAlgo(std::set<CompressAlgorithm> & algorithmSet) const323 int SQLiteSingleVerNaturalStore::GetCompressionAlgo(std::set<CompressAlgorithm> &algorithmSet) const
324 {
325     algorithmSet.clear();
326     DataCompression::GetCompressionAlgo(algorithmSet);
327     return E_OK;
328 }
329 
CheckAndInitQueryCondition(QueryObject & query) const330 int SQLiteSingleVerNaturalStore::CheckAndInitQueryCondition(QueryObject &query) const
331 {
332     const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
333     if (localSchema.GetSchemaType() != SchemaType::NONE && localSchema.GetSchemaType() != SchemaType::JSON) {
334         // Flatbuffer schema is not support subscribe
335         return -E_NOT_SUPPORT;
336     }
337     query.SetSchema(localSchema);
338 
339     int errCode = E_OK;
340     SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
341     if (handle == nullptr) {
342         return errCode;
343     }
344 
345     errCode = handle->CheckQueryObjectLegal(query);
346     if (errCode != E_OK) {
347         LOGE("Check query condition failed [%d]!", errCode);
348     }
349     ReleaseHandle(handle);
350     return errCode;
351 }
352 
SetSendDataInterceptor(const PushDataInterceptor & interceptor)353 void SQLiteSingleVerNaturalStore::SetSendDataInterceptor(const PushDataInterceptor &interceptor)
354 {
355     std::unique_lock<std::shared_mutex> lock(dataInterceptorMutex_);
356     pushDataInterceptor_ = interceptor;
357 }
358 
InterceptData(std::vector<SingleVerKvEntry * > & entries,const std::string & sourceID,const std::string & targetID,bool isPush) const359 int SQLiteSingleVerNaturalStore::InterceptData(std::vector<SingleVerKvEntry *> &entries, const std::string &sourceID,
360     const std::string &targetID, bool isPush) const
361 {
362     PushDataInterceptor interceptor = nullptr;
363     {
364         std::shared_lock<std::shared_mutex> lock(dataInterceptorMutex_);
365         interceptor = isPush ? pushDataInterceptor_ : receiveDataInterceptor_;
366         if (interceptor == nullptr) {
367             return E_OK;
368         }
369     }
370 
371     InterceptedDataImpl data(entries, [this](const Value &newValue) -> int {
372             bool useAmendValue = false;
373             Value amendValue = newValue;
374             return this->CheckValueAndAmendIfNeed(ValueSource::FROM_LOCAL, newValue, amendValue, useAmendValue);
375         }
376     );
377 
378     int errCode = interceptor(data, sourceID, targetID);
379     if (data.IsError()) {
380         if (isPush) {
381             // receive data release by syncer
382             SingleVerKvEntry::Release(entries);
383         }
384         LOGE("Intercept data failed:%d.", errCode);
385         return -E_INTERCEPT_DATA_FAIL;
386     }
387     return E_OK;
388 }
389 
AddSubscribe(const std::string & subscribeId,const QueryObject & query,bool needCacheSubscribe)390 int SQLiteSingleVerNaturalStore::AddSubscribe(const std::string &subscribeId, const QueryObject &query,
391     bool needCacheSubscribe)
392 {
393     if (IsSupportSubscribe() != E_OK) {
394         return -E_NOT_SUPPORT;
395     }
396     const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
397     QueryObject queryInner = query;
398     queryInner.SetSchema(localSchema);
399     if (IsExtendedCacheDBMode() && needCacheSubscribe) { // cache auto subscribe when engine state is in CACHEDB mode
400         LOGI("Cache subscribe query and return ok when in cacheDB.");
401         storageEngine_->CacheSubscribe(subscribeId, queryInner);
402         return E_OK;
403     }
404 
405     int errCode = E_OK;
406     SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
407     if (handle == nullptr) {
408         return errCode;
409     }
410 
411     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
412     if (errCode != E_OK) {
413         ReleaseHandle(handle);
414         return errCode;
415     }
416 
417     errCode = handle->AddSubscribeTrigger(queryInner, subscribeId);
418     if (errCode != E_OK) {
419         LOGE("Add subscribe trigger failed: %d", errCode);
420         (void)handle->Rollback();
421     } else {
422         errCode = handle->Commit();
423     }
424     ReleaseHandle(handle);
425     return errCode;
426 }
427 
SetMaxLogSize(uint64_t limit)428 int SQLiteSingleVerNaturalStore::SetMaxLogSize(uint64_t limit)
429 {
430     if (maxLogSize_ != limit) {
431         LOGI("Set the max log size to %" PRIu64, limit);
432     }
433     maxLogSize_.store(limit);
434     return E_OK;
435 }
436 
SetMaxValueSize(uint32_t maxValueSize)437 int SQLiteSingleVerNaturalStore::SetMaxValueSize(uint32_t maxValueSize)
438 {
439     storageEngine_->SetMaxValueSize(maxValueSize);
440     return E_OK;
441 }
442 
GetMaxValueSize() const443 uint32_t SQLiteSingleVerNaturalStore::GetMaxValueSize() const
444 {
445     if (storageEngine_ == nullptr) {
446         LOGE("[SingleVerNStore] Get max value size storage engine is invalid.");
447         return DBConstant::MAX_VALUE_SIZE;
448     }
449     return storageEngine_->GetMaxValueSize();
450 }
451 
GetMaxLogSize() const452 uint64_t SQLiteSingleVerNaturalStore::GetMaxLogSize() const
453 {
454     return maxLogSize_.load();
455 }
456 
Dump(int fd)457 void SQLiteSingleVerNaturalStore::Dump(int fd)
458 {
459     std::string userId = MyProp().GetStringProp(DBProperties::USER_ID, "");
460     std::string appId = MyProp().GetStringProp(DBProperties::APP_ID, "");
461     std::string storeId = MyProp().GetStringProp(DBProperties::STORE_ID, "");
462     std::string label = MyProp().GetStringProp(DBProperties::IDENTIFIER_DATA, "");
463     label = DBCommon::TransferStringToHex(label);
464     DBDumpHelper::Dump(fd, "\tdb userId = %s, appId = %s, storeId = %s, label = %s\n",
465         userId.c_str(), appId.c_str(), storeId.c_str(), label.c_str());
466     SyncAbleKvDB::Dump(fd);
467 }
468 
IsSupportSubscribe() const469 int SQLiteSingleVerNaturalStore::IsSupportSubscribe() const
470 {
471     const SchemaObject &localSchema = MyProp().GetSchemaConstRef();
472     if (localSchema.GetSchemaType() != SchemaType::NONE && localSchema.GetSchemaType() != SchemaType::JSON) {
473         // Flatbuffer schema is not support subscribe
474         return -E_NOT_SUPPORT;
475     }
476     return E_OK;
477 }
478 
EraseAllDeviceWaterMark(const std::string & hashDev)479 int SQLiteSingleVerNaturalStore::EraseAllDeviceWaterMark(const std::string &hashDev)
480 {
481     int errCode = E_OK;
482     std::set<std::string> removeDevices;
483     if (hashDev.empty()) {
484         errCode = GetExistsDeviceList(removeDevices);
485         if (errCode != E_OK) {
486             LOGE("[SingleVerNStore] get remove device list failed:%d", errCode);
487             return errCode;
488         }
489     } else {
490         removeDevices.insert(hashDev);
491     }
492 
493     LOGD("[SingleVerNStore] remove device data, size=%zu", removeDevices.size());
494     for (const auto &iterDevice : removeDevices) {
495         // Call the syncer module to erase the water mark.
496         errCode = EraseDeviceWaterMark(iterDevice, false);
497         if (errCode != E_OK) {
498             LOGE("[SingleVerNStore] erase water mark failed:%d", errCode);
499             return errCode;
500         }
501     }
502     return errCode;
503 }
504 
RemoveDeviceDataInner(const std::string & hashDev,bool isNeedNotify)505 std::function<int(void)> SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev,
506     bool isNeedNotify)
507 {
508     return [this, hashDev, isNeedNotify]()->int {
509         int errCode = E_OK;
510         SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
511         if (handle == nullptr) {
512             LOGE("[SingleVerNStore] RemoveDeviceData get handle failed:%d", errCode);
513             return errCode;
514         }
515         uint64_t logFileSize = handle->GetLogFileSize();
516         ReleaseHandle(handle);
517         if (logFileSize > GetMaxLogSize()) {
518             LOGW("[SingleVerNStore] RmDevData log size[%" PRIu64 "] over the limit", logFileSize);
519             return -E_LOG_OVER_LIMITS;
520         }
521 
522         errCode = EraseAllDeviceWaterMark(hashDev);
523         if (errCode != E_OK) {
524             LOGE("[SingleVerNStore] Erase all device water mark with notify failed: %d", errCode);
525             return errCode;
526         }
527 
528 #ifdef USE_DISTRIBUTEDDB_CLOUD
529         CleanAllWaterMark();
530 #endif
531         if (IsExtendedCacheDBMode()) {
532             errCode = RemoveDeviceDataInCacheMode(hashDev, isNeedNotify);
533         } else {
534             errCode = RemoveDeviceDataNormally(hashDev, isNeedNotify);
535         }
536         if (errCode != E_OK) {
537             LOGE("[SingleVerNStore] RemoveDeviceData failed:%d", errCode);
538         }
539         return errCode;
540     };
541 }
542 
RemoveDeviceDataInner(const std::string & hashDev,ClearMode mode)543 std::function<int(void)> SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev, ClearMode mode)
544 {
545     return [this, hashDev, mode]()->int {
546         int errCode = E_OK;
547         SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
548         if (handle == nullptr) {
549             LOGE("[SingleVerNStore] RemoveDeviceData with mode get handle failed:%d", errCode);
550             return errCode;
551         }
552         errCode = handle->StartTransaction(TransactType::IMMEDIATE);
553         if (errCode != E_OK) {
554             LOGE("Start transaction failed %d in RemoveDeviceData.", errCode);
555             ReleaseHandle(handle);
556             return errCode;
557         }
558         errCode = handle->RemoveDeviceData(hashDev, mode);
559         if (errCode != E_OK) {
560             LOGE("RemoveDeviceData failed: %d", errCode);
561             (void)handle->Rollback();
562         } else {
563             errCode = handle->Commit();
564             if (errCode != E_OK) {
565                 LOGE("Transaction commit failed %d in RemoveDeviceData.", errCode);
566             }
567         }
568         ReleaseHandle(handle);
569         return errCode;
570     };
571 }
572 
RemoveDeviceDataInner(const std::string & hashDev,const std::string & user,ClearMode mode)573 std::function<int(void)> SQLiteSingleVerNaturalStore::RemoveDeviceDataInner(const std::string &hashDev,
574     const std::string &user, ClearMode mode)
575 {
576     return [this, hashDev, user, mode]()->int {
577         int errCode = E_OK;
578         SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
579         if (handle == nullptr) {
580             LOGE("[SingleVerNStore] RemoveDeviceData with user and mode get handle failed:%d", errCode);
581             return errCode;
582         }
583         errCode = handle->StartTransaction(TransactType::IMMEDIATE);
584         if (errCode != E_OK) {
585             LOGE("Start transaction failed %d in RemoveDeviceData.", errCode);
586             ReleaseHandle(handle);
587             return errCode;
588         }
589         errCode = handle->RemoveDeviceData(hashDev, user, mode);
590         if (errCode != E_OK) {
591             LOGE("RemoveDeviceData failed: %d", errCode);
592             (void)handle->Rollback();
593         } else {
594             errCode = handle->Commit();
595             if (errCode != E_OK) {
596                 LOGE("Transaction commit failed %d in RemoveDeviceData.", errCode);
597             }
598         }
599         ReleaseHandle(handle);
600         return errCode;
601     };
602 }
603 
AbortHandle()604 void SQLiteSingleVerNaturalStore::AbortHandle()
605 {
606     std::unique_lock<std::shared_mutex> lock(abortHandleMutex_);
607     abortPerm_ = OperatePerm::RESTART_SYNC_PERM;
608 }
609 
EnableHandle()610 void SQLiteSingleVerNaturalStore::EnableHandle()
611 {
612     std::unique_lock<std::shared_mutex> lock(abortHandleMutex_);
613     abortPerm_ = OperatePerm::NORMAL_PERM;
614 }
615 
TryHandle() const616 int SQLiteSingleVerNaturalStore::TryHandle() const
617 {
618     std::unique_lock<std::shared_mutex> lock(abortHandleMutex_);
619     if (abortPerm_ == OperatePerm::RESTART_SYNC_PERM) {
620         LOGW("[SingleVerNStore] Restarting sync, handle id[%s] is busy",
621             DBCommon::TransferStringToHex(storageEngine_->GetIdentifier()).c_str());
622         return -E_BUSY;
623     }
624     return E_OK;
625 }
626 
GetStorageExecutor(bool isWrite)627 std::pair<int, SQLiteSingleVerStorageExecutor*> SQLiteSingleVerNaturalStore::GetStorageExecutor(bool isWrite)
628 {
629     int errCode = E_OK;
630     SQLiteSingleVerStorageExecutor *handle = GetHandle(isWrite, errCode);
631     return {errCode, handle};
632 }
633 
RecycleStorageExecutor(SQLiteSingleVerStorageExecutor * executor)634 void SQLiteSingleVerNaturalStore::RecycleStorageExecutor(SQLiteSingleVerStorageExecutor *executor)
635 {
636     ReleaseHandle(executor);
637 }
638 
GetLocalTimeOffsetForCloud()639 TimeOffset SQLiteSingleVerNaturalStore::GetLocalTimeOffsetForCloud()
640 {
641     return GetLocalTimeOffset();
642 }
643 
RegisterObserverAction(const KvStoreObserver * observer,const ObserverAction & action)644 int SQLiteSingleVerNaturalStore::RegisterObserverAction(const KvStoreObserver *observer, const ObserverAction &action)
645 {
646     std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
647     if (sqliteCloudKvStore_ == nullptr) {
648         return -E_INTERNAL_ERROR;
649     }
650     sqliteCloudKvStore_->RegisterObserverAction(observer, action);
651     return E_OK;
652 }
653 
UnRegisterObserverAction(const KvStoreObserver * observer)654 int SQLiteSingleVerNaturalStore::UnRegisterObserverAction(const KvStoreObserver *observer)
655 {
656     std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
657     if (sqliteCloudKvStore_ == nullptr) {
658         return -E_INTERNAL_ERROR;
659     }
660     sqliteCloudKvStore_->UnRegisterObserverAction(observer);
661     return E_OK;
662 }
663 
GetCloudVersion(const std::string & device,std::map<std::string,std::string> & versionMap)664 int SQLiteSingleVerNaturalStore::GetCloudVersion(const std::string &device,
665     std::map<std::string, std::string> &versionMap)
666 {
667     std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
668     if (sqliteCloudKvStore_ == nullptr) {
669         return -E_INTERNAL_ERROR;
670     }
671     return sqliteCloudKvStore_->GetCloudVersion(device, versionMap);
672 }
673 
SetReceiveDataInterceptor(const DataInterceptor & interceptor)674 void SQLiteSingleVerNaturalStore::SetReceiveDataInterceptor(const DataInterceptor &interceptor)
675 {
676     std::unique_lock<std::shared_mutex> lock(dataInterceptorMutex_);
677     receiveDataInterceptor_ = interceptor;
678 }
679 
SetCloudSyncConfig(const CloudSyncConfig & config)680 int SQLiteSingleVerNaturalStore::SetCloudSyncConfig(const CloudSyncConfig &config)
681 {
682     std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
683     if (sqliteCloudKvStore_ == nullptr) {
684         LOGE("[SingleVerNStore] DB is null when set config");
685         return -E_INTERNAL_ERROR;
686     }
687     sqliteCloudKvStore_->SetCloudSyncConfig(config);
688     return E_OK;
689 }
690 
GetCloudSyncConfig() const691 CloudSyncConfig SQLiteSingleVerNaturalStore::GetCloudSyncConfig() const
692 {
693     std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
694     if (sqliteCloudKvStore_ == nullptr) {
695         LOGE("[SingleVerNStore] DB is null when get config");
696         CloudSyncConfig config;
697         return config;
698     }
699     return sqliteCloudKvStore_->GetCloudSyncConfig();
700 }
701 
702 #ifdef USE_DISTRIBUTEDDB_CLOUD
ClearCloudWatermark()703 int SQLiteSingleVerNaturalStore::ClearCloudWatermark()
704 {
705     auto syncer = GetAndIncCloudSyncer();
706     if (syncer == nullptr) {
707         LOGE("[SingleVerNStore] Cloud syncer was not initialized");
708         return -E_INVALID_DB;
709     }
710     auto clearFunc = ClearCloudWatermarkInner();
711     int errCode = syncer->ClearCloudWatermark(clearFunc);
712     if (errCode != E_OK) {
713         LOGE("[SingleVerNStore] Clear cloud watermark failed: %d", errCode);
714         DecObjRef(syncer);
715         return errCode;
716     }
717     CleanAllWaterMark();
718     DecObjRef(syncer);
719     return errCode;
720 }
721 
ClearCloudWatermarkInner()722 std::function<int(void)> SQLiteSingleVerNaturalStore::ClearCloudWatermarkInner()
723 {
724     return [this]()->int {
725         int errCode = E_OK;
726         SQLiteSingleVerStorageExecutor *handle = GetHandle(true, errCode);
727         if (handle == nullptr) {
728             LOGE("[SingleVerNStore] get handle failed when clear cloud watermark:%d.", errCode);
729             return errCode;
730         }
731         errCode = handle->StartTransaction(TransactType::IMMEDIATE);
732         if (errCode != E_OK) {
733             LOGE("[SingleVerNStore] start transaction failed when clear cloud watermark:%d.", errCode);
734             ReleaseHandle(handle);
735             return errCode;
736         }
737         errCode = handle->ClearCloudWatermark();
738         if (errCode != E_OK) {
739             LOGE("[SingleVerNStore] clear cloud watermark failed: %d", errCode);
740             (void)handle->Rollback();
741         } else {
742             errCode = handle->Commit();
743             if (errCode != E_OK) {
744                 LOGE("[SingleVerNStore] transaction commit failed %d in RemoveDeviceData.", errCode);
745             }
746         }
747         ReleaseHandle(handle);
748         return errCode;
749     };
750 }
751 #endif
752 
OperateDataStatus(uint32_t dataOperator)753 int SQLiteSingleVerNaturalStore::OperateDataStatus(uint32_t dataOperator)
754 {
755     std::lock_guard<std::mutex> autoLock(cloudStoreMutex_);
756     if (sqliteCloudKvStore_ == nullptr) {
757         LOGE("[SingleVerNStore] DB is null when operate data status");
758         return -E_INTERNAL_ERROR;
759     }
760     return sqliteCloudKvStore_->OperateDataStatus(dataOperator);
761 }
762 }
763