• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_engine.h"
17 
18 #include <memory>
19 
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "kvdb_manager.h"
24 #include "log_print.h"
25 #include "param_check_utils.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "single_ver_utils.h"
29 #include "sqlite_log_table_manager.h"
30 #include "sqlite_single_ver_database_upgrader.h"
31 #include "sqlite_single_ver_natural_store.h"
32 #include "sqlite_single_ver_schema_database_upgrader.h"
33 
34 namespace DistributedDB {
SQLiteSingleVerStorageEngine()35 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
36     : executorState_(ExecutorState::INVALID),
37       cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
38       isCorrupted_(false),
39       isNeedUpdateSecOpt_(false),
40       maxValueSize_(DBConstant::MAX_VALUE_SIZE)
41 {}
42 
~SQLiteSingleVerStorageEngine()43 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
44 {
45 }
46 
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const47 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
48 {
49     return handle->MigrateLocalData();
50 }
51 
EraseDeviceWaterMark(const std::set<std::string> & removeDevices,bool isNeedHash)52 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(const std::set<std::string> &removeDevices, bool isNeedHash)
53 {
54     auto kvdbManager = KvDBManager::GetInstance();
55     if (kvdbManager == nullptr) { // LCOV_EXCL_BR_LINE
56         return -E_INVALID_DB;
57     }
58     auto identifier = GetIdentifier();
59     auto kvdb = kvdbManager->FindKvDB(identifier);
60     if (kvdb == nullptr) { // LCOV_EXCL_BR_LINE
61         LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
62         return -E_INVALID_DB;
63     }
64 
65     auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
66     for (const auto &devId : removeDevices) {
67         int errCode = kvStore->EraseDeviceWaterMark(devId, isNeedHash);
68         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
69             RefObject::DecObjRef(kvdb);
70             return errCode;
71         }
72     }
73 
74     RefObject::DecObjRef(kvdb);
75     return E_OK;
76 }
77 
GetRemoveDataDevices(SQLiteSingleVerStorageExecutor * handle,const DataItem & item,std::set<std::string> & removeDevices,bool & isNeedHash) const78 int SQLiteSingleVerStorageEngine::GetRemoveDataDevices(SQLiteSingleVerStorageExecutor *handle, const DataItem &item,
79     std::set<std::string> &removeDevices, bool &isNeedHash) const
80 {
81     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
82         return -E_INVALID_DB;
83     }
84     if (item.value.empty()) { // Device ID has been set to value in cache db
85         // Empty means remove all device data, get device id from meta key
86         // LCOV_EXCL_BR_LINE
87         int errCode = handle->GetExistsDevicesFromMeta(removeDevices);
88         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
89             LOGE("Get remove devices list from meta failed. err=%d", errCode);
90             return errCode;
91         }
92         isNeedHash = false;
93     } else {
94         std::string deviceName;
95         DBCommon::VectorToString(item.value, deviceName);
96         removeDevices.insert(deviceName);
97     }
98     return E_OK;
99 }
100 
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)101 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
102     const std::vector<DataItem> &dataItems)
103 {
104     int errCode = E_OK;
105     for (const auto &dataItem : dataItems) {
106         if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
107             (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
108             bool isNeedHash = true;
109             std::set<std::string> removeDevices;
110             errCode = GetRemoveDataDevices(handle, dataItem, removeDevices, isNeedHash);
111             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
112                 LOGE("Get remove device id failed. err=%d", errCode);
113                 return errCode;
114             }
115 
116             // sync module will use handle to fix watermark, if fix fail then migrate fail, not need hold write handle
117             errCode = ReleaseExecutor(handle);
118             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
119                 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
120                 return errCode;
121             }
122 
123             errCode = EraseDeviceWaterMark(removeDevices, isNeedHash);
124             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
125                 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
126                 return errCode;
127             }
128 
129             handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
130                 errCode));
131             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
132                 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
133                 return errCode;
134             }
135         }
136     }
137     return errCode;
138 }
139 
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)140 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
141     NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
142 {
143     if (syncData.committedData == nullptr) {
144         syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
145         if (syncData.committedData == nullptr) {
146             LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
147             return -E_OUT_OF_MEMORY;
148         }
149     }
150     InitConflictNotifiedFlag(syncData.committedData);
151 
152     std::vector<DataItem> dataItems;
153     uint64_t minVerIncurCacheDb = 0;
154     int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
155     if (errCode != E_OK) {
156         LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
157         return errCode;
158     }
159 
160     if (minVerIncurCacheDb == 0) { // min version in cache db is 1
161         ++curMigrateVer;
162         return E_OK;
163     }
164 
165     if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
166         curMigrateVer = minVerIncurCacheDb;
167     }
168 
169     // Call the syncer module to erase the water mark.
170     errCode = EraseDeviceWaterMark(handle, dataItems);
171     if (errCode != E_OK) {
172         LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
173         return errCode;
174     }
175 
176     // next version need process
177     LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
178         curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
179     errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
180     if (errCode != E_OK) {
181         LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
182         return errCode;
183     }
184 
185     errCode = ReleaseHandleTransiently(handle, 2ULL, syncData); // temporary release handle 2ms
186     if (errCode != E_OK) {
187         return errCode;
188     }
189 
190     return E_OK;
191 }
192 
193 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime,NotifyMigrateSyncData & syncData)194 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime,
195     NotifyMigrateSyncData &syncData)
196 {
197     int errCode = ReleaseExecutor(handle);
198     if (errCode != E_OK) {
199         LOGE("release executor for reopen database! errCode = [%d]", errCode);
200         return errCode;
201     }
202 
203     CommitNotifyForMigrateCache(syncData); // Trigger sync after release handle
204 
205     std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
206     handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
207     if (errCode != E_OK) {
208         LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
209         return errCode;
210     }
211     return errCode;
212 }
213 
AddSubscribeToMainDBInMigrate()214 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
215 {
216     LOGD("Add subscribe to mainDB from cache. %d", GetEngineState());
217     std::lock_guard<std::mutex> lock(subscribeMutex_);
218     if (subscribeQuery_.empty()) { // LCOV_EXCL_BR_LINE
219         return E_OK;
220     }
221     int errCode = E_OK;
222     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
223     if (errCode != E_OK || handle == nullptr) { // LCOV_EXCL_BR_LINE
224         LOGE("Get available executor for add subscribe failed. %d", errCode);
225         return errCode;
226     }
227     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
228     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
229         goto END;
230     }
231     for (auto item : subscribeQuery_) {
232         errCode = handle->AddSubscribeTrigger(item.second, item.first);
233         if (errCode != E_OK) {
234             LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
235         }
236     }
237     subscribeQuery_.clear();
238     // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
239     (void)handle->Commit();
240 END:
241     ReleaseExecutor(handle);
242     return errCode;
243 }
244 
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)245 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
246 {
247     int errCode = E_OK;
248     if (handle == nullptr) {
249         handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
250         if (errCode != E_OK) {
251             LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
252             return errCode;
253         }
254     }
255 
256     LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion());
257     uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
258     NotifyMigrateSyncData syncData;
259     auto kvdbManager = KvDBManager::GetInstance();
260     if (kvdbManager != nullptr) {
261         auto identifier = GetIdentifier();
262         auto kvdb = kvdbManager->FindKvDB(identifier);
263         if (kvdb != nullptr) {
264             auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
265             syncData.isPermitForceWrite =
266                 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
267             RefObject::DecObjRef(kvdb);
268         } else {
269             LOGE("[SingleVerEngine] kvdb is null.");
270         }
271     }
272     // cache atomic version represents version of cacheDb input next time
273     while (curMigrateVer < GetCacheRecordVersion()) {
274         errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
275         if (errCode != E_OK) {
276             LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
277             break;
278         }
279         if (!syncData.isRemote) {
280             isNeedTriggerSync = true;
281         }
282     }
283     if (syncData.committedData != nullptr) {
284         RefObject::DecObjRef(syncData.committedData);
285         syncData.committedData = nullptr;
286     }
287     // When finished Migrating sync data, will fix engine state
288     return errCode;
289 }
290 
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)291 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
292     EngineState stateBeforeMigrate)
293 {
294     LOGD("Begin attach main db and cache db by executor!");
295     // Judge the file corresponding to db by the engine status and attach it to another file
296     int errCode = E_OK;
297     std::string attachAbsPath;
298     if (stateBeforeMigrate == EngineState::MAINDB) {
299         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
300             DBConstant::DB_EXTENSION;
301         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
302     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
303         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
304         DBConstant::DB_EXTENSION;
305         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
306     } else {
307         return -E_NOT_SUPPORT;
308     }
309     if (errCode != E_OK) {
310         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
311         return errCode;
312     }
313 
314     uint64_t maxVersion = 0;
315     errCode = handle->GetMaxVersionInCacheDb(maxVersion);
316     if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
317         maxVersion = CACHE_RECORD_DEFAULT_VERSION;
318     }
319 
320     (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
321     return errCode;
322 }
323 
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const324 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
325 {
326     LOGD("Begin attach main db and cache db by sqlite handle!");
327     // Judge the file corresponding to db by the engine status and attach it to another file
328     int errCode = E_OK;
329     std::string attachAbsPath;
330     if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
331         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
332             DBConstant::DB_EXTENSION;
333         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
334     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
335         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
336             DBConstant::DB_EXTENSION;
337         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
338     } else {
339         return -E_NOT_SUPPORT;
340     }
341     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
342         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
343         return errCode;
344     }
345 
346     return errCode;
347 }
348 
ReInit()349 int SQLiteSingleVerStorageEngine::ReInit()
350 {
351     return Init();
352 }
353 
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)354 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
355 {
356     if (handle == nullptr) {
357         return E_OK;
358     }
359     StorageExecutor *databaseHandle = handle;
360     isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
361     Recycle(databaseHandle);
362     handle = nullptr;
363     if (isCorrupted_) {
364         LOGE("Database is corrupted or invalid passwd!");
365         return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
366     }
367     return E_OK;
368 }
369 
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)370 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
371     EngineState stateBeforeMigrate)
372 {
373     LOGI("Begin to finish migrate and reinit db state!");
374     int errCode;
375     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
376         return -E_INVALID_ARGS;
377     }
378 
379     if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
380         sqlite3 *dbHandle = nullptr;
381         errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
382         if (errCode != E_OK) {
383             LOGE("Get Db handle failed! errCode = [%d]", errCode);
384             return errCode;
385         }
386 
387         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
388         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
389             LOGE("Execute the SQLite detach failed:%d", errCode);
390             return errCode;
391         }
392         // delete cachedb
393         errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
394         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
395             LOGE("Remove files of cache database after detach:%d", errCode);
396         }
397 
398         SetEngineState(EngineState::MAINDB);
399         return errCode;
400     }
401 
402     errCode = ReleaseExecutor(handle);
403     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
404         LOGE("Release executor for reopen database! errCode = [%d]", errCode);
405         return errCode;
406     }
407 
408     // close db for reinit this engine
409     Release();
410 
411     // delete cache db
412     errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
413     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
414         LOGE("Remove files of cache database after release current db:%d", errCode);
415         return errCode;
416     }
417 
418     // reInit, it will reset engine state
419     errCode = ReInit();
420     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
421         LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
422         return errCode;
423     }
424 
425     return E_OK;
426 }
427 
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)428 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
429     EngineState preMigrateState)
430 {
431     // after attach main and cache need change operate data sql, changing state forbid operate database
432     SetEngineState(EngineState::MIGRATING);
433 
434     int errCode = E_OK;
435     // check if has been attach and attach cache and main for migrate
436     if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
437         errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
438         if (errCode != E_OK) {
439             LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
440             // For lock state open db, can not attach main and cache
441             return errCode;
442         }
443     } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
444         // Has been attach, maybe ever crashed, need update version
445         executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
446         uint64_t maxVersion = 0;
447         errCode = handle->GetMaxVersionInCacheDb(maxVersion);
448         if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
449             maxVersion = CACHE_RECORD_DEFAULT_VERSION;
450         }
451         (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
452     } else {
453         return -E_UNEXPECTED_DATA;
454     }
455 
456     return errCode;
457 }
458 
ExecuteMigrate()459 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
460 {
461     EngineState preState = GetEngineState();
462     std::lock_guard<std::mutex> lock(migrateLock_);
463     if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
464         !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
465         DBConstant::DB_EXTENSION)) {
466         LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
467         return E_OK;
468     }
469 
470     // Get write executor for migrate
471     int errCode = E_OK;
472     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
473     if (errCode != E_OK) {
474         LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
475         return errCode;
476     }
477 
478     isMigrating_.store(true);
479     LOGD("Migrate start.");
480     bool isNeedTriggerSync = false;
481     errCode = InitExecuteMigrate(handle, preState);
482     if (errCode != E_OK) {
483         LOGE("Init migrate data fail, errCode = [%d]", errCode);
484         goto END;
485     }
486 
487     LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
488         static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
489     // has been attached, Mark start of migration and it can migrate data
490     errCode = MigrateLocalData(handle);
491     if (errCode != E_OK) {
492         LOGE("Migrate local data fail, errCode = [%d]", errCode);
493         goto END;
494     }
495 
496     errCode = MigrateSyncData(handle, isNeedTriggerSync);
497     if (errCode != E_OK) {
498         LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
499         goto END;
500     }
501 
502     SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
503 
504     // detach database and delete cachedb
505     errCode = FinishMigrateData(handle, preState);
506     if (errCode != E_OK) {
507         LOGE("Finish migrating data fail, errCode = [%d]", errCode);
508         goto END;
509     }
510 
511 END: // after FinishMigrateData, it will reset engine state
512     // there is no need cover the errCode
513     EndMigrate(handle, preState, errCode, isNeedTriggerSync);
514     isMigrating_.store(false);
515     LOGD("Migrate stop.");
516     return errCode;
517 }
518 
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)519 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
520     int errCode, bool isNeedTriggerSync)
521 {
522     LOGD("Finish migrating data! errCode = [%d]", errCode);
523     if (errCode != E_OK) {
524         SetEngineState(stateBeforeMigrate);
525     }
526     if (handle != nullptr) {
527         handle->ClearMigrateData();
528     }
529     errCode = ReleaseExecutor(handle);
530     if (errCode != E_OK) {
531         LOGE("release executor after migrating! errCode = [%d]", errCode);
532     }
533 
534     errCode = AddSubscribeToMainDBInMigrate();
535     if (errCode != E_OK) {
536         LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
537     }
538 
539     // Notify max timestamp offset for SyncEngine.
540     // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
541     RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
542     if (isNeedTriggerSync) {
543         commitNotifyFunc_(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT),
544             nullptr);
545     }
546     return;
547 }
548 
IsEngineCorrupted() const549 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
550 {
551     return isCorrupted_;
552 }
553 
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)554 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
555 {
556     auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
557     if (executor == nullptr) {
558         return executor;
559     }
560     executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
561     return executor;
562 }
563 
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)564 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
565 {
566     // Only could get the main database handle in the uninitialized and the main status.
567     if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
568         LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
569         return -E_EKEYREVOKED;
570     }
571 
572     if (!option_.isMemDb) {
573         option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
574             DBConstant::DB_EXTENSION;
575     }
576 
577     OpenDbProperties optionTemp = option_;
578     if (!isWrite) {
579         optionTemp.createIfNecessary = false;
580     }
581 
582     int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
583     if (errCode != E_OK) {
584         if (errno == EKEYREVOKED) {
585             LOGI("Failed to open the main database for key revoked[%d]", errCode);
586             errCode = -E_EKEYREVOKED;
587         }
588         return errCode;
589     }
590 
591     executorState_ = ExecutorState::MAINDB;
592     // Set the engine state to main status for that the main database is valid.
593     SetEngineState(EngineState::MAINDB);
594 
595     if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
596         DBConstant::DB_EXTENSION)) {
597         // In status cacheDb crash
598         errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
599         if (errCode != E_OK) {
600             LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
601             return E_OK; // not care err to return, only use for print log
602         }
603         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
604         // cache and main existed together, can not read data, must execute migrate first
605         SetEngineState(EngineState::ATTACHING);
606     }
607 
608     return errCode;
609 }
610 
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)611 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
612 {
613     int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
614     LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]",  isWrite,
615         secOpt.securityLabel, secOpt.securityFlag, hashIdentifier_.c_str(), errCode);
616     if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
617         return errCode;
618     }
619     std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
620         DBConstant::DB_EXTENSION;
621     if (!isWrite || GetEngineState() != EngineState::INVALID ||
622         OS::CheckPathExistence(cacheDbPath)) {
623         LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
624             isWrite, GetEngineState());
625         return -E_EKEYREVOKED;
626     }
627 
628     errCode = GetCacheDbHandle(dbHandle);
629     if (errCode != E_OK) {
630         LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
631         return errCode;
632     }
633     SetEngineState(EngineState::CACHEDB);
634     executorState_ = ExecutorState::CACHEDB;
635 
636     ResetCacheRecordVersion();
637     // Get handle means maindb file ekeyevoked, not need attach to
638     return errCode;
639 }
640 
641 namespace CacheDbSqls {
642 const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
643     "CREATE TABLE IF NOT EXISTS local_data(" \
644         "key     BLOB   NOT NULL," \
645         "value  BLOB," \
646         "timestamp  INT," \
647         "hash_key   BLOB   PRIMARY KEY   NOT NULL," \
648         "flag  INT  NOT NULL);";
649 
650 const std::string CREATE_CACHE_SYNC_TABLE_SQL =
651     "CREATE TABLE IF NOT EXISTS sync_data(" \
652         "key         BLOB NOT NULL," \
653         "value       BLOB," \
654         "timestamp   INT  NOT NULL," \
655         "flag        INT  NOT NULL," \
656         "device      BLOB," \
657         "ori_device  BLOB," \
658         "hash_key    BLOB  NOT NULL," \
659         "w_timestamp INT," \
660         "version     INT  NOT NULL," \
661         "PRIMARY Key(version, hash_key));";
662 }
663 
664 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
665 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)666 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
667 {
668     option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
669         DBConstant::DB_EXTENSION;
670     // creatTable
671     option_.sqls = {
672         CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
673         CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
674     };
675 
676     if (!option_.createIfNecessary) {
677         std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
678             DBConstant::DB_EXTENSION;
679         if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
680             return -E_INVALID_DB;
681         }
682     }
683 
684     OpenDbProperties option = option_; // copy for no change it
685     option.createIfNecessary = true;
686     int errCode = SQLiteUtils::OpenDatabase(option, db);
687     if (errCode != E_OK) {
688         LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
689         return errCode;
690     }
691     return errCode;
692 }
693 
CheckDatabaseSecOpt(const SecurityOption & secOption) const694 void SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
695 {
696     if (!(secOption == option_.securityOpt) && (secOption.securityLabel > option_.securityOpt.securityLabel) &&
697         secOption.securityLabel != SecurityLabel::NOT_SET &&
698         option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
699         LOGW("[SQLiteSingleVerStorageEngine] SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]",
700             secOption.securityLabel, secOption.securityFlag, option_.securityOpt.securityLabel,
701             option_.securityOpt.securityFlag);
702     }
703 }
704 
CreateNewDirsAndSetSecOpt() const705 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
706 {
707     LOGD("[SQLiteSingleVerStorageEngine] Begin to create new dirs and set security option");
708     return CreateNewDirsAndSetSecOption(option_);
709 }
710 
GetExistedSecOption(SecurityOption & secOption) const711 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
712 {
713     LOGD("[SQLiteSingleVerStorageEngine] Try to get existed sec option");
714     return GetExistedSecOpt(option_, secOption);
715 }
716 
ClearCorruptedFlag()717 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
718 {
719     isCorrupted_ = false;
720 }
721 
PreCreateExecutor(bool isWrite,SecurityOption & existedSecOpt)722 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite, SecurityOption &existedSecOpt)
723 {
724     // Assume that create the write executor firstly and the write one we will not be released.
725     // If the write one would be released in the future, should take care the pass through.
726     if (!isWrite) {
727         return E_OK;
728     }
729 
730     if (option_.isMemDb) {
731         return E_OK;
732     }
733 
734     // check sqlite open ok
735     int errCode = CheckStoreStatus(option_);
736     if (errCode != E_OK) {
737         return errCode;
738     }
739 
740     // Get the existed database secure option.
741     errCode = GetExistedSecOption(existedSecOpt);
742     if (errCode != E_OK) {
743         return errCode;
744     }
745 
746     CheckDatabaseSecOpt(existedSecOpt);
747 
748     // Judge whether need update the security option of the engine.
749     // Should update the security in the import or rekey scene(inner) or exist is not set.
750     if (IsUseExistedSecOption(existedSecOpt, option_.securityOpt)) {
751         option_.securityOpt = existedSecOpt;
752     } else {
753         isNeedUpdateSecOpt_ = true;
754     }
755 
756     errCode = CreateNewDirsAndSetSecOpt();
757     if (errCode != E_OK) {
758         return errCode;
759     }
760 
761     if (!isUpdated_) {
762         errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
763         if (errCode != E_OK) {
764             LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
765             return errCode;
766         }
767     }
768 
769     return E_OK;
770 }
771 
EndCreateExecutor(sqlite3 * db,SecurityOption existedSecOpt,bool isWrite,bool isDetachMeta)772 int SQLiteSingleVerStorageEngine::EndCreateExecutor(sqlite3 *db, SecurityOption existedSecOpt, bool isWrite,
773     bool isDetachMeta)
774 {
775     if (option_.isMemDb || !isWrite) {
776         return E_OK;
777     }
778 
779     int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt, existedSecOpt,
780         isNeedUpdateSecOpt_);
781     if (errCode != E_OK) {
782         if (errCode == -E_NOT_SUPPORT) {
783             option_.securityOpt = SecurityOption();
784             errCode = E_OK;
785         }
786         LOGE("SetSecOption failed:%d", errCode);
787         return errCode;
788     }
789 
790     // after setting secOption, the database file operation ends
791     // database create completed, delete the token
792     if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
793         OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
794         LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
795         return -E_SYSTEM_API_FAIL;
796     }
797     if (isDetachMeta) {
798         errCode = SQLiteUtils::ExecuteRawSQL(db, "DETACH 'meta'");
799         if (errCode != E_OK) {
800             LOGE("Detach meta db failed %d", errCode);
801             return errCode;
802         } else {
803             LOGI("Detach meta db success");
804         }
805     }
806     errCode = SqliteLogTableManager::CreateKvSyncLogTable(db);
807     if (errCode != E_OK) {
808         LOGE("[SQLiteSingleVerStorageEngine] create cloud log table failed, errCode = [%d]", errCode);
809     } else {
810         LOGI("[SQLiteSingleVerStorageEngine] create cloud log table success");
811     }
812     return errCode;
813 }
814 
TryAttachMetaDb(const SecurityOption & existedSecOpt,sqlite3 * & dbHandle,bool & isAttachMeta,bool & isNeedDetachMeta)815 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(const SecurityOption &existedSecOpt, sqlite3 *&dbHandle,
816     bool &isAttachMeta, bool &isNeedDetachMeta)
817 {
818     bool isCurrentSESECE = ParamCheckUtils::IsS3SECEOpt(existedSecOpt);
819     bool isOpenSESECE = ParamCheckUtils::IsS3SECEOpt(option_.securityOpt);
820     // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
821     if ((!option_.isMemDb) && (isOpenSESECE || (isNeedUpdateSecOpt_ && isCurrentSESECE))) {
822         int errCode = AttachMetaDatabase(dbHandle, option_);
823         if (errCode != E_OK) {
824             (void)sqlite3_close_v2(dbHandle);
825             dbHandle = nullptr;
826             return errCode;
827         }
828         isAttachMeta = isOpenSESECE; // only open with S3 SECE need in attach mode
829         isNeedDetachMeta = !isOpenSESECE && isCurrentSESECE; // NOT S3 SECE no need meta.db
830     }
831     return E_OK;
832 }
833 
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)834 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
835 {
836     SecurityOption existedSecOpt;
837     int errCode = PreCreateExecutor(isWrite, existedSecOpt);
838     if (errCode != E_OK) {
839         return errCode;
840     }
841 
842     sqlite3 *dbHandle = nullptr;
843     errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
844     if (errCode != E_OK) {
845         return errCode;
846     }
847 
848     bool isAttachMeta = false;
849     bool isDetachMeta = false;
850     errCode = TryAttachMetaDb(existedSecOpt, dbHandle, isAttachMeta, isDetachMeta);
851     if (errCode != E_OK) {
852         return errCode;
853     }
854 
855     RegisterFunctionIfNeed(dbHandle);
856     errCode = Upgrade(dbHandle);
857     if (errCode != E_OK) {
858         (void)sqlite3_close_v2(dbHandle);
859         dbHandle = nullptr;
860         return errCode;
861     }
862 
863     errCode = EndCreateExecutor(dbHandle, existedSecOpt, isWrite, isDetachMeta);
864     if (errCode != E_OK) {
865         LOGE("After create executor, set security option incomplete!");
866         (void)sqlite3_close_v2(dbHandle);
867         dbHandle = nullptr;
868         return errCode;
869     }
870 
871     handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
872     if (handle == nullptr) {
873         LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
874         (void)sqlite3_close_v2(dbHandle);
875         dbHandle = nullptr;
876         return -E_OUT_OF_MEMORY;
877     }
878     if (isAttachMeta) {
879         SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
880         singleVerHandle->SetAttachMetaMode(isAttachMeta);
881     }
882     return E_OK;
883 }
884 
Upgrade(sqlite3 * db)885 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
886 {
887     if (isUpdated_ || GetEngineState() == EngineState::CACHEDB) {
888         return E_OK;
889     }
890 
891     std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
892     LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
893     if (option_.schema.empty()) {
894         upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
895     } else {
896         SchemaObject schema;
897         int errCode = schema.ParseFromSchemaString(option_.schema);
898         if (errCode != E_OK) {
899             LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
900             return errCode;
901         }
902         upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
903             option_.securityOpt, option_.isMemDb);
904     }
905 
906     std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
907     std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::DB_EXTENSION;
908     SecurityOption secOpt = option_.securityOpt;
909     int errCode = E_OK;
910     if (isNeedUpdateSecOpt_) {
911         errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
912         if (errCode != E_OK) {
913             LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
914             if (errCode != -E_NOT_SUPPORT) {
915                 return errCode;
916             }
917             secOpt = SecurityOption();
918         }
919     }
920 
921     upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
922     upgrader->SetSubdir(option_.subdir);
923     errCode = upgrader->Upgrade();
924     if (errCode != E_OK) {
925         LOGE("Single ver database upgrade failed:%d", errCode);
926         return errCode;
927     }
928 
929     LOGD("Finish upgrade single ver database!");
930     isUpdated_ = true; // Identification to avoid repeated upgrades
931     std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
932     isSchemaChanged_ = upgrader->IsValueNeedUpgrade();
933     return errCode;
934 }
935 
936 // Attention: This function should be called before "Upgrade".
937 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const938 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
939 {
940     // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
941     // not a newly created database, the meta-Table should exist and can be accessed.
942     std::string schemaStr = option_.schema;
943     if (schemaStr.empty()) {
944         // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
945         int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
946         if (errCode != E_OK) {
947             LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
948         }
949     }
950     if (!schemaStr.empty()) {
951         // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
952         int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
953         if (errCode != E_OK) { // Not very likely
954             // Just warning, if no index had been or need to be created, then put or kv-get can still use.
955             LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
956         }
957     }
958 
959     // This function is used to update meta_data in triggers when it's attached to mainDB
960     int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
961     if (errCode != E_OK) {
962         LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
963     }
964 }
965 
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const966 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
967 {
968     int errCode;
969     LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
970     std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
971         DBConstant::SINGLE_VER_META_STORE + DBConstant::DB_EXTENSION;
972     // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
973     if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
974         errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
975         if (errCode != E_OK) {
976             return errCode;
977         }
978     }
979     CipherPassword passwd;
980     errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
981     if (errCode != E_OK) {
982         LOGE("AttachNewDatabase fail, errCode = %d", errCode);
983     }
984     return errCode;
985 }
986 
ResetCacheRecordVersion()987 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
988 {
989     (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
990 }
991 
IncreaseCacheRecordVersion()992 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
993 {
994     (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
995 }
996 
GetAndIncreaseCacheRecordVersion()997 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
998 {
999     return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1000 }
1001 
GetCacheRecordVersion() const1002 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
1003 {
1004     return cacheRecordVersion_.load(std::memory_order_seq_cst);
1005 }
1006 
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1007 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1008     int eventType) const
1009 {
1010     std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1011     if (commitNotifyFunc_ == nullptr) {
1012         LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1013         RefObject::DecObjRef(committedData);
1014         committedData = nullptr;
1015         return;
1016     }
1017     commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1018     committedData = nullptr;
1019 }
1020 
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1021 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1022 {
1023     if (committedData == nullptr) {
1024         LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1025         return;
1026     }
1027     auto identifier = GetIdentifier();
1028     auto kvDBManager = KvDBManager::GetInstance();
1029     if (kvDBManager == nullptr) {
1030         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvDBManager is null.");
1031         return;
1032     }
1033     auto kvdb = kvDBManager->FindKvDB(identifier);
1034     if (kvdb == nullptr) {
1035         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1036         return;
1037     }
1038     unsigned int conflictFlag = 0;
1039     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1040         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1041         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1042     }
1043     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1044         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1045         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1046     }
1047     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1048         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1049         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_NATIVE_ALL);
1050     }
1051     RefObject::DecObjRef(kvdb);
1052     LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1053     committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1054 }
1055 
SetMaxValueSize(uint32_t maxValueSize)1056 void SQLiteSingleVerStorageEngine::SetMaxValueSize(uint32_t maxValueSize)
1057 {
1058     LOGI("Set the max value size to %" PRIu32, maxValueSize);
1059     maxValueSize_ = maxValueSize;
1060 }
1061 
GetMaxValueSize()1062 uint32_t SQLiteSingleVerStorageEngine::GetMaxValueSize()
1063 {
1064     return maxValueSize_;
1065 }
1066 
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1067 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1068 {
1069     const auto &isRemote = syncData.isRemote;
1070     const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1071     auto &committedData = syncData.committedData;
1072     auto &entries = syncData.entries;
1073 
1074     // Put data. Including insert, update and delete.
1075     if (!isRemoveDeviceData) { // LCOV_EXCL_BR_LINE
1076         if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1077             int eventType = static_cast<int>(isRemote ?
1078                 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT :
1079                 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT);
1080             CommitAndReleaseNotifyData(committedData, eventType);
1081         }
1082         return;
1083     }
1084 
1085     // Remove device data.
1086     if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) { // LCOV_EXCL_BR_LINE
1087         return;
1088     }
1089     size_t totalSize = 0;
1090     for (auto iter = entries.begin(); iter != entries.end();) {
1091         auto &entry = *iter;
1092         if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1093             committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1094             if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1095                 LOGE("Alloc committed notify data failed.");
1096                 return;
1097             }
1098         }
1099         if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > maxValueSize_) { // LCOV_EXCL_BR_LINE
1100             iter++;
1101             continue;
1102         }
1103         if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) { // LCOV_EXCL_BR_LINE
1104             CommitAndReleaseNotifyData(committedData,
1105                 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1106             totalSize = 0;
1107             continue;
1108         }
1109         totalSize += (entry.key.size() + entry.value.size());
1110         committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1111         iter++;
1112     }
1113     if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1114         CommitAndReleaseNotifyData(committedData,
1115             static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1116     }
1117 }
1118 
1119 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1120 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1121 {
1122     std::lock_guard<std::mutex> lock(subscribeMutex_);
1123     subscribeQuery_[subscribeId] = query;
1124 }
1125 
IsUseExistedSecOption(const SecurityOption & existedSecOpt,const SecurityOption & openSecOpt)1126 bool SQLiteSingleVerStorageEngine::IsUseExistedSecOption(const SecurityOption &existedSecOpt,
1127     const SecurityOption &openSecOpt)
1128 {
1129     if (isNeedUpdateSecOpt_) {
1130         return false;
1131     }
1132     if (existedSecOpt.securityLabel != openSecOpt.securityLabel) {
1133         return false;
1134     }
1135     return true;
1136 }
1137 
UpgradeLocalMetaData()1138 int SQLiteSingleVerStorageEngine::UpgradeLocalMetaData()
1139 {
1140     std::function<int(void)> schemaChangedFunc = nullptr;
1141     {
1142         std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
1143         if (isSchemaChanged_) {
1144             schemaChangedFunc = schemaChangedFunc_;
1145             isSchemaChanged_ = false;
1146         }
1147     }
1148     if (schemaChangedFunc != nullptr) {
1149         return schemaChangedFunc();
1150     }
1151     return E_OK;
1152 }
1153 }
1154