• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_engine.h"
17 
18 #include <memory>
19 
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "kvdb_manager.h"
24 #include "log_print.h"
25 #include "param_check_utils.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "single_ver_utils.h"
29 #include "sqlite_single_ver_database_upgrader.h"
30 #include "sqlite_single_ver_natural_store.h"
31 #include "sqlite_single_ver_schema_database_upgrader.h"
32 
33 namespace DistributedDB {
SQLiteSingleVerStorageEngine()34 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
35     : executorState_(ExecutorState::INVALID),
36       cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
37       isCorrupted_(false),
38       isNeedUpdateSecOpt_(false),
39       maxValueSize_(DBConstant::MAX_VALUE_SIZE)
40 {}
41 
~SQLiteSingleVerStorageEngine()42 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
43 {
44 }
45 
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const46 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
47 {
48     return handle->MigrateLocalData();
49 }
50 
EraseDeviceWaterMark(const std::set<std::string> & removeDevices,bool isNeedHash)51 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(const std::set<std::string> &removeDevices, bool isNeedHash)
52 {
53     auto kvdbManager = KvDBManager::GetInstance();
54     if (kvdbManager == nullptr) { // LCOV_EXCL_BR_LINE
55         return -E_INVALID_DB;
56     }
57     auto identifier = GetIdentifier();
58     auto kvdb = kvdbManager->FindKvDB(identifier);
59     if (kvdb == nullptr) { // LCOV_EXCL_BR_LINE
60         LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
61         return -E_INVALID_DB;
62     }
63 
64     auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
65     for (const auto &devId : removeDevices) {
66         int errCode = kvStore->EraseDeviceWaterMark(devId, isNeedHash);
67         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
68             RefObject::DecObjRef(kvdb);
69             return errCode;
70         }
71     }
72 
73     RefObject::DecObjRef(kvdb);
74     return E_OK;
75 }
76 
GetRemoveDataDevices(SQLiteSingleVerStorageExecutor * handle,const DataItem & item,std::set<std::string> & removeDevices,bool & isNeedHash) const77 int SQLiteSingleVerStorageEngine::GetRemoveDataDevices(SQLiteSingleVerStorageExecutor *handle, const DataItem &item,
78     std::set<std::string> &removeDevices, bool &isNeedHash) const
79 {
80     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
81         return -E_INVALID_DB;
82     }
83     if (item.value.empty()) { // Device ID has been set to value in cache db
84         // Empty means remove all device data, get device id from meta key
85         // LCOV_EXCL_BR_LINE
86         int errCode = handle->GetExistsDevicesFromMeta(removeDevices);
87         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
88             LOGE("Get remove devices list from meta failed. err=%d", errCode);
89             return errCode;
90         }
91         isNeedHash = false;
92     } else {
93         std::string deviceName;
94         DBCommon::VectorToString(item.value, deviceName);
95         removeDevices.insert(deviceName);
96     }
97     return E_OK;
98 }
99 
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)100 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
101     const std::vector<DataItem> &dataItems)
102 {
103     int errCode = E_OK;
104     for (const auto &dataItem : dataItems) {
105         if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
106             (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
107             bool isNeedHash = true;
108             std::set<std::string> removeDevices;
109             errCode = GetRemoveDataDevices(handle, dataItem, removeDevices, isNeedHash);
110             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
111                 LOGE("Get remove device id failed. err=%d", errCode);
112                 return errCode;
113             }
114 
115             // sync module will use handle to fix watermark, if fix fail then migrate fail, not need hold write handle
116             errCode = ReleaseExecutor(handle);
117             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
118                 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
119                 return errCode;
120             }
121 
122             errCode = EraseDeviceWaterMark(removeDevices, isNeedHash);
123             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
124                 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
125                 return errCode;
126             }
127 
128             handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
129                 errCode));
130             if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
131                 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
132                 return errCode;
133             }
134         }
135     }
136     return errCode;
137 }
138 
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)139 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
140     NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
141 {
142     if (syncData.committedData == nullptr) {
143         syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
144         if (syncData.committedData == nullptr) {
145             LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
146             return -E_OUT_OF_MEMORY;
147         }
148     }
149     InitConflictNotifiedFlag(syncData.committedData);
150 
151     std::vector<DataItem> dataItems;
152     uint64_t minVerIncurCacheDb = 0;
153     int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
154     if (errCode != E_OK) {
155         LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
156         return errCode;
157     }
158 
159     if (minVerIncurCacheDb == 0) { // min version in cache db is 1
160         ++curMigrateVer;
161         return E_OK;
162     }
163 
164     if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
165         curMigrateVer = minVerIncurCacheDb;
166     }
167 
168     // Call the syncer module to erase the water mark.
169     errCode = EraseDeviceWaterMark(handle, dataItems);
170     if (errCode != E_OK) {
171         LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
172         return errCode;
173     }
174 
175     // next version need process
176     LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
177         curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
178     errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
179     if (errCode != E_OK) {
180         LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
181         return errCode;
182     }
183 
184     errCode = ReleaseHandleTransiently(handle, 2ULL, syncData); // temporary release handle 2ms
185     if (errCode != E_OK) {
186         return errCode;
187     }
188 
189     return E_OK;
190 }
191 
192 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime,NotifyMigrateSyncData & syncData)193 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime,
194     NotifyMigrateSyncData &syncData)
195 {
196     int errCode = ReleaseExecutor(handle);
197     if (errCode != E_OK) {
198         LOGE("release executor for reopen database! errCode = [%d]", errCode);
199         return errCode;
200     }
201 
202     CommitNotifyForMigrateCache(syncData); // Trigger sync after release handle
203 
204     std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
205     handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
206     if (errCode != E_OK) {
207         LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
208         return errCode;
209     }
210     return errCode;
211 }
212 
AddSubscribeToMainDBInMigrate()213 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
214 {
215     LOGD("Add subscribe to mainDB from cache. %d", GetEngineState());
216     std::lock_guard<std::mutex> lock(subscribeMutex_);
217     if (subscribeQuery_.empty()) { // LCOV_EXCL_BR_LINE
218         return E_OK;
219     }
220     int errCode = E_OK;
221     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
222     if (errCode != E_OK || handle == nullptr) { // LCOV_EXCL_BR_LINE
223         LOGE("Get available executor for add subscribe failed. %d", errCode);
224         return errCode;
225     }
226     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
227     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
228         goto END;
229     }
230     for (auto item : subscribeQuery_) {
231         errCode = handle->AddSubscribeTrigger(item.second, item.first);
232         if (errCode != E_OK) {
233             LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
234         }
235     }
236     subscribeQuery_.clear();
237     // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
238     (void)handle->Commit();
239 END:
240     ReleaseExecutor(handle);
241     return errCode;
242 }
243 
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)244 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
245 {
246     int errCode = E_OK;
247     if (handle == nullptr) {
248         handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
249         if (errCode != E_OK) {
250             LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
251             return errCode;
252         }
253     }
254 
255     LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion());
256     uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
257     NotifyMigrateSyncData syncData;
258     auto kvdbManager = KvDBManager::GetInstance();
259     if (kvdbManager != nullptr) {
260         auto identifier = GetIdentifier();
261         auto kvdb = kvdbManager->FindKvDB(identifier);
262         if (kvdb != nullptr) {
263             auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
264             syncData.isPermitForceWrite =
265                 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
266             RefObject::DecObjRef(kvdb);
267         } else {
268             LOGE("[SingleVerEngine] kvdb is null.");
269         }
270     }
271     // cache atomic version represents version of cacheDb input next time
272     while (curMigrateVer < GetCacheRecordVersion()) {
273         errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
274         if (errCode != E_OK) {
275             LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
276             break;
277         }
278         if (!syncData.isRemote) {
279             isNeedTriggerSync = true;
280         }
281     }
282     if (syncData.committedData != nullptr) {
283         RefObject::DecObjRef(syncData.committedData);
284         syncData.committedData = nullptr;
285     }
286     // When finished Migrating sync data, will fix engine state
287     return errCode;
288 }
289 
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)290 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
291     EngineState stateBeforeMigrate)
292 {
293     LOGD("Begin attach main db and cache db by executor!");
294     // Judge the file corresponding to db by the engine status and attach it to another file
295     int errCode = E_OK;
296     std::string attachAbsPath;
297     if (stateBeforeMigrate == EngineState::MAINDB) {
298         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
299             DBConstant::DB_EXTENSION;
300         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
301     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
302         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
303         DBConstant::DB_EXTENSION;
304         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
305     } else {
306         return -E_NOT_SUPPORT;
307     }
308     if (errCode != E_OK) {
309         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
310         return errCode;
311     }
312 
313     uint64_t maxVersion = 0;
314     errCode = handle->GetMaxVersionInCacheDb(maxVersion);
315     if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
316         maxVersion = CACHE_RECORD_DEFAULT_VERSION;
317     }
318 
319     (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
320     return errCode;
321 }
322 
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const323 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
324 {
325     LOGD("Begin attach main db and cache db by sqlite handle!");
326     // Judge the file corresponding to db by the engine status and attach it to another file
327     int errCode = E_OK;
328     std::string attachAbsPath;
329     if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
330         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
331             DBConstant::DB_EXTENSION;
332         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
333     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
334         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
335             DBConstant::DB_EXTENSION;
336         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
337     } else {
338         return -E_NOT_SUPPORT;
339     }
340     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
341         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
342         return errCode;
343     }
344 
345     return errCode;
346 }
347 
ReInit()348 int SQLiteSingleVerStorageEngine::ReInit()
349 {
350     return Init();
351 }
352 
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)353 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
354 {
355     if (handle == nullptr) {
356         return E_OK;
357     }
358     StorageExecutor *databaseHandle = handle;
359     isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
360     Recycle(databaseHandle);
361     handle = nullptr;
362     if (isCorrupted_) {
363         LOGE("Database is corrupted or invalid passwd!");
364         return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
365     }
366     return E_OK;
367 }
368 
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)369 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
370     EngineState stateBeforeMigrate)
371 {
372     LOGI("Begin to finish migrate and reinit db state!");
373     int errCode;
374     if (handle == nullptr) { // LCOV_EXCL_BR_LINE
375         return -E_INVALID_ARGS;
376     }
377 
378     if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
379         sqlite3 *dbHandle = nullptr;
380         errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
381         if (errCode != E_OK) {
382             LOGE("Get Db handle failed! errCode = [%d]", errCode);
383             return errCode;
384         }
385 
386         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
387         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
388             LOGE("Execute the SQLite detach failed:%d", errCode);
389             return errCode;
390         }
391         // delete cachedb
392         errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
393         if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
394             LOGE("Remove files of cache database after detach:%d", errCode);
395         }
396 
397         SetEngineState(EngineState::MAINDB);
398         return errCode;
399     }
400 
401     errCode = ReleaseExecutor(handle);
402     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
403         LOGE("Release executor for reopen database! errCode = [%d]", errCode);
404         return errCode;
405     }
406 
407     // close db for reinit this engine
408     Release();
409 
410     // delete cache db
411     errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
412     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
413         LOGE("Remove files of cache database after release current db:%d", errCode);
414         return errCode;
415     }
416 
417     // reInit, it will reset engine state
418     errCode = ReInit();
419     if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
420         LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
421         return errCode;
422     }
423 
424     return E_OK;
425 }
426 
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)427 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
428     EngineState preMigrateState)
429 {
430     // after attach main and cache need change operate data sql, changing state forbid operate database
431     SetEngineState(EngineState::MIGRATING);
432 
433     int errCode = E_OK;
434     // check if has been attach and attach cache and main for migrate
435     if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
436         errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
437         if (errCode != E_OK) {
438             LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
439             // For lock state open db, can not attach main and cache
440             return errCode;
441         }
442     } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
443         // Has been attach, maybe ever crashed, need update version
444         executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
445         uint64_t maxVersion = 0;
446         errCode = handle->GetMaxVersionInCacheDb(maxVersion);
447         if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
448             maxVersion = CACHE_RECORD_DEFAULT_VERSION;
449         }
450         (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
451     } else {
452         return -E_UNEXPECTED_DATA;
453     }
454 
455     return errCode;
456 }
457 
ExecuteMigrate()458 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
459 {
460     EngineState preState = GetEngineState();
461     std::lock_guard<std::mutex> lock(migrateLock_);
462     if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
463         !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
464         DBConstant::DB_EXTENSION)) {
465         LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
466         return E_OK;
467     }
468 
469     // Get write executor for migrate
470     int errCode = E_OK;
471     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
472     if (errCode != E_OK) {
473         LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
474         return errCode;
475     }
476 
477     isMigrating_.store(true);
478     LOGD("Migrate start.");
479     bool isNeedTriggerSync = false;
480     errCode = InitExecuteMigrate(handle, preState);
481     if (errCode != E_OK) {
482         LOGE("Init migrate data fail, errCode = [%d]", errCode);
483         goto END;
484     }
485 
486     LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
487         static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
488     // has been attached, Mark start of migration and it can migrate data
489     errCode = MigrateLocalData(handle);
490     if (errCode != E_OK) {
491         LOGE("Migrate local data fail, errCode = [%d]", errCode);
492         goto END;
493     }
494 
495     errCode = MigrateSyncData(handle, isNeedTriggerSync);
496     if (errCode != E_OK) {
497         LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
498         goto END;
499     }
500 
501     SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
502 
503     // detach database and delete cachedb
504     errCode = FinishMigrateData(handle, preState);
505     if (errCode != E_OK) {
506         LOGE("Finish migrating data fail, errCode = [%d]", errCode);
507         goto END;
508     }
509 
510 END: // after FinishMigrateData, it will reset engine state
511     // there is no need cover the errCode
512     EndMigrate(handle, preState, errCode, isNeedTriggerSync);
513     isMigrating_.store(false);
514     LOGD("Migrate stop.");
515     return errCode;
516 }
517 
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)518 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
519     int errCode, bool isNeedTriggerSync)
520 {
521     LOGD("Finish migrating data! errCode = [%d]", errCode);
522     if (errCode != E_OK) {
523         SetEngineState(stateBeforeMigrate);
524     }
525     if (handle != nullptr) {
526         handle->ClearMigrateData();
527     }
528     errCode = ReleaseExecutor(handle);
529     if (errCode != E_OK) {
530         LOGE("release executor after migrating! errCode = [%d]", errCode);
531     }
532 
533     errCode = AddSubscribeToMainDBInMigrate();
534     if (errCode != E_OK) {
535         LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
536     }
537 
538     // Notify max timestamp offset for SyncEngine.
539     // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
540     RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
541     if (isNeedTriggerSync) {
542         commitNotifyFunc_(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT),
543             nullptr);
544     }
545     return;
546 }
547 
IsEngineCorrupted() const548 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
549 {
550     return isCorrupted_;
551 }
552 
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)553 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
554 {
555     auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
556     if (executor == nullptr) {
557         return executor;
558     }
559     executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
560     return executor;
561 }
562 
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)563 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
564 {
565     // Only could get the main database handle in the uninitialized and the main status.
566     if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
567         LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
568         return -E_EKEYREVOKED;
569     }
570 
571     if (!option_.isMemDb) {
572         option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
573             DBConstant::DB_EXTENSION;
574     }
575 
576     OpenDbProperties optionTemp = option_;
577     if (!isWrite) {
578         optionTemp.createIfNecessary = false;
579     }
580 
581     int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
582     if (errCode != E_OK) {
583         if (errno == EKEYREVOKED) {
584             LOGI("Failed to open the main database for key revoked[%d]", errCode);
585             errCode = -E_EKEYREVOKED;
586         }
587         return errCode;
588     }
589 
590     executorState_ = ExecutorState::MAINDB;
591     // Set the engine state to main status for that the main database is valid.
592     SetEngineState(EngineState::MAINDB);
593 
594     if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
595         DBConstant::DB_EXTENSION)) {
596         // In status cacheDb crash
597         errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
598         if (errCode != E_OK) {
599             LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
600             return E_OK; // not care err to return, only use for print log
601         }
602         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
603         // cache and main existed together, can not read data, must execute migrate first
604         SetEngineState(EngineState::ATTACHING);
605     }
606 
607     return errCode;
608 }
609 
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)610 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
611 {
612     int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
613     LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]",  isWrite,
614         secOpt.securityLabel, secOpt.securityFlag, hashIdentifier_.c_str(), errCode);
615     if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
616         return errCode;
617     }
618     std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
619         DBConstant::DB_EXTENSION;
620     if (!isWrite || GetEngineState() != EngineState::INVALID ||
621         OS::CheckPathExistence(cacheDbPath)) {
622         LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
623             isWrite, GetEngineState());
624         return -E_EKEYREVOKED;
625     }
626 
627     errCode = GetCacheDbHandle(dbHandle);
628     if (errCode != E_OK) {
629         LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
630         return errCode;
631     }
632     SetEngineState(EngineState::CACHEDB);
633     executorState_ = ExecutorState::CACHEDB;
634 
635     ResetCacheRecordVersion();
636     // Get handle means maindb file ekeyevoked, not need attach to
637     return errCode;
638 }
639 
640 namespace CacheDbSqls {
641 const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
642     "CREATE TABLE IF NOT EXISTS local_data(" \
643         "key     BLOB   NOT NULL," \
644         "value  BLOB," \
645         "timestamp  INT," \
646         "hash_key   BLOB   PRIMARY KEY   NOT NULL," \
647         "flag  INT  NOT NULL);";
648 
649 const std::string CREATE_CACHE_SYNC_TABLE_SQL =
650     "CREATE TABLE IF NOT EXISTS sync_data(" \
651         "key         BLOB NOT NULL," \
652         "value       BLOB," \
653         "timestamp   INT  NOT NULL," \
654         "flag        INT  NOT NULL," \
655         "device      BLOB," \
656         "ori_device  BLOB," \
657         "hash_key    BLOB  NOT NULL," \
658         "w_timestamp INT," \
659         "version     INT  NOT NULL," \
660         "PRIMARY Key(version, hash_key));";
661 }
662 
663 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
664 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)665 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
666 {
667     option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
668         DBConstant::DB_EXTENSION;
669     // creatTable
670     option_.sqls = {
671         CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
672         CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
673     };
674 
675     if (!option_.createIfNecessary) {
676         std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
677             DBConstant::DB_EXTENSION;
678         if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
679             return -E_INVALID_DB;
680         }
681     }
682 
683     OpenDbProperties option = option_; // copy for no change it
684     option.createIfNecessary = true;
685     int errCode = SQLiteUtils::OpenDatabase(option, db);
686     if (errCode != E_OK) {
687         LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
688         return errCode;
689     }
690     return errCode;
691 }
692 
CheckDatabaseSecOpt(const SecurityOption & secOption) const693 void SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
694 {
695     if (!(secOption == option_.securityOpt) && (secOption.securityLabel > option_.securityOpt.securityLabel) &&
696         secOption.securityLabel != SecurityLabel::NOT_SET &&
697         option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
698         LOGW("[SQLiteSingleVerStorageEngine] SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]",
699             secOption.securityLabel, secOption.securityFlag, option_.securityOpt.securityLabel,
700             option_.securityOpt.securityFlag);
701     }
702 }
703 
CreateNewDirsAndSetSecOpt() const704 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
705 {
706     LOGD("[SQLiteSingleVerStorageEngine] Begin to create new dirs and set security option");
707     return CreateNewDirsAndSetSecOption(option_);
708 }
709 
GetExistedSecOption(SecurityOption & secOption) const710 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
711 {
712     LOGD("[SQLiteSingleVerStorageEngine] Try to get existed sec option");
713     return GetExistedSecOpt(option_, secOption);
714 }
715 
ClearCorruptedFlag()716 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
717 {
718     isCorrupted_ = false;
719 }
720 
PreCreateExecutor(bool isWrite,SecurityOption & existedSecOpt)721 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite, SecurityOption &existedSecOpt)
722 {
723     // Assume that create the write executor firstly and the write one we will not be released.
724     // If the write one would be released in the future, should take care the pass through.
725     if (!isWrite) {
726         return E_OK;
727     }
728 
729     if (option_.isMemDb) {
730         return E_OK;
731     }
732 
733     // check sqlite open ok
734     int errCode = CheckStoreStatus(option_);
735     if (errCode != E_OK) {
736         return errCode;
737     }
738 
739     // Get the existed database secure option.
740     errCode = GetExistedSecOption(existedSecOpt);
741     if (errCode != E_OK) {
742         return errCode;
743     }
744 
745     CheckDatabaseSecOpt(existedSecOpt);
746 
747     // Judge whether need update the security option of the engine.
748     // Should update the security in the import or rekey scene(inner) or exist is not set.
749     if (IsUseExistedSecOption(existedSecOpt, option_.securityOpt)) {
750         option_.securityOpt = existedSecOpt;
751     } else {
752         isNeedUpdateSecOpt_ = true;
753     }
754 
755     errCode = CreateNewDirsAndSetSecOpt();
756     if (errCode != E_OK) {
757         return errCode;
758     }
759 
760     if (!isUpdated_) {
761         errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
762         if (errCode != E_OK) {
763             LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
764             return errCode;
765         }
766     }
767 
768     return E_OK;
769 }
770 
EndCreateExecutor(sqlite3 * db,bool isWrite,bool isDetachMeta)771 int SQLiteSingleVerStorageEngine::EndCreateExecutor(sqlite3 *db, bool isWrite, bool isDetachMeta)
772 {
773     if (option_.isMemDb || !isWrite) {
774         return E_OK;
775     }
776 
777     int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt,
778         isNeedUpdateSecOpt_);
779     if (errCode != E_OK) {
780         if (errCode == -E_NOT_SUPPORT) {
781             option_.securityOpt = SecurityOption();
782             errCode = E_OK;
783         }
784         LOGE("SetSecOption failed:%d", errCode);
785         return errCode;
786     }
787 
788     // after setting secOption, the database file operation ends
789     // database create completed, delete the token
790     if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
791         OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
792         LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
793         return -E_SYSTEM_API_FAIL;
794     }
795     if (isDetachMeta) {
796         errCode = SQLiteUtils::ExecuteRawSQL(db, "DETACH 'meta'");
797         if (errCode != E_OK) {
798             LOGE("Detach meta db failed %d", errCode);
799         } else {
800             LOGI("Detach meta db success");
801         }
802     }
803     return errCode;
804 }
805 
TryAttachMetaDb(const SecurityOption & existedSecOpt,sqlite3 * & dbHandle,bool & isAttachMeta,bool & isNeedDetachMeta)806 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(const SecurityOption &existedSecOpt, sqlite3 *&dbHandle,
807     bool &isAttachMeta, bool &isNeedDetachMeta)
808 {
809     bool isCurrentSESECE = ParamCheckUtils::IsS3SECEOpt(existedSecOpt);
810     bool isOpenSESECE = ParamCheckUtils::IsS3SECEOpt(option_.securityOpt);
811     // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
812     if ((!option_.isMemDb) && (isOpenSESECE || (isNeedUpdateSecOpt_ && isCurrentSESECE))) {
813         int errCode = AttachMetaDatabase(dbHandle, option_);
814         if (errCode != E_OK) {
815             (void)sqlite3_close_v2(dbHandle);
816             dbHandle = nullptr;
817             return errCode;
818         }
819         isAttachMeta = isOpenSESECE; // only open with S3 SECE need in attach mode
820         isNeedDetachMeta = !isOpenSESECE && isCurrentSESECE; // NOT S3 SECE no need meta.db
821     }
822     return E_OK;
823 }
824 
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)825 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
826 {
827     SecurityOption existedSecOpt;
828     int errCode = PreCreateExecutor(isWrite, existedSecOpt);
829     if (errCode != E_OK) {
830         return errCode;
831     }
832 
833     sqlite3 *dbHandle = nullptr;
834     errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
835     if (errCode != E_OK) {
836         return errCode;
837     }
838 
839     bool isAttachMeta = false;
840     bool isDetachMeta = false;
841     errCode = TryAttachMetaDb(existedSecOpt, dbHandle, isAttachMeta, isDetachMeta);
842     if (errCode != E_OK) {
843         return errCode;
844     }
845 
846     RegisterFunctionIfNeed(dbHandle);
847     errCode = Upgrade(dbHandle);
848     if (errCode != E_OK) {
849         (void)sqlite3_close_v2(dbHandle);
850         dbHandle = nullptr;
851         return errCode;
852     }
853 
854     errCode = EndCreateExecutor(dbHandle, isWrite, isDetachMeta);
855     if (errCode != E_OK) {
856         LOGE("After create executor, set security option incomplete!");
857         (void)sqlite3_close_v2(dbHandle);
858         dbHandle = nullptr;
859         return errCode;
860     }
861 
862     handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
863     if (handle == nullptr) {
864         LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
865         (void)sqlite3_close_v2(dbHandle);
866         dbHandle = nullptr;
867         return -E_OUT_OF_MEMORY;
868     }
869     if (isAttachMeta) {
870         SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
871         singleVerHandle->SetAttachMetaMode(isAttachMeta);
872     }
873     return E_OK;
874 }
875 
Upgrade(sqlite3 * db)876 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
877 {
878     if (isUpdated_ || GetEngineState() == EngineState::CACHEDB) {
879         LOGI("Storage engine [%.6s] is in cache status or has been upgraded[%d]!",
880             DBCommon::TransferStringToHex(identifier_).c_str(), isUpdated_);
881         return E_OK;
882     }
883 
884     std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
885     LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
886     if (option_.schema.empty()) {
887         upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
888     } else {
889         SchemaObject schema;
890         int errCode = schema.ParseFromSchemaString(option_.schema);
891         if (errCode != E_OK) {
892             LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
893             return errCode;
894         }
895         upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
896             option_.securityOpt, option_.isMemDb);
897     }
898 
899     std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
900     std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::DB_EXTENSION;
901     SecurityOption secOpt = option_.securityOpt;
902     int errCode = E_OK;
903     if (isNeedUpdateSecOpt_) {
904         errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
905         if (errCode != E_OK) {
906             LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
907             if (errCode != -E_NOT_SUPPORT) {
908                 return errCode;
909             }
910             secOpt = SecurityOption();
911         }
912     }
913 
914     upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
915     upgrader->SetSubdir(option_.subdir);
916     errCode = upgrader->Upgrade();
917     if (errCode != E_OK) {
918         LOGE("Single ver database upgrade failed:%d", errCode);
919         return errCode;
920     }
921 
922     LOGD("Finish upgrade single ver database!");
923     isUpdated_ = true; // Identification to avoid repeated upgrades
924     std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
925     isSchemaChanged_ = upgrader->IsValueNeedUpgrade();
926     return errCode;
927 }
928 
929 // Attention: This function should be called before "Upgrade".
930 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const931 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
932 {
933     // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
934     // not a newly created database, the meta-Table should exist and can be accessed.
935     std::string schemaStr = option_.schema;
936     if (schemaStr.empty()) {
937         // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
938         int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
939         if (errCode != E_OK) {
940             LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
941         }
942     }
943     if (!schemaStr.empty()) {
944         // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
945         int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
946         if (errCode != E_OK) { // Not very likely
947             // Just warning, if no index had been or need to be created, then put or kv-get can still use.
948             LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
949         }
950     }
951 
952     // This function is used to update meta_data in triggers when it's attached to mainDB
953     int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
954     if (errCode != E_OK) {
955         LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
956     }
957 }
958 
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const959 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
960 {
961     int errCode;
962     LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
963     std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
964         DBConstant::SINGLE_VER_META_STORE + DBConstant::DB_EXTENSION;
965     // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
966     if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
967         errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
968         if (errCode != E_OK) {
969             return errCode;
970         }
971     }
972     CipherPassword passwd;
973     errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
974     if (errCode != E_OK) {
975         LOGE("AttachNewDatabase fail, errCode = %d", errCode);
976     }
977     return errCode;
978 }
979 
ResetCacheRecordVersion()980 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
981 {
982     (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
983 }
984 
IncreaseCacheRecordVersion()985 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
986 {
987     (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
988 }
989 
GetAndIncreaseCacheRecordVersion()990 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
991 {
992     return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
993 }
994 
GetCacheRecordVersion() const995 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
996 {
997     return cacheRecordVersion_.load(std::memory_order_seq_cst);
998 }
999 
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1000 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1001     int eventType) const
1002 {
1003     std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1004     if (commitNotifyFunc_ == nullptr) {
1005         LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1006         RefObject::DecObjRef(committedData);
1007         committedData = nullptr;
1008         return;
1009     }
1010     commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1011     committedData = nullptr;
1012 }
1013 
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1014 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1015 {
1016     if (committedData == nullptr) {
1017         LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1018         return;
1019     }
1020     auto identifier = GetIdentifier();
1021     auto kvDBManager = KvDBManager::GetInstance();
1022     if (kvDBManager == nullptr) {
1023         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvDBManager is null.");
1024         return;
1025     }
1026     auto kvdb = kvDBManager->FindKvDB(identifier);
1027     if (kvdb == nullptr) {
1028         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1029         return;
1030     }
1031     unsigned int conflictFlag = 0;
1032     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1033         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1034         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1035     }
1036     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1037         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1038         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1039     }
1040     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1041         RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1042         conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_NATIVE_ALL);
1043     }
1044     RefObject::DecObjRef(kvdb);
1045     LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1046     committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1047 }
1048 
SetMaxValueSize(uint32_t maxValueSize)1049 void SQLiteSingleVerStorageEngine::SetMaxValueSize(uint32_t maxValueSize)
1050 {
1051     LOGI("Set the max value size to %" PRIu32, maxValueSize);
1052     maxValueSize_ = maxValueSize;
1053 }
1054 
GetMaxValueSize()1055 uint32_t SQLiteSingleVerStorageEngine::GetMaxValueSize()
1056 {
1057     return maxValueSize_;
1058 }
1059 
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1060 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1061 {
1062     const auto &isRemote = syncData.isRemote;
1063     const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1064     auto &committedData = syncData.committedData;
1065     auto &entries = syncData.entries;
1066 
1067     // Put data. Including insert, update and delete.
1068     if (!isRemoveDeviceData) { // LCOV_EXCL_BR_LINE
1069         if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1070             int eventType = static_cast<int>(isRemote ?
1071                 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT :
1072                 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT);
1073             CommitAndReleaseNotifyData(committedData, eventType);
1074         }
1075         return;
1076     }
1077 
1078     // Remove device data.
1079     if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) { // LCOV_EXCL_BR_LINE
1080         return;
1081     }
1082     size_t totalSize = 0;
1083     for (auto iter = entries.begin(); iter != entries.end();) {
1084         auto &entry = *iter;
1085         if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1086             committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1087             if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1088                 LOGE("Alloc committed notify data failed.");
1089                 return;
1090             }
1091         }
1092         if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > maxValueSize_) { // LCOV_EXCL_BR_LINE
1093             iter++;
1094             continue;
1095         }
1096         if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) { // LCOV_EXCL_BR_LINE
1097             CommitAndReleaseNotifyData(committedData,
1098                 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1099             totalSize = 0;
1100             continue;
1101         }
1102         totalSize += (entry.key.size() + entry.value.size());
1103         committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1104         iter++;
1105     }
1106     if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1107         CommitAndReleaseNotifyData(committedData,
1108             static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1109     }
1110 }
1111 
1112 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1113 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1114 {
1115     std::lock_guard<std::mutex> lock(subscribeMutex_);
1116     subscribeQuery_[subscribeId] = query;
1117 }
1118 
IsUseExistedSecOption(const SecurityOption & existedSecOpt,const SecurityOption & openSecOpt)1119 bool SQLiteSingleVerStorageEngine::IsUseExistedSecOption(const SecurityOption &existedSecOpt,
1120     const SecurityOption &openSecOpt)
1121 {
1122     if (isNeedUpdateSecOpt_) {
1123         return false;
1124     }
1125     if (existedSecOpt.securityLabel != openSecOpt.securityLabel) {
1126         return false;
1127     }
1128     return true;
1129 }
1130 
UpgradeLocalMetaData()1131 int SQLiteSingleVerStorageEngine::UpgradeLocalMetaData()
1132 {
1133     std::function<int(void)> schemaChangedFunc = nullptr;
1134     {
1135         std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
1136         if (isSchemaChanged_) {
1137             schemaChangedFunc = schemaChangedFunc_;
1138             isSchemaChanged_ = false;
1139         }
1140     }
1141     if (schemaChangedFunc != nullptr) {
1142         return schemaChangedFunc();
1143     }
1144     return E_OK;
1145 }
1146 }
1147