• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_engine.h"
17 
18 #include <memory>
19 
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "db_constant.h"
23 #include "sqlite_single_ver_database_upgrader.h"
24 #include "sqlite_single_ver_natural_store.h"
25 #include "sqlite_single_ver_schema_database_upgrader.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "db_common.h"
29 #include "kvdb_manager.h"
30 #include "param_check_utils.h"
31 
32 namespace DistributedDB {
33 namespace {
34     const uint64_t CACHE_RECORD_DEFAULT_VERSION = 1;
GetPathSecurityOption(const std::string & filePath,SecurityOption & secOpt)35     int GetPathSecurityOption(const std::string &filePath, SecurityOption &secOpt)
36     {
37         return RuntimeContext::GetInstance()->GetSecurityOption(filePath, secOpt);
38     }
39 
40     enum class DbType {
41         MAIN,
42         META,
43         CACHE
44     };
45 
GetDbDir(const std::string & subDir,DbType type)46     std::string GetDbDir(const std::string &subDir, DbType type)
47     {
48         static const std::map<DbType, std::string> dbDirDic {
49             { DbType::MAIN, DBConstant::MAINDB_DIR },
50             { DbType::META, DBConstant::METADB_DIR },
51             { DbType::CACHE, DBConstant::CACHEDB_DIR },
52         }; // for ensure static compilation order
53 
54         if (dbDirDic.find(type) == dbDirDic.end()) {
55             return std::string();
56         }
57         return subDir + "/" + dbDirDic.at(type);
58     }
59 } // namespace
60 
SQLiteSingleVerStorageEngine()61 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
62     : cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
63       executorState_(ExecutorState::INVALID),
64       isCorrupted_(false),
65       isNeedUpdateSecOpt_(false)
66 {}
67 
~SQLiteSingleVerStorageEngine()68 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
69 {
70 }
71 
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const72 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
73 {
74     return handle->MigrateLocalData();
75 }
76 
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)77 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
78     const std::vector<DataItem> &dataItems)
79 {
80     int errCode = E_OK;
81     for (const auto &dataItem : dataItems) {
82         if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
83             (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
84             auto kvdbManager = KvDBManager::GetInstance();
85             if (kvdbManager == nullptr) {
86                 return -E_INVALID_DB;
87             }
88 
89             // sync module will use handle to fix water mark, if fix fail then migrate fail, not need hold write handle
90             errCode = ReleaseExecutor(handle);
91             if (errCode != E_OK) {
92                 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
93                 return errCode;
94             }
95 
96             auto identifier = GetIdentifier();
97             auto kvdb = kvdbManager->FindKvDB(identifier);
98             if (kvdb == nullptr) {
99                 LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
100                 return -E_INVALID_DB;
101             }
102 
103             auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
104             errCode = kvStore->EraseDeviceWaterMark(dataItem.dev, false);
105             RefObject::DecObjRef(kvdb);
106             if (errCode != E_OK) {
107                 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
108                 return errCode;
109             }
110 
111             handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
112                 errCode));
113             if (errCode != E_OK) {
114                 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
115                 return errCode;
116             }
117         }
118     }
119     return errCode;
120 }
121 
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)122 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
123     NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
124 {
125     if (syncData.committedData == nullptr) {
126         syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
127         if (syncData.committedData == nullptr) {
128             LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
129             return -E_OUT_OF_MEMORY;
130         }
131     }
132     InitConflictNotifiedFlag(syncData.committedData);
133 
134     std::vector<DataItem> dataItems;
135     uint64_t minVerIncurCacheDb = 0;
136     int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
137     if (errCode != E_OK) {
138         LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
139         return errCode;
140     }
141 
142     if (minVerIncurCacheDb == 0) { // min version in cache db is 1
143         ++curMigrateVer;
144         return E_OK;
145     }
146 
147     if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
148         curMigrateVer = minVerIncurCacheDb;
149     }
150 
151     // Call the syncer module to erase the water mark.
152     errCode = EraseDeviceWaterMark(handle, dataItems);
153     if (errCode != E_OK) {
154         LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
155         return errCode;
156     }
157 
158     // next version need process
159     LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
160         curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
161     errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
162     if (errCode != E_OK) {
163         LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
164         return errCode;
165     }
166 
167     CommitNotifyForMigrateCache(syncData);
168 
169     Timestamp timestamp = 0;
170     errCode = handle->GetMaxTimestampDuringMigrating(timestamp);
171     if (errCode == E_OK) {
172         SetMaxTimestamp(timestamp);
173     }
174 
175     errCode = ReleaseHandleTransiently(handle, 2ull); // temporary release handle 2ms
176     if (errCode != E_OK) {
177         return errCode;
178     }
179 
180     return E_OK;
181 }
182 
183 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime)184 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime)
185 {
186     int errCode = ReleaseExecutor(handle);
187     if (errCode != E_OK) {
188         LOGE("release executor for reopen database! errCode = [%d]", errCode);
189         return errCode;
190     }
191 
192     std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
193     handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
194     if (errCode != E_OK) {
195         LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
196         return errCode;
197     }
198     return errCode;
199 }
200 
AddSubscribeToMainDBInMigrate()201 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
202 {
203     LOGD("Add subscribe to mainDB from cache. %d", engineState_);
204     std::lock_guard<std::mutex> lock(subscribeMutex_);
205     if (subscribeQuery_.empty()) {
206         return E_OK;
207     }
208     int errCode = E_OK;
209     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
210     if (errCode != E_OK || handle == nullptr) {
211         LOGE("Get available executor for add subscribe failed. %d", errCode);
212         return errCode;
213     }
214     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
215     if (errCode != E_OK) {
216         goto END;
217     }
218     for (auto item : subscribeQuery_) {
219         errCode = handle->AddSubscribeTrigger(item.second, item.first);
220         if (errCode != E_OK) {
221             LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
222         }
223     }
224     subscribeQuery_.clear();
225     // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
226     (void)handle->Commit();
227 END:
228     ReleaseExecutor(handle);
229     return errCode;
230 }
231 
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)232 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
233 {
234     int errCode = E_OK;
235     if (handle == nullptr) {
236         handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
237         if (errCode != E_OK) {
238             LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
239             return errCode;
240         }
241     }
242 
243     LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion() - 1);
244     uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
245     NotifyMigrateSyncData syncData;
246     auto kvdbManager = KvDBManager::GetInstance();
247     if (kvdbManager != nullptr) {
248         auto identifier = GetIdentifier();
249         auto kvdb = kvdbManager->FindKvDB(identifier);
250         if (kvdb != nullptr) {
251             auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
252             syncData.isPermitForceWrite =
253                 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
254             RefObject::DecObjRef(kvdb);
255         } else {
256             LOGE("[SingleVerEngine] kvdb is null.");
257         }
258     }
259     // cache atomic version represents version of cacheDb input next time
260     while (curMigrateVer < GetCacheRecordVersion()) {
261         errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
262         if (errCode != E_OK) {
263             LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
264             break;
265         }
266         if (!syncData.isRemote) {
267             isNeedTriggerSync = true;
268         }
269     }
270     if (syncData.committedData != nullptr) {
271         RefObject::DecObjRef(syncData.committedData);
272         syncData.committedData = nullptr;
273     }
274     // When finished Migrating sync data, will fix engine state
275     return errCode;
276 }
277 
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)278 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
279     EngineState stateBeforeMigrate)
280 {
281     LOGD("Begin attach main db and cache db by executor!");
282     // Judge the file corresponding to db by the engine status and attach it to another file
283     int errCode = E_OK;
284     std::string attachAbsPath;
285     if (stateBeforeMigrate == EngineState::MAINDB) {
286         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
287             DBConstant::SQLITE_DB_EXTENSION;
288         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
289     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
290         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
291         DBConstant::SQLITE_DB_EXTENSION;
292         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
293     } else {
294         return -E_NOT_SUPPORT;
295     }
296     if (errCode != E_OK) {
297         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
298         return errCode;
299     }
300 
301     uint64_t maxVersion = 0;
302     errCode = handle->GetMaxVersionInCacheDb(maxVersion);
303     if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
304         maxVersion = CACHE_RECORD_DEFAULT_VERSION;
305     }
306 
307     (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
308     return errCode;
309 }
310 
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const311 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
312 {
313     LOGD("Begin attach main db and cache db by sqlite handle!");
314     // Judge the file corresponding to db by the engine status and attach it to another file
315     int errCode = E_OK;
316     std::string attachAbsPath;
317     if (stateBeforeMigrate == EngineState::MAINDB) {
318         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
319             DBConstant::SQLITE_DB_EXTENSION;
320         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
321     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
322         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
323             DBConstant::SQLITE_DB_EXTENSION;
324         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
325     } else {
326         return -E_NOT_SUPPORT;
327     }
328     if (errCode != E_OK) {
329         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
330         return errCode;
331     }
332 
333     return errCode;
334 }
335 
ReInit()336 int SQLiteSingleVerStorageEngine::ReInit()
337 {
338     return Init();
339 }
340 
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)341 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
342 {
343     if (handle == nullptr) {
344         return E_OK;
345     }
346     StorageExecutor *databaseHandle = handle;
347     isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
348     Recycle(databaseHandle);
349     handle = nullptr;
350     if (isCorrupted_) {
351         LOGE("Database is corrupted!");
352         return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
353     }
354     return E_OK;
355 }
356 
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)357 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
358     EngineState stateBeforeMigrate)
359 {
360     LOGI("Begin to finish migrate and reinit db state!");
361     int errCode;
362     if (handle == nullptr) {
363         return -E_INVALID_ARGS;
364     }
365 
366     if (stateBeforeMigrate == EngineState::MAINDB) {
367         sqlite3 *dbHandle = nullptr;
368         errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
369         if (errCode != E_OK) {
370             LOGE("Get Db handle failed! errCode = [%d]", errCode);
371             return errCode;
372         }
373 
374         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
375         if (errCode != E_OK) {
376             LOGE("Execute the SQLite detach failed:%d", errCode);
377             return errCode;
378         }
379         // delete cachedb
380         errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
381         if (errCode != E_OK) {
382             LOGE("Remove files of cache database after detach:%d", errCode);
383         }
384 
385         SetEngineState(EngineState::MAINDB);
386         return errCode;
387     }
388 
389     errCode = ReleaseExecutor(handle);
390     if (errCode != E_OK) {
391         LOGE("Release executor for reopen database! errCode = [%d]", errCode);
392         return errCode;
393     }
394 
395     // close db for reinit this engine
396     Release();
397 
398     // delete cache db
399     errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
400     if (errCode != E_OK) {
401         LOGE("Remove files of cache database after release current db:%d", errCode);
402         return errCode;
403     }
404 
405     // reInit, it will reset engine state
406     errCode = ReInit();
407     if (errCode != E_OK) {
408         LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
409         return errCode;
410     }
411 
412     return E_OK;
413 }
414 
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)415 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
416     EngineState preMigrateState)
417 {
418     // after attach main and cache need change operate data sql, changing state forbid operate database
419     SetEngineState(EngineState::MIGRATING);
420 
421     int errCode = E_OK;
422     // check if has been attach and attach cache and main for migrate
423     if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
424         errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
425         if (errCode != E_OK) {
426             LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
427             // For lock state open db, can not attach main and cache
428             return errCode;
429         }
430     } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
431         // Has been attach, maybe ever crashed, need update version
432         executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
433         uint64_t maxVersion = 0;
434         errCode = handle->GetMaxVersionInCacheDb(maxVersion);
435         if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
436             maxVersion = CACHE_RECORD_DEFAULT_VERSION;
437         }
438         (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
439     } else {
440         return -E_UNEXPECTED_DATA;
441     }
442 
443     return errCode;
444 }
445 
ExecuteMigrate()446 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
447 {
448     EngineState preState = GetEngineState();
449     std::lock_guard<std::mutex> lock(migrateLock_);
450     if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
451         !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
452         DBConstant::SQLITE_DB_EXTENSION)) {
453         LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
454         return E_OK;
455     }
456 
457     // Get write executor for migrate
458     int errCode = E_OK;
459     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
460     if (errCode != E_OK) {
461         LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
462         return errCode;
463     }
464 
465     isMigrating_.store(true);
466     LOGD("Migrate start.");
467     bool isNeedTriggerSync = false;
468     errCode = InitExecuteMigrate(handle, preState);
469     if (errCode != E_OK) {
470         LOGE("Init migrate data fail, errCode = [%d]", errCode);
471         goto END;
472     }
473 
474     LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
475         static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
476     // has been attached, Mark start of migration and it can migrate data
477     errCode = MigrateLocalData(handle);
478     if (errCode != E_OK) {
479         LOGE("Migrate local data fail, errCode = [%d]", errCode);
480         goto END;
481     }
482 
483     errCode = MigrateSyncData(handle, isNeedTriggerSync);
484     if (errCode != E_OK) {
485         LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
486         goto END;
487     }
488 
489     SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
490 
491     // detach database and delete cachedb
492     errCode = FinishMigrateData(handle, preState);
493     if (errCode != E_OK) {
494         LOGE("Finish migrating data fail, errCode = [%d]", errCode);
495         goto END;
496     }
497 
498 END: // after FinishMigrateData, it will reset engine state
499     // there is no need cover the errCode
500     EndMigrate(handle, preState, errCode, isNeedTriggerSync);
501     isMigrating_.store(false);
502     LOGD("Migrate stop.");
503     return errCode;
504 }
505 
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)506 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
507     int errCode, bool isNeedTriggerSync)
508 {
509     LOGD("Finish migrating data! errCode = [%d]", errCode);
510     if (errCode != E_OK) {
511         SetEngineState(stateBeforeMigrate);
512     }
513     if (handle != nullptr) {
514         handle->ClearMigrateData();
515     }
516     errCode = ReleaseExecutor(handle);
517     if (errCode != E_OK) {
518         LOGE("release executor after migrating! errCode = [%d]", errCode);
519     }
520 
521     errCode = AddSubscribeToMainDBInMigrate();
522     if (errCode != E_OK) {
523         LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
524     }
525     // Notify max timestamp offset for SyncEngine.
526     // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
527     RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
528     if (isNeedTriggerSync) {
529         commitNotifyFunc_(SQLITE_GENERAL_FINISH_MIGRATE_EVENT, nullptr);
530     }
531     return;
532 }
533 
IsEngineCorrupted() const534 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
535 {
536     return isCorrupted_;
537 }
538 
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)539 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
540 {
541     auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
542     if (executor == nullptr) {
543         return executor;
544     }
545     executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
546     return executor;
547 }
548 
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)549 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
550 {
551     // Only could get the main database handle in the uninitialized and the main status.
552     if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
553         LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
554         return -E_EKEYREVOKED;
555     }
556 
557     if (!option_.isMemDb) {
558         option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
559             DBConstant::SQLITE_DB_EXTENSION;
560     }
561 
562     OpenDbProperties optionTemp = option_;
563     if (!isWrite) {
564         optionTemp.createIfNecessary = false;
565     }
566 
567     int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
568     if (errCode != E_OK) {
569         if (errno == EKEYREVOKED) {
570             LOGI("Failed to open the main database for key revoked[%d]", errCode);
571             errCode = -E_EKEYREVOKED;
572         }
573         return errCode;
574     }
575 
576     executorState_ = ExecutorState::MAINDB;
577     // Set the engine state to main status for that the main database is valid.
578     SetEngineState(EngineState::MAINDB);
579 
580     if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
581         DBConstant::SQLITE_DB_EXTENSION)) {
582         // In status cacheDb crash
583         errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
584         if (errCode != E_OK) {
585             LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
586             return E_OK; // not care err to return, only use for print log
587         }
588         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
589         // cache and main existed together, can not read data, must execute migrate first
590         SetEngineState(EngineState::ATTACHING);
591     }
592 
593     return errCode;
594 }
595 
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)596 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
597 {
598     int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
599     LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]",  isWrite,
600         secOpt.securityLabel, secOpt.securityFlag, DBCommon::TransferStringToHex(identifier_).c_str(), errCode);
601     if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
602         return errCode;
603     }
604 
605     std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
606         DBConstant::SQLITE_DB_EXTENSION;
607     if (!isWrite || GetEngineState() != EngineState::INVALID ||
608         OS::CheckPathExistence(cacheDbPath)) {
609         LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
610             isWrite, GetEngineState());
611         return -E_EKEYREVOKED;
612     }
613 
614     errCode = GetCacheDbHandle(dbHandle);
615     if (errCode != E_OK) {
616         LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
617         return errCode;
618     }
619     SetEngineState(CACHEDB);
620     executorState_ = ExecutorState::CACHEDB;
621 
622     ResetCacheRecordVersion();
623     // Get handle means maindb file ekeyevoked, not need attach to
624     return errCode;
625 }
626 
// Table definitions executed when the cache database is opened (see GetCacheDbHandle).
namespace CacheDbSqls {
// Local data table, keyed by hash_key.
const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS local_data("
        "key     BLOB   NOT NULL,"
        "value  BLOB,"
        "timestamp  INT,"
        "hash_key   BLOB   PRIMARY KEY   NOT NULL,"
        "flag  INT  NOT NULL);";

// Sync data table, keyed by (version, hash_key).
const std::string CREATE_CACHE_SYNC_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS sync_data("
        "key         BLOB NOT NULL,"
        "value       BLOB,"
        "timestamp   INT  NOT NULL,"
        "flag        INT  NOT NULL,"
        "device      BLOB,"
        "ori_device  BLOB,"
        "hash_key    BLOB  NOT NULL,"
        "w_timestamp INT,"
        "version     INT  NOT NULL,"
        "PRIMARY Key(version, hash_key));";
} // namespace CacheDbSqls
649 
650 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
651 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)652 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
653 {
654     option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
655         DBConstant::SQLITE_DB_EXTENSION;
656     // creatTable
657     option_.sqls = {
658         CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
659         CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
660     };
661 
662     if (!option_.createIfNecessary) {
663         std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
664             DBConstant::SQLITE_DB_EXTENSION;
665         if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
666             return -E_INVALID_DB;
667         }
668     }
669 
670     OpenDbProperties option = option_; // copy for no change it
671     option.createIfNecessary = true;
672     int errCode = SQLiteUtils::OpenDatabase(option, db);
673     if (errCode != E_OK) {
674         LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
675         return errCode;
676     }
677     return errCode;
678 }
679 
CheckDatabaseSecOpt(const SecurityOption & secOption) const680 int SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
681 {
682     if (!(secOption == option_.securityOpt) &&
683         secOption.securityLabel != SecurityLabel::NOT_SET &&
684         option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
685         LOGE("SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]", secOption.securityLabel,
686             secOption.securityFlag, option_.securityOpt.securityLabel, option_.securityOpt.securityFlag);
687         return -E_SECURITY_OPTION_CHECK_ERROR;
688     }
689     return E_OK;
690 }
691 
CreateNewDirsAndSetSecOpt() const692 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
693 {
694     std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
695     for (const auto &item : dbDir) {
696         if (OS::CheckPathExistence(option_.subdir + "/" + item)) {
697             continue;
698         }
699 
700         // Dir and old db file not existed, it means that the database is newly created
701         // need create flag of database not incomplete
702         if (!OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
703             !OS::CheckPathExistence(option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE +
704             DBConstant::SQLITE_DB_EXTENSION) &&
705             OS::CreateFileByFileName(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
706             LOGE("Fail to create the token of database incompleted! errCode = [E_SYSTEM_API_FAIL]");
707             return -E_SYSTEM_API_FAIL;
708         }
709 
710         if (DBCommon::CreateDirectory(option_.subdir + "/" + item) != E_OK) {
711             LOGE("Create sub-directory for single ver failed, errno:%d", errno);
712             return -E_SYSTEM_API_FAIL;
713         }
714 
715         if (option_.securityOpt.securityLabel == NOT_SET) {
716             continue;
717         }
718 
719         SecurityOption option = option_.securityOpt;
720         if (item == DBConstant::METADB_DIR) {
721             option.securityLabel = ((option_.securityOpt.securityLabel >= SecurityLabel::S2) ?
722                 SecurityLabel::S2 : option_.securityOpt.securityLabel);
723             option.securityFlag = SecurityFlag::ECE;
724         }
725 
726         int errCode = RuntimeContext::GetInstance()->SetSecurityOption(option_.subdir + "/" + item, option);
727         if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
728             LOGE("Set the security option of sub-directory failed[%d]", errCode);
729             return errCode;
730         }
731     }
732     return E_OK;
733 }
734 
GetExistedSecOption(SecurityOption & secOption) const735 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
736 {
737     // Check the existence of the database, include the origin database and the database in the 'main' directory.
738     auto mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
739     auto mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
740     auto origDbFilePath = option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
741     if (!OS::CheckPathExistence(origDbFilePath) && !OS::CheckPathExistence(mainDbFilePath)) {
742         secOption = option_.securityOpt;
743         return E_OK;
744     }
745 
746     // the main database file has high priority of the security option.
747     int errCode;
748     if (OS::CheckPathExistence(mainDbFilePath)) {
749         errCode = GetPathSecurityOption(mainDbFilePath, secOption);
750     } else {
751         errCode = GetPathSecurityOption(origDbFilePath, secOption);
752     }
753     if (errCode != E_OK) {
754         secOption = SecurityOption();
755         if (errCode == -E_NOT_SUPPORT) {
756             return E_OK;
757         }
758         LOGE("Get the security option of the existed database failed.");
759     }
760     return errCode;
761 }
762 
ClearCorruptedFlag()763 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
764 {
765     isCorrupted_ = false;
766 }
767 
PreCreateExecutor(bool isWrite)768 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite)
769 {
770     // Assume that create the write executor firstly and the write one we will not be released.
771     // If the write one would be released in the future, should take care the pass through.
772     if (!isWrite) {
773         return E_OK;
774     }
775 
776     if (option_.isMemDb) {
777         return E_OK;
778     }
779 
780     // Get the existed database secure option.
781     SecurityOption existedSecOpt;
782     int errCode = GetExistedSecOption(existedSecOpt);
783     if (errCode != E_OK) {
784         return errCode;
785     }
786 
787     errCode = CheckDatabaseSecOpt(existedSecOpt);
788     if (errCode != E_OK) {
789         return errCode;
790     }
791 
792     // Judge whether need update the security option of the engine.
793     // Should update the security in the import or rekey scene(inner).
794     if (!isNeedUpdateSecOpt_) {
795         option_.securityOpt = existedSecOpt;
796     }
797 
798     errCode = CreateNewDirsAndSetSecOpt();
799     if (errCode != E_OK) {
800         return errCode;
801     }
802 
803     if (!isUpdated_) {
804         errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
805         if (errCode != E_OK) {
806             LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
807             return errCode;
808         }
809     }
810 
811     return E_OK;
812 }
813 
EndCreateExecutor(bool isWrite)814 int SQLiteSingleVerStorageEngine::EndCreateExecutor(bool isWrite)
815 {
816     if (option_.isMemDb || !isWrite) {
817         return E_OK;
818     }
819 
820     int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt,
821         isNeedUpdateSecOpt_);
822     if (errCode != E_OK) {
823         if (errCode == -E_NOT_SUPPORT) {
824             option_.securityOpt = SecurityOption();
825             errCode = E_OK;
826         }
827         LOGE("SetSecOption failed:%d", errCode);
828         return errCode;
829     }
830 
831     // after setting secOption, the database file operation ends
832     // database create completed, delete the token
833     if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
834         OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
835         LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
836         return -E_SYSTEM_API_FAIL;
837     }
838     return errCode;
839 }
840 
TryAttachMetaDb(sqlite3 * & dbHandle,bool & isAttachMeta)841 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(sqlite3 *&dbHandle, bool &isAttachMeta)
842 {
843     // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
844     if ((!option_.isMemDb) && (ParamCheckUtils::IsS3SECEOpt(option_.securityOpt))) {
845         int errCode = AttachMetaDatabase(dbHandle, option_);
846         if (errCode != E_OK) {
847             (void)sqlite3_close_v2(dbHandle);
848             dbHandle = nullptr;
849             return errCode;
850         }
851         isAttachMeta = true;
852     }
853     return E_OK;
854 }
855 
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)856 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
857 {
858     int errCode = PreCreateExecutor(isWrite);
859     if (errCode != E_OK) {
860         return errCode;
861     }
862 
863     sqlite3 *dbHandle = nullptr;
864     errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
865     if (errCode != E_OK) {
866         return errCode;
867     }
868 
869     bool isAttachMeta = false;
870     errCode = TryAttachMetaDb(dbHandle, isAttachMeta);
871     if (errCode != E_OK) {
872         return errCode;
873     }
874 
875     RegisterFunctionIfNeed(dbHandle);
876     errCode = Upgrade(dbHandle);
877     if (errCode != E_OK) {
878         (void)sqlite3_close_v2(dbHandle);
879         dbHandle = nullptr;
880         return errCode;
881     }
882 
883     errCode = EndCreateExecutor(isWrite);
884     if (errCode != E_OK) {
885         LOGE("After create executor, set security option incomplete!");
886         (void)sqlite3_close_v2(dbHandle);
887         dbHandle = nullptr;
888         return errCode;
889     }
890 
891     handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
892     if (handle == nullptr) {
893         LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
894         (void)sqlite3_close_v2(dbHandle);
895         dbHandle = nullptr;
896         return -E_OUT_OF_MEMORY;
897     }
898     if (isAttachMeta) {
899         SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
900         singleVerHandle->SetAttachMetaMode(isAttachMeta);
901     }
902     return E_OK;
903 }
904 
Upgrade(sqlite3 * db)905 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
906 {
907     if (isUpdated_ || GetEngineState() == CACHEDB) {
908         LOGI("Storage engine is in cache status or has been upgraded[%d]!", isUpdated_);
909         return E_OK;
910     }
911 
912     std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
913     LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
914     if (option_.schema.empty()) {
915         upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
916     } else {
917         SchemaObject schema;
918         int errCode = schema.ParseFromSchemaString(option_.schema);
919         if (errCode != E_OK) {
920             LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
921             return errCode;
922         }
923         upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
924             option_.securityOpt, option_.isMemDb);
925     }
926 
927     std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
928     std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
929     SecurityOption secOpt = option_.securityOpt;
930     int errCode = E_OK;
931     if (isNeedUpdateSecOpt_) {
932         errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
933         if (errCode != E_OK) {
934             LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
935             if (errCode != -E_NOT_SUPPORT) {
936                 return errCode;
937             }
938             secOpt = SecurityOption();
939         }
940     }
941 
942     upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
943     upgrader->SetSubdir(option_.subdir);
944     errCode = upgrader->Upgrade();
945     if (errCode != E_OK) {
946         LOGE("Single ver database upgrade failed:%d", errCode);
947         return errCode;
948     }
949 
950     LOGD("Finish upgrade single ver database!");
951     isUpdated_ = true; // Identification to avoid repeated upgrades
952     return errCode;
953 }
954 
955 // Attention: This function should be called before "Upgrade".
956 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const957 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
958 {
959     // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
960     // not a newly created database, the meta-Table should exist and can be accessed.
961     std::string schemaStr = option_.schema;
962     if (schemaStr.empty()) {
963         // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
964         int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
965         if (errCode != E_OK) {
966             LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
967         }
968     }
969     if (!schemaStr.empty()) {
970         // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
971         int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
972         if (errCode != E_OK) { // Not very likely
973             // Just warning, if no index had been or need to be created, then put or kv-get can still use.
974             LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
975         }
976     }
977 
978     // This function is used to update meta_data in triggers when it's attached to mainDB
979     int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
980     if (errCode != E_OK) {
981         LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
982     }
983 }
984 
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const985 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
986 {
987     int errCode;
988     LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
989     std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
990         DBConstant::SINGLE_VER_META_STORE + DBConstant::SQLITE_DB_EXTENSION;
991     // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
992     if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
993         errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
994         if (errCode != E_OK) {
995             return errCode;
996         }
997     }
998     CipherPassword passwd;
999     errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
1000     if (errCode != E_OK) {
1001         LOGE("AttachNewDatabase fail, errCode = %d", errCode);
1002     }
1003     return errCode;
1004 }
1005 
ResetCacheRecordVersion()1006 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
1007 {
1008     (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
1009 }
1010 
IncreaseCacheRecordVersion()1011 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
1012 {
1013     (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1014 }
1015 
GetAndIncreaseCacheRecordVersion()1016 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
1017 {
1018     return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1019 }
1020 
GetCacheRecordVersion() const1021 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
1022 {
1023     return cacheRecordVersion_.load(std::memory_order_seq_cst);
1024 }
1025 
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1026 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1027     int eventType) const
1028 {
1029     std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1030     if (commitNotifyFunc_ == nullptr) {
1031         LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1032         RefObject::DecObjRef(committedData);
1033         committedData = nullptr;
1034         return;
1035     }
1036     commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1037     committedData = nullptr;
1038     return;
1039 }
1040 
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1041 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1042 {
1043     if (committedData == nullptr) {
1044         LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1045         return;
1046     }
1047     auto identifier = GetIdentifier();
1048     auto kvdb = KvDBManager::GetInstance()->FindKvDB(identifier);
1049     if (kvdb == nullptr) {
1050         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1051         return;
1052     }
1053     unsigned int conflictFlag = 0;
1054     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1055         conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1056     }
1057     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1058         conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1059     }
1060     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1061         conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_NATIVE_ALL);
1062     }
1063     RefObject::DecObjRef(kvdb);
1064     LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1065     committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1066 }
1067 
SetMaxTimestamp(Timestamp maxTimestamp) const1068 void SQLiteSingleVerStorageEngine::SetMaxTimestamp(Timestamp maxTimestamp) const
1069 {
1070     auto kvdbManager = KvDBManager::GetInstance();
1071     if (kvdbManager == nullptr) {
1072         return;
1073     }
1074     auto identifier = GetIdentifier();
1075     auto kvdb = kvdbManager->FindKvDB(identifier);
1076     if (kvdb == nullptr) {
1077         LOGE("[SQLiteSingleVerStorageEngine::SetMaxTimestamp] kvdb is null.");
1078         return;
1079     }
1080 
1081     auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
1082     kvStore->SetMaxTimestamp(maxTimestamp);
1083     RefObject::DecObjRef(kvdb);
1084     return;
1085 }
1086 
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1087 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1088 {
1089     const auto &isRemote = syncData.isRemote;
1090     const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1091     auto &committedData = syncData.committedData;
1092     auto &entries = syncData.entries;
1093 
1094     // Put data. Including insert, update and delete.
1095     if (!isRemoveDeviceData) {
1096         if (committedData != nullptr) {
1097             int eventType = isRemote ? SQLITE_GENERAL_NS_SYNC_EVENT : SQLITE_GENERAL_NS_PUT_EVENT;
1098             CommitAndReleaseNotifyData(committedData, eventType);
1099         }
1100         return;
1101     }
1102 
1103     // Remove device data.
1104     if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) {
1105         return;
1106     }
1107     size_t totalSize = 0;
1108     for (auto iter = entries.begin(); iter != entries.end();) {
1109         auto &entry = *iter;
1110         if (committedData == nullptr) {
1111             committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1112             if (committedData == nullptr) {
1113                 LOGE("Alloc committed notify data failed.");
1114                 return;
1115             }
1116         }
1117         if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > DBConstant::MAX_VALUE_SIZE) {
1118             iter++;
1119             continue;
1120         }
1121         if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) {
1122             CommitAndReleaseNotifyData(committedData, SQLITE_GENERAL_NS_SYNC_EVENT);
1123             totalSize = 0;
1124             continue;
1125         }
1126         totalSize += (entry.key.size() + entry.value.size());
1127         committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1128         iter++;
1129     }
1130     if (committedData != nullptr) {
1131         CommitAndReleaseNotifyData(committedData, SQLITE_GENERAL_NS_SYNC_EVENT);
1132     }
1133     return;
1134 }
1135 
1136 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1137 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1138 {
1139     std::lock_guard<std::mutex> lock(subscribeMutex_);
1140     subscribeQuery_[subscribeId] = query;
1141 }
1142 }
1143