• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_engine.h"
17 
18 #include <memory>
19 
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "kvdb_manager.h"
24 #include "log_print.h"
25 #include "param_check_utils.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "single_ver_utils.h"
29 #include "sqlite_single_ver_database_upgrader.h"
30 #include "sqlite_single_ver_natural_store.h"
31 #include "sqlite_single_ver_schema_database_upgrader.h"
32 
33 namespace DistributedDB {
SQLiteSingleVerStorageEngine()34 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
35     : executorState_(ExecutorState::INVALID),
36       cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
37       isCorrupted_(false),
38       isNeedUpdateSecOpt_(false)
39 {}
40 
~SQLiteSingleVerStorageEngine()41 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
42 {
43 }
44 
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const45 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
46 {
47     return handle->MigrateLocalData();
48 }
49 
EraseDeviceWaterMark(const std::set<std::string> & removeDevices,bool isNeedHash)50 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(const std::set<std::string> &removeDevices, bool isNeedHash)
51 {
52     auto kvdbManager = KvDBManager::GetInstance();
53     if (kvdbManager == nullptr) { // LCOV_EXCL_BR_LINE
54         return -E_INVALID_DB;
55     }
56     auto identifier = GetIdentifier();
57     auto kvdb = kvdbManager->FindKvDB(identifier);
58     if (kvdb == nullptr) { // LCOV_EXCL_BR_LINE
59         LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
60         return -E_INVALID_DB;
61     }
62 
63     auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
64     for (const auto &devId : removeDevices) {
65         int errCode = kvStore->EraseDeviceWaterMark(devId, isNeedHash);
66         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
67             RefObject::DecObjRef(kvdb);
68             return errCode;
69         }
70     }
71 
72     RefObject::DecObjRef(kvdb);
73     return E_OK;
74 }
75 
GetRemoveDataDevices(SQLiteSingleVerStorageExecutor * handle,const DataItem & item,std::set<std::string> & removeDevices,bool & isNeedHash) const76 int SQLiteSingleVerStorageEngine::GetRemoveDataDevices(SQLiteSingleVerStorageExecutor *handle, const DataItem &item,
77     std::set<std::string> &removeDevices, bool &isNeedHash) const
78 {
79     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
80         return -E_INVALID_DB;
81     }
82     if (item.value.empty()) { // Device ID has been set to value in cache db
83         // Empty means remove all device data, get device id from meta key
84         // LCOV_EXCL_BR_LINE
85         int errCode = handle->GetExistsDevicesFromMeta(removeDevices);
86         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
87             LOGE("Get remove devices list from meta failed. err=%d", errCode);
88             return errCode;
89         }
90         isNeedHash = false;
91     } else {
92         std::string deviceName;
93         DBCommon::VectorToString(item.value, deviceName);
94         removeDevices.insert(deviceName);
95     }
96     return E_OK;
97 }
98 
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)99 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
100     const std::vector<DataItem> &dataItems)
101 {
102     int errCode = E_OK;
103     for (const auto &dataItem : dataItems) {
104         if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
105             (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
106             bool isNeedHash = true;
107             std::set<std::string> removeDevices;
108             errCode = GetRemoveDataDevices(handle, dataItem, removeDevices, isNeedHash);
109             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
110                 LOGE("Get remove device id failed. err=%d", errCode);
111                 return errCode;
112             }
113 
114             // sync module will use handle to fix watermark, if fix fail then migrate fail, not need hold write handle
115             errCode = ReleaseExecutor(handle);
116             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
117                 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
118                 return errCode;
119             }
120 
121             errCode = EraseDeviceWaterMark(removeDevices, isNeedHash);
122             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
123                 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
124                 return errCode;
125             }
126 
127             handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
128                 errCode));
129             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
130                 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
131                 return errCode;
132             }
133         }
134     }
135     return errCode;
136 }
137 
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)138 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
139     NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
140 {
141     if (syncData.committedData == nullptr) {
142         syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
143         if (syncData.committedData == nullptr) {
144             LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
145             return -E_OUT_OF_MEMORY;
146         }
147     }
148     InitConflictNotifiedFlag(syncData.committedData);
149 
150     std::vector<DataItem> dataItems;
151     uint64_t minVerIncurCacheDb = 0;
152     int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
153     if (errCode != E_OK) {
154         LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
155         return errCode;
156     }
157 
158     if (minVerIncurCacheDb == 0) { // min version in cache db is 1
159         ++curMigrateVer;
160         return E_OK;
161     }
162 
163     if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
164         curMigrateVer = minVerIncurCacheDb;
165     }
166 
167     // Call the syncer module to erase the water mark.
168     errCode = EraseDeviceWaterMark(handle, dataItems);
169     if (errCode != E_OK) {
170         LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
171         return errCode;
172     }
173 
174     // next version need process
175     LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
176         curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
177     errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
178     if (errCode != E_OK) {
179         LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
180         return errCode;
181     }
182 
183     errCode = ReleaseHandleTransiently(handle, 2ULL, syncData); // temporary release handle 2ms
184     if (errCode != E_OK) {
185         return errCode;
186     }
187 
188     return E_OK;
189 }
190 
191 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime,NotifyMigrateSyncData & syncData)192 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime,
193     NotifyMigrateSyncData &syncData)
194 {
195     int errCode = ReleaseExecutor(handle);
196     if (errCode != E_OK) {
197         LOGE("release executor for reopen database! errCode = [%d]", errCode);
198         return errCode;
199     }
200 
201     CommitNotifyForMigrateCache(syncData); // Trigger sync after release handle
202 
203     std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
204     handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
205     if (errCode != E_OK) {
206         LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
207         return errCode;
208     }
209     return errCode;
210 }
211 
AddSubscribeToMainDBInMigrate()212 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
213 {
214     LOGD("Add subscribe to mainDB from cache. %d", GetEngineState());
215     std::lock_guard<std::mutex> lock(subscribeMutex_);
216     if (subscribeQuery_.empty()) { // LCOV_EXCL_BR_LINE
217         return E_OK;
218     }
219     int errCode = E_OK;
220     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
221     if (errCode != E_OK || handle == nullptr) { // LCOV_EXCL_BR_LINE
222         LOGE("Get available executor for add subscribe failed. %d", errCode);
223         return errCode;
224     }
225     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
226     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
227         goto END;
228     }
229     for (auto item : subscribeQuery_) {
230         errCode = handle->AddSubscribeTrigger(item.second, item.first);
231         if (errCode != E_OK) {
232             LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
233         }
234     }
235     subscribeQuery_.clear();
236     // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
237     (void)handle->Commit();
238 END:
239     ReleaseExecutor(handle);
240     return errCode;
241 }
242 
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)243 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
244 {
245     int errCode = E_OK;
246     if (handle == nullptr) {
247         handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
248         if (errCode != E_OK) {
249             LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
250             return errCode;
251         }
252     }
253 
254     LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion());
255     uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
256     NotifyMigrateSyncData syncData;
257     auto kvdbManager = KvDBManager::GetInstance();
258     if (kvdbManager != nullptr) {
259         auto identifier = GetIdentifier();
260         auto kvdb = kvdbManager->FindKvDB(identifier);
261         if (kvdb != nullptr) {
262             auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
263             syncData.isPermitForceWrite =
264                 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
265             RefObject::DecObjRef(kvdb);
266         } else {
267             LOGE("[SingleVerEngine] kvdb is null.");
268         }
269     }
270     // cache atomic version represents version of cacheDb input next time
271     while (curMigrateVer < GetCacheRecordVersion()) {
272         errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
273         if (errCode != E_OK) {
274             LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
275             break;
276         }
277         if (!syncData.isRemote) {
278             isNeedTriggerSync = true;
279         }
280     }
281     if (syncData.committedData != nullptr) {
282         RefObject::DecObjRef(syncData.committedData);
283         syncData.committedData = nullptr;
284     }
285     // When finished Migrating sync data, will fix engine state
286     return errCode;
287 }
288 
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)289 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
290     EngineState stateBeforeMigrate)
291 {
292     LOGD("Begin attach main db and cache db by executor!");
293     // Judge the file corresponding to db by the engine status and attach it to another file
294     int errCode = E_OK;
295     std::string attachAbsPath;
296     if (stateBeforeMigrate == EngineState::MAINDB) {
297         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
298             DBConstant::DB_EXTENSION;
299         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
300     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
301         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
302         DBConstant::DB_EXTENSION;
303         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
304     } else {
305         return -E_NOT_SUPPORT;
306     }
307     if (errCode != E_OK) {
308         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
309         return errCode;
310     }
311 
312     uint64_t maxVersion = 0;
313     errCode = handle->GetMaxVersionInCacheDb(maxVersion);
314     if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
315         maxVersion = CACHE_RECORD_DEFAULT_VERSION;
316     }
317 
318     (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
319     return errCode;
320 }
321 
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const322 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
323 {
324     LOGD("Begin attach main db and cache db by sqlite handle!");
325     // Judge the file corresponding to db by the engine status and attach it to another file
326     int errCode = E_OK;
327     std::string attachAbsPath;
328     if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
329         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
330             DBConstant::DB_EXTENSION;
331         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
332     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
333         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
334             DBConstant::DB_EXTENSION;
335         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
336     } else {
337         return -E_NOT_SUPPORT;
338     }
339     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
340         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
341         return errCode;
342     }
343 
344     return errCode;
345 }
346 
ReInit()347 int SQLiteSingleVerStorageEngine::ReInit()
348 {
349     return Init();
350 }
351 
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)352 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
353 {
354     if (handle == nullptr) {
355         return E_OK;
356     }
357     StorageExecutor *databaseHandle = handle;
358     isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
359     Recycle(databaseHandle);
360     handle = nullptr;
361     if (isCorrupted_) {
362         LOGE("Database is corrupted or invalid passwd!");
363         return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
364     }
365     return E_OK;
366 }
367 
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)368 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
369     EngineState stateBeforeMigrate)
370 {
371     LOGI("Begin to finish migrate and reinit db state!");
372     int errCode;
373     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
374         return -E_INVALID_ARGS;
375     }
376 
377     if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
378         sqlite3 *dbHandle = nullptr;
379         errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
380         if (errCode != E_OK) {
381             LOGE("Get Db handle failed! errCode = [%d]", errCode);
382             return errCode;
383         }
384 
385         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
386         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
387             LOGE("Execute the SQLite detach failed:%d", errCode);
388             return errCode;
389         }
390         // delete cachedb
391         errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
392         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
393             LOGE("Remove files of cache database after detach:%d", errCode);
394         }
395 
396         SetEngineState(EngineState::MAINDB);
397         return errCode;
398     }
399 
400     errCode = ReleaseExecutor(handle);
401     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
402         LOGE("Release executor for reopen database! errCode = [%d]", errCode);
403         return errCode;
404     }
405 
406     // close db for reinit this engine
407     Release();
408 
409     // delete cache db
410     errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
411     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
412         LOGE("Remove files of cache database after release current db:%d", errCode);
413         return errCode;
414     }
415 
416     // reInit, it will reset engine state
417     errCode = ReInit();
418     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
419         LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
420         return errCode;
421     }
422 
423     return E_OK;
424 }
425 
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)426 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
427     EngineState preMigrateState)
428 {
429     // after attach main and cache need change operate data sql, changing state forbid operate database
430     SetEngineState(EngineState::MIGRATING);
431 
432     int errCode = E_OK;
433     // check if has been attach and attach cache and main for migrate
434     if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
435         errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
436         if (errCode != E_OK) {
437             LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
438             // For lock state open db, can not attach main and cache
439             return errCode;
440         }
441     } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
442         // Has been attach, maybe ever crashed, need update version
443         executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
444         uint64_t maxVersion = 0;
445         errCode = handle->GetMaxVersionInCacheDb(maxVersion);
446         if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
447             maxVersion = CACHE_RECORD_DEFAULT_VERSION;
448         }
449         (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
450     } else {
451         return -E_UNEXPECTED_DATA;
452     }
453 
454     return errCode;
455 }
456 
ExecuteMigrate()457 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
458 {
459     EngineState preState = GetEngineState();
460     std::lock_guard<std::mutex> lock(migrateLock_);
461     if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
462         !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
463         DBConstant::DB_EXTENSION)) {
464         LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
465         return E_OK;
466     }
467 
468     // Get write executor for migrate
469     int errCode = E_OK;
470     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
471     if (errCode != E_OK) {
472         LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
473         return errCode;
474     }
475 
476     isMigrating_.store(true);
477     LOGD("Migrate start.");
478     bool isNeedTriggerSync = false;
479     errCode = InitExecuteMigrate(handle, preState);
480     if (errCode != E_OK) {
481         LOGE("Init migrate data fail, errCode = [%d]", errCode);
482         goto END;
483     }
484 
485     LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
486         static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
487     // has been attached, Mark start of migration and it can migrate data
488     errCode = MigrateLocalData(handle);
489     if (errCode != E_OK) {
490         LOGE("Migrate local data fail, errCode = [%d]", errCode);
491         goto END;
492     }
493 
494     errCode = MigrateSyncData(handle, isNeedTriggerSync);
495     if (errCode != E_OK) {
496         LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
497         goto END;
498     }
499 
500     SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
501 
502     // detach database and delete cachedb
503     errCode = FinishMigrateData(handle, preState);
504     if (errCode != E_OK) {
505         LOGE("Finish migrating data fail, errCode = [%d]", errCode);
506         goto END;
507     }
508 
509 END: // after FinishMigrateData, it will reset engine state
510     // there is no need cover the errCode
511     EndMigrate(handle, preState, errCode, isNeedTriggerSync);
512     isMigrating_.store(false);
513     LOGD("Migrate stop.");
514     return errCode;
515 }
516 
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)517 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
518     int errCode, bool isNeedTriggerSync)
519 {
520     LOGD("Finish migrating data! errCode = [%d]", errCode);
521     if (errCode != E_OK) {
522         SetEngineState(stateBeforeMigrate);
523     }
524     if (handle != nullptr) {
525         handle->ClearMigrateData();
526     }
527     errCode = ReleaseExecutor(handle);
528     if (errCode != E_OK) {
529         LOGE("release executor after migrating! errCode = [%d]", errCode);
530     }
531 
532     errCode = AddSubscribeToMainDBInMigrate();
533     if (errCode != E_OK) {
534         LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
535     }
536 
537     // Notify max timestamp offset for SyncEngine.
538     // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
539     RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
540     if (isNeedTriggerSync) {
541         commitNotifyFunc_(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT),
542             nullptr);
543     }
544     return;
545 }
546 
IsEngineCorrupted() const547 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
548 {
549     return isCorrupted_;
550 }
551 
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)552 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
553 {
554     auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
555     if (executor == nullptr) {
556         return executor;
557     }
558     executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
559     return executor;
560 }
561 
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)562 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
563 {
564     // Only could get the main database handle in the uninitialized and the main status.
565     if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
566         LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
567         return -E_EKEYREVOKED;
568     }
569 
570     if (!option_.isMemDb) {
571         option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
572             DBConstant::DB_EXTENSION;
573     }
574 
575     OpenDbProperties optionTemp = option_;
576     if (!isWrite) {
577         optionTemp.createIfNecessary = false;
578     }
579 
580     int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
581     if (errCode != E_OK) {
582         if (errno == EKEYREVOKED) {
583             LOGI("Failed to open the main database for key revoked[%d]", errCode);
584             errCode = -E_EKEYREVOKED;
585         }
586         return errCode;
587     }
588 
589     executorState_ = ExecutorState::MAINDB;
590     // Set the engine state to main status for that the main database is valid.
591     SetEngineState(EngineState::MAINDB);
592 
593     if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
594         DBConstant::DB_EXTENSION)) {
595         // In status cacheDb crash
596         errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
597         if (errCode != E_OK) {
598             LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
599             return E_OK; // not care err to return, only use for print log
600         }
601         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
602         // cache and main existed together, can not read data, must execute migrate first
603         SetEngineState(EngineState::ATTACHING);
604     }
605 
606     return errCode;
607 }
608 
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)609 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
610 {
611     int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
612     LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]",  isWrite,
613         secOpt.securityLabel, secOpt.securityFlag, hashIdentifier_.c_str(), errCode);
614     if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
615         return errCode;
616     }
617     std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
618         DBConstant::DB_EXTENSION;
619     if (!isWrite || GetEngineState() != EngineState::INVALID ||
620         OS::CheckPathExistence(cacheDbPath)) {
621         LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
622             isWrite, GetEngineState());
623         return -E_EKEYREVOKED;
624     }
625 
626     errCode = GetCacheDbHandle(dbHandle);
627     if (errCode != E_OK) {
628         LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
629         return errCode;
630     }
631     SetEngineState(EngineState::CACHEDB);
632     executorState_ = ExecutorState::CACHEDB;
633 
634     ResetCacheRecordVersion();
635     // Get handle means maindb file ekeyevoked, not need attach to
636     return errCode;
637 }
638 
639 namespace CacheDbSqls {
640 const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
641     "CREATE TABLE IF NOT EXISTS local_data(" \
642         "key     BLOB   NOT NULL," \
643         "value  BLOB," \
644         "timestamp  INT," \
645         "hash_key   BLOB   PRIMARY KEY   NOT NULL," \
646         "flag  INT  NOT NULL);";
647 
648 const std::string CREATE_CACHE_SYNC_TABLE_SQL =
649     "CREATE TABLE IF NOT EXISTS sync_data(" \
650         "key         BLOB NOT NULL," \
651         "value       BLOB," \
652         "timestamp   INT  NOT NULL," \
653         "flag        INT  NOT NULL," \
654         "device      BLOB," \
655         "ori_device  BLOB," \
656         "hash_key    BLOB  NOT NULL," \
657         "w_timestamp INT," \
658         "version     INT  NOT NULL," \
659         "PRIMARY Key(version, hash_key));";
660 }
661 
662 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
663 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)664 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
665 {
666     option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
667         DBConstant::DB_EXTENSION;
668     // creatTable
669     option_.sqls = {
670         CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
671         CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
672     };
673 
674     if (!option_.createIfNecessary) {
675         std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
676             DBConstant::DB_EXTENSION;
677         if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
678             return -E_INVALID_DB;
679         }
680     }
681 
682     OpenDbProperties option = option_; // copy for no change it
683     option.createIfNecessary = true;
684     int errCode = SQLiteUtils::OpenDatabase(option, db);
685     if (errCode != E_OK) {
686         LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
687         return errCode;
688     }
689     return errCode;
690 }
691 
CheckDatabaseSecOpt(const SecurityOption & secOption) const692 void SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
693 {
694     if (!(secOption == option_.securityOpt) && (secOption.securityLabel > option_.securityOpt.securityLabel) &&
695         secOption.securityLabel != SecurityLabel::NOT_SET &&
696         option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
697         LOGW("[SQLiteSingleVerStorageEngine] SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]",
698             secOption.securityLabel, secOption.securityFlag, option_.securityOpt.securityLabel,
699             option_.securityOpt.securityFlag);
700     }
701 }
702 
CreateNewDirsAndSetSecOpt() const703 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
704 {
705     LOGD("[SQLiteSingleVerStorageEngine] Begin to create new dirs and set security option");
706     return CreateNewDirsAndSetSecOption(option_);
707 }
708 
GetExistedSecOption(SecurityOption & secOption) const709 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
710 {
711     LOGD("[SQLiteSingleVerStorageEngine] Try to get existed sec option");
712     return GetExistedSecOpt(option_, secOption);
713 }
714 
ClearCorruptedFlag()715 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
716 {
717     isCorrupted_ = false;
718 }
719 
PreCreateExecutor(bool isWrite)720 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite)
721 {
722     // Assume that create the write executor firstly and the write one we will not be released.
723     // If the write one would be released in the future, should take care the pass through.
724     if (!isWrite) {
725         return E_OK;
726     }
727 
728     if (option_.isMemDb) {
729         return E_OK;
730     }
731 
732     // Get the existed database secure option.
733     SecurityOption existedSecOpt;
734     int errCode = GetExistedSecOption(existedSecOpt);
735     if (errCode != E_OK) {
736         return errCode;
737     }
738 
739     CheckDatabaseSecOpt(existedSecOpt);
740 
741     // Judge whether need update the security option of the engine.
742     // Should update the security in the import or rekey scene(inner) or exist is not set.
743     if (IsUseExistedSecOption(existedSecOpt, option_.securityOpt)) {
744         option_.securityOpt = existedSecOpt;
745     } else {
746         isNeedUpdateSecOpt_ = true;
747     }
748 
749     errCode = CreateNewDirsAndSetSecOpt();
750     if (errCode != E_OK) {
751         return errCode;
752     }
753 
754     if (!isUpdated_) {
755         errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
756         if (errCode != E_OK) {
757             LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
758             return errCode;
759         }
760     }
761 
762     return E_OK;
763 }
764 
EndCreateExecutor(bool isWrite)765 int SQLiteSingleVerStorageEngine::EndCreateExecutor(bool isWrite)
766 {
767     if (option_.isMemDb || !isWrite) {
768         return E_OK;
769     }
770 
771     int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt,
772         isNeedUpdateSecOpt_);
773     if (errCode != E_OK) {
774         if (errCode == -E_NOT_SUPPORT) {
775             option_.securityOpt = SecurityOption();
776             errCode = E_OK;
777         }
778         LOGE("SetSecOption failed:%d", errCode);
779         return errCode;
780     }
781 
782     // after setting secOption, the database file operation ends
783     // database create completed, delete the token
784     if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
785         OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
786         LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
787         return -E_SYSTEM_API_FAIL;
788     }
789     return errCode;
790 }
791 
TryAttachMetaDb(sqlite3 * & dbHandle,bool & isAttachMeta)792 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(sqlite3 *&dbHandle, bool &isAttachMeta)
793 {
794     // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
795     if ((!option_.isMemDb) && (ParamCheckUtils::IsS3SECEOpt(option_.securityOpt))) {
796         int errCode = AttachMetaDatabase(dbHandle, option_);
797         if (errCode != E_OK) {
798             (void)sqlite3_close_v2(dbHandle);
799             dbHandle = nullptr;
800             return errCode;
801         }
802         isAttachMeta = true;
803     }
804     return E_OK;
805 }
806 
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)807 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
808 {
809     int errCode = PreCreateExecutor(isWrite);
810     if (errCode != E_OK) {
811         return errCode;
812     }
813 
814     sqlite3 *dbHandle = nullptr;
815     errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
816     if (errCode != E_OK) {
817         return errCode;
818     }
819 
820     bool isAttachMeta = false;
821     errCode = TryAttachMetaDb(dbHandle, isAttachMeta);
822     if (errCode != E_OK) {
823         return errCode;
824     }
825 
826     RegisterFunctionIfNeed(dbHandle);
827     errCode = Upgrade(dbHandle);
828     if (errCode != E_OK) {
829         (void)sqlite3_close_v2(dbHandle);
830         dbHandle = nullptr;
831         return errCode;
832     }
833 
834     errCode = EndCreateExecutor(isWrite);
835     if (errCode != E_OK) {
836         LOGE("After create executor, set security option incomplete!");
837         (void)sqlite3_close_v2(dbHandle);
838         dbHandle = nullptr;
839         return errCode;
840     }
841 
842     handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
843     if (handle == nullptr) {
844         LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
845         (void)sqlite3_close_v2(dbHandle);
846         dbHandle = nullptr;
847         return -E_OUT_OF_MEMORY;
848     }
849     if (isAttachMeta) {
850         SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
851         singleVerHandle->SetAttachMetaMode(isAttachMeta);
852     }
853     return E_OK;
854 }
855 
Upgrade(sqlite3 * db)856 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
857 {
858     if (isUpdated_ || GetEngineState() == EngineState::CACHEDB) {
859         LOGI("Storage engine [%.6s] is in cache status or has been upgraded[%d]!",
860             DBCommon::TransferStringToHex(identifier_).c_str(), isUpdated_);
861         return E_OK;
862     }
863 
864     std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
865     LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
866     if (option_.schema.empty()) {
867         upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
868     } else {
869         SchemaObject schema;
870         int errCode = schema.ParseFromSchemaString(option_.schema);
871         if (errCode != E_OK) {
872             LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
873             return errCode;
874         }
875         upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
876             option_.securityOpt, option_.isMemDb);
877     }
878 
879     std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
880     std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::DB_EXTENSION;
881     SecurityOption secOpt = option_.securityOpt;
882     int errCode = E_OK;
883     if (isNeedUpdateSecOpt_) {
884         errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
885         if (errCode != E_OK) {
886             LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
887             if (errCode != -E_NOT_SUPPORT) {
888                 return errCode;
889             }
890             secOpt = SecurityOption();
891         }
892     }
893 
894     upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
895     upgrader->SetSubdir(option_.subdir);
896     errCode = upgrader->Upgrade();
897     if (errCode != E_OK) {
898         LOGE("Single ver database upgrade failed:%d", errCode);
899         return errCode;
900     }
901 
902     LOGD("Finish upgrade single ver database!");
903     isUpdated_ = true; // Identification to avoid repeated upgrades
904     std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
905     isSchemaChanged_ = upgrader->IsValueNeedUpgrade();
906     return errCode;
907 }
908 
909 // Attention: This function should be called before "Upgrade".
910 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const911 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
912 {
913     // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
914     // not a newly created database, the meta-Table should exist and can be accessed.
915     std::string schemaStr = option_.schema;
916     if (schemaStr.empty()) {
917         // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
918         int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
919         if (errCode != E_OK) {
920             LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
921         }
922     }
923     if (!schemaStr.empty()) {
924         // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
925         int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
926         if (errCode != E_OK) { // Not very likely
927             // Just warning, if no index had been or need to be created, then put or kv-get can still use.
928             LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
929         }
930     }
931 
932     // This function is used to update meta_data in triggers when it's attached to mainDB
933     int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
934     if (errCode != E_OK) {
935         LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
936     }
937 }
938 
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const939 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
940 {
941     int errCode;
942     LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
943     std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
944         DBConstant::SINGLE_VER_META_STORE + DBConstant::DB_EXTENSION;
945     // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
946     if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
947         errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
948         if (errCode != E_OK) {
949             return errCode;
950         }
951     }
952     CipherPassword passwd;
953     errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
954     if (errCode != E_OK) {
955         LOGE("AttachNewDatabase fail, errCode = %d", errCode);
956     }
957     return errCode;
958 }
959 
ResetCacheRecordVersion()960 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
961 {
962     (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
963 }
964 
IncreaseCacheRecordVersion()965 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
966 {
967     (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
968 }
969 
GetAndIncreaseCacheRecordVersion()970 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
971 {
972     return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
973 }
974 
GetCacheRecordVersion() const975 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
976 {
977     return cacheRecordVersion_.load(std::memory_order_seq_cst);
978 }
979 
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const980 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
981     int eventType) const
982 {
983     std::shared_lock<std::shared_mutex> lock(notifyMutex_);
984     if (commitNotifyFunc_ == nullptr) {
985         LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
986         RefObject::DecObjRef(committedData);
987         committedData = nullptr;
988         return;
989     }
990     commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
991     committedData = nullptr;
992 }
993 
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const994 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
995 {
996     if (committedData == nullptr) {
997         LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
998         return;
999     }
1000     auto identifier = GetIdentifier();
1001     auto kvDBManager = KvDBManager::GetInstance();
1002     if (kvDBManager == nullptr) {
1003         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvDBManager is null.");
1004         return;
1005     }
1006     auto kvdb = kvDBManager->FindKvDB(identifier);
1007     if (kvdb == nullptr) {
1008         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1009         return;
1010     }
1011     unsigned int conflictFlag = 0;
1012     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1013         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1014         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1015     }
1016     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1017         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1018         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1019     }
1020     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1021         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1022         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_NATIVE_ALL);
1023     }
1024     RefObject::DecObjRef(kvdb);
1025     LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1026     committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1027 }
1028 
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1029 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1030 {
1031     const auto &isRemote = syncData.isRemote;
1032     const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1033     auto &committedData = syncData.committedData;
1034     auto &entries = syncData.entries;
1035 
1036     // Put data. Including insert, update and delete.
1037     if (!isRemoveDeviceData) { // LCOV_EXCL_BR_LINE
1038         if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1039             int eventType = static_cast<int>(isRemote ?
1040                 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT :
1041                 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT);
1042             CommitAndReleaseNotifyData(committedData, eventType);
1043         }
1044         return;
1045     }
1046 
1047     // Remove device data.
1048     if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) { // LCOV_EXCL_BR_LINE
1049         return;
1050     }
1051     size_t totalSize = 0;
1052     for (auto iter = entries.begin(); iter != entries.end();) {
1053         auto &entry = *iter;
1054         if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1055             committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1056             if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1057                 LOGE("Alloc committed notify data failed.");
1058                 return;
1059             }
1060         }
1061         if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() >
1062             DBConstant::MAX_VALUE_SIZE) { // LCOV_EXCL_BR_LINE
1063             iter++;
1064             continue;
1065         }
1066         if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) { // LCOV_EXCL_BR_LINE
1067             CommitAndReleaseNotifyData(committedData,
1068                 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1069             totalSize = 0;
1070             continue;
1071         }
1072         totalSize += (entry.key.size() + entry.value.size());
1073         committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1074         iter++;
1075     }
1076     if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1077         CommitAndReleaseNotifyData(committedData,
1078             static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1079     }
1080 }
1081 
1082 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1083 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1084 {
1085     std::lock_guard<std::mutex> lock(subscribeMutex_);
1086     subscribeQuery_[subscribeId] = query;
1087 }
1088 
IsUseExistedSecOption(const SecurityOption & existedSecOpt,const SecurityOption & openSecOpt)1089 bool SQLiteSingleVerStorageEngine::IsUseExistedSecOption(const SecurityOption &existedSecOpt,
1090     const SecurityOption &openSecOpt)
1091 {
1092     if (isNeedUpdateSecOpt_) {
1093         return false;
1094     }
1095     if (existedSecOpt.securityLabel != openSecOpt.securityLabel) {
1096         return false;
1097     }
1098     return true;
1099 }
1100 
UpgradeLocalMetaData()1101 int SQLiteSingleVerStorageEngine::UpgradeLocalMetaData()
1102 {
1103     std::function<int(void)> schemaChangedFunc = nullptr;
1104     {
1105         std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
1106         if (isSchemaChanged_) {
1107             schemaChangedFunc = schemaChangedFunc_;
1108             isSchemaChanged_ = false;
1109         }
1110     }
1111     if (schemaChangedFunc != nullptr) {
1112         return schemaChangedFunc();
1113     }
1114     return E_OK;
1115 }
1116 }
1117