• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_engine.h"
17 
18 #include <memory>
19 
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "db_constant.h"
23 #include "sqlite_single_ver_database_upgrader.h"
24 #include "sqlite_single_ver_natural_store.h"
25 #include "sqlite_single_ver_schema_database_upgrader.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "db_common.h"
29 #include "kvdb_manager.h"
30 #include "param_check_utils.h"
31 
32 namespace DistributedDB {
33 namespace {
34     const uint64_t CACHE_RECORD_DEFAULT_VERSION = 1;
GetPathSecurityOption(const std::string & filePath,SecurityOption & secOpt)35     int GetPathSecurityOption(const std::string &filePath, SecurityOption &secOpt)
36     {
37         return RuntimeContext::GetInstance()->GetSecurityOption(filePath, secOpt);
38     }
39 
40     enum class DbType {
41         MAIN,
42         META,
43         CACHE
44     };
45 
GetDbDir(const std::string & subDir,DbType type)46     std::string GetDbDir(const std::string &subDir, DbType type)
47     {
48         static const std::map<DbType, std::string> dbDirDic {
49             { DbType::MAIN, DBConstant::MAINDB_DIR },
50             { DbType::META, DBConstant::METADB_DIR },
51             { DbType::CACHE, DBConstant::CACHEDB_DIR },
52         }; // for ensure static compilation order
53 
54         if (dbDirDic.find(type) == dbDirDic.end()) {
55             return std::string();
56         }
57         return subDir + "/" + dbDirDic.at(type);
58     }
59 } // namespace
60 
SQLiteSingleVerStorageEngine()61 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
62     : cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
63       executorState_(ExecutorState::INVALID),
64       isCorrupted_(false),
65       isNeedUpdateSecOpt_(false)
66 {}
67 
~SQLiteSingleVerStorageEngine()68 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
69 {}
70 
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const71 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
72 {
73     return handle->MigrateLocalData();
74 }
75 
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)76 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
77     const std::vector<DataItem> &dataItems)
78 {
79     int errCode = E_OK;
80     for (const auto &dataItem : dataItems) {
81         if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
82             (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
83             auto kvdbManager = KvDBManager::GetInstance();
84             if (kvdbManager == nullptr) {
85                 return -E_INVALID_DB;
86             }
87 
88             // sync module will use handle to fix water mark, if fix fail then migrate fail, not need hold write handle
89             errCode = ReleaseExecutor(handle);
90             if (errCode != E_OK) {
91                 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
92                 return errCode;
93             }
94 
95             auto identifier = GetIdentifier();
96             auto kvdb = kvdbManager->FindKvDB(identifier);
97             if (kvdb == nullptr) {
98                 LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
99                 return -E_INVALID_DB;
100             }
101 
102             auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
103             errCode = kvStore->EraseDeviceWaterMark(dataItem.dev, false);
104             RefObject::DecObjRef(kvdb);
105             if (errCode != E_OK) {
106                 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
107                 return errCode;
108             }
109 
110             handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
111                 errCode));
112             if (errCode != E_OK) {
113                 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
114                 return errCode;
115             }
116         }
117     }
118     return errCode;
119 }
120 
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)121 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
122     NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
123 {
124     if (syncData.committedData == nullptr) {
125         syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
126         if (syncData.committedData == nullptr) {
127             LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
128             return -E_OUT_OF_MEMORY;
129         }
130     }
131     InitConflictNotifiedFlag(syncData.committedData);
132 
133     std::vector<DataItem> dataItems;
134     uint64_t minVerIncurCacheDb = 0;
135     int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
136     if (errCode != E_OK) {
137         LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
138         return errCode;
139     }
140 
141     if (minVerIncurCacheDb == 0) { // min version in cache db is 1
142         ++curMigrateVer;
143         return E_OK;
144     }
145 
146     if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
147         curMigrateVer = minVerIncurCacheDb;
148     }
149 
150     // Call the syncer module to erase the water mark.
151     errCode = EraseDeviceWaterMark(handle, dataItems);
152     if (errCode != E_OK) {
153         LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
154         return errCode;
155     }
156 
157     // next version need process
158     LOGD("MigrateVer[%llu], minVer[%llu] maxVer[%llu]", curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
159     errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
160     if (errCode != E_OK) {
161         LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
162         return errCode;
163     }
164 
165     CommitNotifyForMigrateCache(syncData);
166 
167     TimeStamp timestamp = 0;
168     errCode = handle->GetMaxTimeStampDuringMigrating(timestamp);
169     if (errCode == E_OK) {
170         SetMaxTimeStamp(timestamp);
171     }
172 
173     errCode = ReleaseHandleTransiently(handle, 2ull); // temporary release handle 2ms
174     if (errCode != E_OK) {
175         return errCode;
176     }
177 
178     return E_OK;
179 }
180 
181 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime)182 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime)
183 {
184     int errCode = ReleaseExecutor(handle);
185     if (errCode != E_OK) {
186         LOGE("release executor for reopen database! errCode = [%d]", errCode);
187         return errCode;
188     }
189 
190     std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
191     handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
192     if (errCode != E_OK) {
193         LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
194         return errCode;
195     }
196     return errCode;
197 }
198 
AddSubscribeToMainDBInMigrate()199 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
200 {
201     LOGD("Add subscribe to mainDB from cache. %d", engineState_);
202     std::lock_guard<std::mutex> lock(subscribeMutex_);
203     if (subscribeQuery_.empty()) {
204         return E_OK;
205     }
206     int errCode = E_OK;
207     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
208     if (errCode != E_OK || handle == nullptr) {
209         LOGE("Get available executor for add subscribe failed. %d", errCode);
210         return errCode;
211     }
212     errCode = handle->StartTransaction(TransactType::IMMEDIATE);
213     if (errCode != E_OK) {
214         goto END;
215     }
216     for (auto item : subscribeQuery_) {
217         errCode = handle->AddSubscribeTrigger(item.second, item.first);
218         if (errCode != E_OK) {
219             LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
220         }
221     }
222     subscribeQuery_.clear();
223     // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
224     (void)handle->Commit();
225 END:
226     ReleaseExecutor(handle);
227     return errCode;
228 }
229 
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)230 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
231 {
232     int errCode = E_OK;
233     if (handle == nullptr) {
234         handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
235         if (errCode != E_OK) {
236             LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
237             return errCode;
238         }
239     }
240 
241     LOGD("Begin migrate sync data, need migrate version[%llu]", GetCacheRecordVersion() - 1);
242     uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
243     NotifyMigrateSyncData syncData;
244     auto kvdbManager = KvDBManager::GetInstance();
245     if (kvdbManager != nullptr) {
246         auto identifier = GetIdentifier();
247         auto kvdb = kvdbManager->FindKvDB(identifier);
248         if (kvdb != nullptr) {
249             auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
250             syncData.isPermitForceWrite =
251                 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
252             RefObject::DecObjRef(kvdb);
253         } else {
254             LOGE("[SingleVerEngine] kvdb is null.");
255         }
256     }
257     // cache atomic version represents version of cacheDb input next time
258     while (curMigrateVer < GetCacheRecordVersion()) {
259         errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
260         if (errCode != E_OK) {
261             LOGE("Migrate version[%llu] failed! errCode = [%d]", curMigrateVer, errCode);
262             break;
263         }
264         if (!syncData.isRemote) {
265             isNeedTriggerSync = true;
266         }
267     }
268     if (syncData.committedData != nullptr) {
269         RefObject::DecObjRef(syncData.committedData);
270         syncData.committedData = nullptr;
271     }
272     // When finished Migrating sync data, will fix engine state
273     return errCode;
274 }
275 
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)276 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
277     EngineState stateBeforeMigrate)
278 {
279     LOGD("Begin attach main db and cache db by executor!");
280     // Judge the file corresponding to db by the engine status and attach it to another file
281     int errCode = E_OK;
282     std::string attachAbsPath;
283     if (stateBeforeMigrate == EngineState::MAINDB) {
284         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
285             DBConstant::SQLITE_DB_EXTENSION;
286         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
287     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
288         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
289         DBConstant::SQLITE_DB_EXTENSION;
290         errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
291     } else {
292         return -E_NOT_SUPPORT;
293     }
294     if (errCode != E_OK) {
295         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
296         return errCode;
297     }
298 
299     uint64_t maxVersion = 0;
300     errCode = handle->GetMaxVersionIncacheDb(maxVersion);
301     if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
302         maxVersion = CACHE_RECORD_DEFAULT_VERSION;
303     }
304 
305     (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
306     return errCode;
307 }
308 
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const309 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
310 {
311     LOGD("Begin attach main db and cache db by sqlite handle!");
312     // Judge the file corresponding to db by the engine status and attach it to another file
313     int errCode = E_OK;
314     std::string attachAbsPath;
315     if (stateBeforeMigrate == EngineState::MAINDB) {
316         attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
317             DBConstant::SQLITE_DB_EXTENSION;
318         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
319     } else if (stateBeforeMigrate == EngineState::CACHEDB) {
320         attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
321             DBConstant::SQLITE_DB_EXTENSION;
322         errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
323     } else {
324         return -E_NOT_SUPPORT;
325     }
326     if (errCode != E_OK) {
327         LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
328         return errCode;
329     }
330 
331     return errCode;
332 }
333 
ReInit()334 int SQLiteSingleVerStorageEngine::ReInit()
335 {
336     return Init();
337 }
338 
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)339 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
340 {
341     if (handle == nullptr) {
342         return E_OK;
343     }
344     StorageExecutor *databaseHandle = handle;
345     isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
346     Recycle(databaseHandle);
347     handle = nullptr;
348     if (isCorrupted_) {
349         LOGE("Database is corrupted!");
350         return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
351     }
352     return E_OK;
353 }
354 
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)355 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
356     EngineState stateBeforeMigrate)
357 {
358     LOGI("Begin to finish migrate and reinit db state!");
359     int errCode;
360     if (handle == nullptr) {
361         return -E_INVALID_ARGS;
362     }
363 
364     if (stateBeforeMigrate == EngineState::MAINDB) {
365         sqlite3 *dbHandle = nullptr;
366         errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
367         if (errCode != E_OK) {
368             LOGE("Get Db handle failed! errCode = [%d]", errCode);
369             return errCode;
370         }
371 
372         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
373         if (errCode != E_OK) {
374             LOGE("Execute the SQLite detach failed:%d", errCode);
375             return errCode;
376         }
377         // delete cachedb
378         errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
379         if (errCode != E_OK) {
380             LOGE("Remove files of cache database after detach:%d", errCode);
381         }
382 
383         SetEngineState(EngineState::MAINDB);
384         return errCode;
385     }
386 
387     errCode = ReleaseExecutor(handle);
388     if (errCode != E_OK) {
389         LOGE("Release executor for reopen database! errCode = [%d]", errCode);
390         return errCode;
391     }
392 
393     // close db for reinit this engine
394     Release();
395 
396     // delete cache db
397     errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
398     if (errCode != E_OK) {
399         LOGE("Remove files of cache database after release current db:%d", errCode);
400         return errCode;
401     }
402 
403     // reInit, it will reset engine state
404     errCode = ReInit();
405     if (errCode != E_OK) {
406         LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
407         return errCode;
408     }
409 
410     return E_OK;
411 }
412 
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)413 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
414     EngineState preMigrateState)
415 {
416     // after attach main and cache need change operate data sql, changing state forbid operate database
417     SetEngineState(EngineState::MIGRATING);
418 
419     int errCode = E_OK;
420     // check if has been attach and attach cache and main for migrate
421     if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
422         errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
423         if (errCode != E_OK) {
424             LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
425             // For lock state open db, can not attach main and cache
426             return errCode;
427         }
428     } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
429         // Has been attach, maybe ever crashed, need update version
430         executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
431         uint64_t maxVersion = 0;
432         errCode = handle->GetMaxVersionIncacheDb(maxVersion);
433         if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
434             maxVersion = CACHE_RECORD_DEFAULT_VERSION;
435         }
436         (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
437     } else {
438         return -E_UNEXPECTED_DATA;
439     }
440 
441     return errCode;
442 }
443 
ExecuteMigrate()444 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
445 {
446     EngineState preState = GetEngineState();
447     std::lock_guard<std::mutex> lock(migrateLock_);
448     if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
449         !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
450         DBConstant::SQLITE_DB_EXTENSION)) {
451         LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%d]", preState);
452         return E_OK;
453     }
454 
455     // Get write executor for migrate
456     int errCode = E_OK;
457     auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
458     if (errCode != E_OK) {
459         LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
460         return errCode;
461     }
462 
463     isMigrating_.store(true);
464     LOGD("Migrate start.");
465     bool isNeedTriggerSync = false;
466     errCode = InitExecuteMigrate(handle, preState);
467     if (errCode != E_OK) {
468         LOGE("Init migrate data fail, errCode = [%d]", errCode);
469         goto END;
470     }
471 
472     LOGD("[SqlSingleVerEngine] Current engineState [%d] executorState [%d], begin to executing singleVer db migrate!",
473         preState, executorState_);
474     // has been attached, Mark start of migration and it can migrate data
475     errCode = MigrateLocalData(handle);
476     if (errCode != E_OK) {
477         LOGE("Migrate local data fail, errCode = [%d]", errCode);
478         goto END;
479     }
480 
481     errCode = MigrateSyncData(handle, isNeedTriggerSync);
482     if (errCode != E_OK) {
483         LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
484         goto END;
485     }
486 
487     SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
488 
489     // detach database and delete cachedb
490     errCode = FinishMigrateData(handle, preState);
491     if (errCode != E_OK) {
492         LOGE("Finish migrating data fail, errCode = [%d]", errCode);
493         goto END;
494     }
495 
496 END: // after FinishMigrateData, it will reset engine state
497     // there is no need cover the errCode
498     EndMigrate(handle, preState, errCode, isNeedTriggerSync);
499     isMigrating_.store(false);
500     LOGD("Migrate stop.");
501     return errCode;
502 }
503 
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)504 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
505     int errCode, bool isNeedTriggerSync)
506 {
507     LOGD("Finish migrating data! errCode = [%d]", errCode);
508     if (errCode != E_OK) {
509         SetEngineState(stateBeforeMigrate);
510     }
511     if (handle != nullptr) {
512         handle->ClearMigrateData();
513     }
514     errCode = ReleaseExecutor(handle);
515     if (errCode != E_OK) {
516         LOGE("release executor after migrating! errCode = [%d]", errCode);
517     }
518 
519     errCode = AddSubscribeToMainDBInMigrate();
520     if (errCode != E_OK) {
521         LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
522     }
523     // Notify max timestamp offset for SyncEngine.
524     // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
525     RuntimeContext::GetInstance()->NotifyTimeStampChanged(0);
526     if (isNeedTriggerSync) {
527         commitNotifyFunc_(SQLITE_GENERAL_FINISH_MIGRATE_EVENT, nullptr);
528     }
529     return;
530 }
531 
IsEngineCorrupted() const532 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
533 {
534     return isCorrupted_;
535 }
536 
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)537 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
538 {
539     auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
540     if (executor == nullptr) {
541         return executor;
542     }
543     executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
544     return executor;
545 }
546 
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)547 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
548 {
549     // Only could get the main database handle in the uninitialized and the main status.
550     if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
551         LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
552         return -E_EKEYREVOKED;
553     }
554 
555     if (!option_.isMemDb) {
556         option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
557             DBConstant::SQLITE_DB_EXTENSION;
558     }
559 
560     OpenDbProperties optionTemp = option_;
561     if (!isWrite) {
562         optionTemp.createIfNecessary = false;
563     }
564 
565     int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
566     if (errCode != E_OK) {
567         if (errno == EKEYREVOKED) {
568             LOGI("Failed to open the main database for key revoked[%d]", errCode);
569             errCode = -E_EKEYREVOKED;
570         }
571         return errCode;
572     }
573 
574     executorState_ = ExecutorState::MAINDB;
575     // Set the engine state to main status for that the main database is valid.
576     SetEngineState(EngineState::MAINDB);
577 
578     if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
579         DBConstant::SQLITE_DB_EXTENSION)) {
580         // In status cacheDb crash
581         errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
582         if (errCode != E_OK) {
583             LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
584             return E_OK; // not care err to return, only use for print log
585         }
586         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
587         // cache and main existed together, can not read data, must execute migrate first
588         SetEngineState(EngineState::ATTACHING);
589     }
590 
591     return errCode;
592 }
593 
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)594 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
595 {
596     int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
597     LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]",  isWrite,
598         secOpt.securityLabel, secOpt.securityFlag, DBCommon::TransferStringToHex(identifier_).c_str(), errCode);
599     if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
600         return errCode;
601     }
602 
603     std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
604         DBConstant::SQLITE_DB_EXTENSION;
605     if (!isWrite || GetEngineState() != EngineState::INVALID ||
606         OS::CheckPathExistence(cacheDbPath)) {
607         LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
608             isWrite, GetEngineState());
609         return -E_EKEYREVOKED;
610     }
611 
612     errCode = GetCacheDbHandle(dbHandle);
613     if (errCode != E_OK) {
614         LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
615         return errCode;
616     }
617     SetEngineState(CACHEDB);
618     executorState_ = ExecutorState::CACHEDB;
619 
620     ResetCacheRecordVersion();
621     // Get handle means maindb file ekeyevoked, not need attach to
622     return errCode;
623 }
624 
namespace CacheDbSqls {
// Creates the cache database table for local data, keyed by hash_key.
const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS local_data("
        "key     BLOB   NOT NULL,"
        "value  BLOB,"
        "timestamp  INT,"
        "hash_key   BLOB   PRIMARY KEY   NOT NULL,"
        "flag  INT  NOT NULL);";

// Creates the cache database table for sync data, keyed by (version, hash_key).
const std::string CREATE_CACHE_SYNC_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS sync_data("
        "key         BLOB NOT NULL,"
        "value       BLOB,"
        "timestamp   INT  NOT NULL,"
        "flag        INT  NOT NULL,"
        "device      BLOB,"
        "ori_device  BLOB,"
        "hash_key    BLOB  NOT NULL,"
        "w_timestamp INT,"
        "version     INT  NOT NULL,"
        "PRIMARY Key(version, hash_key));";
} // namespace CacheDbSqls
647 
648 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
649 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)650 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
651 {
652     option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
653         DBConstant::SQLITE_DB_EXTENSION;
654     // creatTable
655     option_.sqls = {
656         CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
657         CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
658     };
659 
660     if (!option_.createIfNecessary) {
661         std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
662             DBConstant::SQLITE_DB_EXTENSION;
663         if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
664             return -E_INVALID_DB;
665         }
666     }
667 
668     OpenDbProperties option = option_; // copy for no change it
669     option.createIfNecessary = true;
670     int errCode = SQLiteUtils::OpenDatabase(option, db);
671     if (errCode != E_OK) {
672         LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
673         return errCode;
674     }
675     return errCode;
676 }
677 
CheckDatabaseSecOpt(const SecurityOption & secOption) const678 int SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
679 {
680     if (!(secOption == option_.securityOpt) &&
681         secOption.securityLabel != SecurityLabel::NOT_SET &&
682         option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
683         LOGE("SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]", secOption.securityLabel,
684             secOption.securityFlag, option_.securityOpt.securityLabel, option_.securityOpt.securityFlag);
685         return -E_SECURITY_OPTION_CHECK_ERROR;
686     }
687     return E_OK;
688 }
689 
CreateNewDirsAndSetSecOpt() const690 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
691 {
692     std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
693     for (const auto &item : dbDir) {
694         if (OS::CheckPathExistence(option_.subdir + "/" + item)) {
695             continue;
696         }
697 
698         // Dir and old db file not existed, it means that the database is newly created
699         // need create flag of database not incomplete
700         if (!OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
701             !OS::CheckPathExistence(option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE +
702             DBConstant::SQLITE_DB_EXTENSION) &&
703             OS::CreateFileByFileName(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
704             LOGE("Fail to create the token of database incompleted! errCode = [E_SYSTEM_API_FAIL]");
705             return -E_SYSTEM_API_FAIL;
706         }
707 
708         if (DBCommon::CreateDirectory(option_.subdir + "/" + item) != E_OK) {
709             LOGE("Create sub-directory for single ver failed, errno:%d", errno);
710             return -E_SYSTEM_API_FAIL;
711         }
712 
713         if (option_.securityOpt.securityLabel == NOT_SET) {
714             continue;
715         }
716 
717         SecurityOption option = option_.securityOpt;
718         if (item == DBConstant::METADB_DIR) {
719             option.securityLabel = ((option_.securityOpt.securityLabel >= SecurityLabel::S2) ?
720                 SecurityLabel::S2 : option_.securityOpt.securityLabel);
721             option.securityFlag = SecurityFlag::ECE;
722         }
723 
724         int errCode = RuntimeContext::GetInstance()->SetSecurityOption(option_.subdir + "/" + item, option);
725         if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
726             LOGE("Set the security option of sub-directory failed[%d]", errCode);
727             return errCode;
728         }
729     }
730     return E_OK;
731 }
732 
GetExistedSecOption(SecurityOption & secOption) const733 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
734 {
735     // Check the existence of the database, include the origin database and the database in the 'main' directory.
736     auto mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
737     auto mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
738     auto origDbFilePath = option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
739     if (!OS::CheckPathExistence(origDbFilePath) && !OS::CheckPathExistence(mainDbFilePath)) {
740         secOption = option_.securityOpt;
741         return E_OK;
742     }
743 
744     // the main database file has high priority of the security option.
745     int errCode;
746     if (OS::CheckPathExistence(mainDbFilePath)) {
747         errCode = GetPathSecurityOption(mainDbFilePath, secOption);
748     } else {
749         errCode = GetPathSecurityOption(origDbFilePath, secOption);
750     }
751     if (errCode != E_OK) {
752         secOption = SecurityOption();
753         if (errCode == -E_NOT_SUPPORT) {
754             return E_OK;
755         }
756         LOGE("Get the security option of the existed database failed.");
757     }
758     return errCode;
759 }
760 
ClearCorruptedFlag()761 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
762 {
763     isCorrupted_ = false;
764 }
765 
PreCreateExecutor(bool isWrite)766 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite)
767 {
768     // Assume that create the write executor firstly and the write one we will not be released.
769     // If the write one would be released in the future, should take care the pass through.
770     if (!isWrite) {
771         return E_OK;
772     }
773 
774     if (option_.isMemDb) {
775         return E_OK;
776     }
777 
778     // Get the existed database secure option.
779     SecurityOption existedSecOpt;
780     int errCode = GetExistedSecOption(existedSecOpt);
781     if (errCode != E_OK) {
782         return errCode;
783     }
784 
785     errCode = CheckDatabaseSecOpt(existedSecOpt);
786     if (errCode != E_OK) {
787         return errCode;
788     }
789 
790     // Judge whether need update the security option of the engine.
791     // Should update the security in the import or rekey scene(inner).
792     if (!isNeedUpdateSecOpt_) {
793         option_.securityOpt = existedSecOpt;
794     }
795 
796     errCode = CreateNewDirsAndSetSecOpt();
797     if (errCode != E_OK) {
798         return errCode;
799     }
800 
801     if (!isUpdated_) {
802         errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
803         if (errCode != E_OK) {
804             LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
805             return errCode;
806         }
807     }
808 
809     return E_OK;
810 }
811 
EndCreateExecutor(bool isWrite)812 int SQLiteSingleVerStorageEngine::EndCreateExecutor(bool isWrite)
813 {
814     if (option_.isMemDb || !isWrite) {
815         return E_OK;
816     }
817 
818     int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt,
819         isNeedUpdateSecOpt_);
820     if (errCode != E_OK) {
821         if (errCode == -E_NOT_SUPPORT) {
822             option_.securityOpt = SecurityOption();
823             errCode = E_OK;
824         }
825         LOGE("SetSecOption failed:%d", errCode);
826         return errCode;
827     }
828 
829     // after setting secOption, the database file operation ends
830     // database create completed, delete the token
831     if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
832         OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
833         LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
834         return -E_SYSTEM_API_FAIL;
835     }
836     return errCode;
837 }
838 
TryAttachMetaDb(sqlite3 * & dbHandle,bool & isAttachMeta)839 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(sqlite3 *&dbHandle, bool &isAttachMeta)
840 {
841     // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
842     if ((!option_.isMemDb) && (ParamCheckUtils::IsS3SECEOpt(option_.securityOpt))) {
843         int errCode = AttachMetaDatabase(dbHandle, option_);
844         if (errCode != E_OK) {
845             (void)sqlite3_close_v2(dbHandle);
846             dbHandle = nullptr;
847             return errCode;
848         }
849         isAttachMeta = true;
850     }
851     return E_OK;
852 }
853 
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)854 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
855 {
856     int errCode = PreCreateExecutor(isWrite);
857     if (errCode != E_OK) {
858         return errCode;
859     }
860 
861     sqlite3 *dbHandle = nullptr;
862     errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
863     if (errCode != E_OK) {
864         return errCode;
865     }
866 
867     bool isAttachMeta = false;
868     errCode = TryAttachMetaDb(dbHandle, isAttachMeta);
869     if (errCode != E_OK) {
870         return errCode;
871     }
872 
873     RegisterFunctionIfNeed(dbHandle);
874     errCode = Upgrade(dbHandle);
875     if (errCode != E_OK) {
876         (void)sqlite3_close_v2(dbHandle);
877         dbHandle = nullptr;
878         return errCode;
879     }
880 
881     errCode = EndCreateExecutor(isWrite);
882     if (errCode != E_OK) {
883         LOGE("After create executor, set security option incomplete!");
884         (void)sqlite3_close_v2(dbHandle);
885         dbHandle = nullptr;
886         return errCode;
887     }
888 
889     handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
890     if (handle == nullptr) {
891         LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
892         (void)sqlite3_close_v2(dbHandle);
893         dbHandle = nullptr;
894         return -E_OUT_OF_MEMORY;
895     }
896     if (isAttachMeta) {
897         SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
898         singleVerHandle->SetAttachMetaMode(isAttachMeta);
899     }
900     return E_OK;
901 }
902 
Upgrade(sqlite3 * db)903 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
904 {
905     if (isUpdated_ || GetEngineState() == CACHEDB) {
906         LOGI("Storage engine is in cache status or has been upgraded[%d]!", isUpdated_);
907         return E_OK;
908     }
909 
910     std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
911     LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
912     if (option_.schema.empty()) {
913         upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
914     } else {
915         SchemaObject schema;
916         int errCode = schema.ParseFromSchemaString(option_.schema);
917         if (errCode != E_OK) {
918             LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
919             return errCode;
920         }
921         upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
922             option_.securityOpt, option_.isMemDb);
923     }
924 
925     std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
926     std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
927     SecurityOption secOpt = option_.securityOpt;
928     int errCode = E_OK;
929     if (isNeedUpdateSecOpt_) {
930         errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
931         if (errCode != E_OK) {
932             LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
933             if (errCode != -E_NOT_SUPPORT) {
934                 return errCode;
935             }
936             secOpt = SecurityOption();
937         }
938     }
939 
940     upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
941     upgrader->SetSubdir(option_.subdir);
942     errCode = upgrader->Upgrade();
943     if (errCode != E_OK) {
944         LOGE("Single ver database upgrade failed:%d", errCode);
945         return errCode;
946     }
947 
948     LOGD("Finish upgrade single ver database!");
949     isUpdated_ = true; // Identification to avoid repeated upgrades
950     return errCode;
951 }
952 
953 // Attention: This function should be called before "Upgrade".
954 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const955 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
956 {
957     // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
958     // not a newly created database, the meta-Table should exist and can be accessed.
959     std::string schemaStr = option_.schema;
960     if (schemaStr.empty()) {
961         // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
962         int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
963         if (errCode != E_OK) {
964             LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
965         }
966     }
967     if (!schemaStr.empty()) {
968         // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
969         int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
970         if (errCode != E_OK) { // Not very likely
971             // Just warning, if no index had been or need to be created, then put or kv-get can still use.
972             LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
973         }
974     }
975 
976     // This function is used to update meta_data in triggers when it's attached to mainDB
977     int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
978     if (errCode != E_OK) {
979         LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
980     }
981 }
982 
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const983 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
984 {
985     int errCode;
986     LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
987     std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
988         DBConstant::SINGLE_VER_META_STORE + DBConstant::SQLITE_DB_EXTENSION;
989     // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
990     if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
991         errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
992         if (errCode != E_OK) {
993             return errCode;
994         }
995     }
996     CipherPassword passwd;
997     errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
998     if (errCode != E_OK) {
999         LOGE("AttachNewDatabase fail, errCode = %d", errCode);
1000     }
1001     return errCode;
1002 }
1003 
ResetCacheRecordVersion()1004 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
1005 {
1006     (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
1007 }
1008 
IncreaseCacheRecordVersion()1009 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
1010 {
1011     (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1012 }
1013 
GetAndIncreaseCacheRecordVersion()1014 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
1015 {
1016     return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1017 }
1018 
GetCacheRecordVersion() const1019 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
1020 {
1021     return cacheRecordVersion_.load(std::memory_order_seq_cst);
1022 }
1023 
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1024 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1025     int eventType) const
1026 {
1027     std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1028     if (commitNotifyFunc_ == nullptr) {
1029         LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1030         RefObject::DecObjRef(committedData);
1031         committedData = nullptr;
1032         return;
1033     }
1034     commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1035     committedData = nullptr;
1036     return;
1037 }
1038 
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1039 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1040 {
1041     if (committedData == nullptr) {
1042         LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1043         return;
1044     }
1045     auto identifier = GetIdentifier();
1046     auto kvdb = KvDBManager::GetInstance()->FindKvDB(identifier);
1047     if (kvdb == nullptr) {
1048         LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1049         return;
1050     }
1051     unsigned int conflictFlag = 0;
1052     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1053         conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1054     }
1055     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1056         conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1057     }
1058     if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1059         conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_NATIVE_ALL);
1060     }
1061     RefObject::DecObjRef(kvdb);
1062     LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1063     committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1064 }
1065 
SetMaxTimeStamp(TimeStamp maxTimeStamp) const1066 void SQLiteSingleVerStorageEngine::SetMaxTimeStamp(TimeStamp maxTimeStamp) const
1067 {
1068     auto kvdbManager = KvDBManager::GetInstance();
1069     if (kvdbManager == nullptr) {
1070         return;
1071     }
1072     auto identifier = GetIdentifier();
1073     auto kvdb = kvdbManager->FindKvDB(identifier);
1074     if (kvdb == nullptr) {
1075         LOGE("[SQLiteSingleVerStorageEngine::SetMaxTimeStamp] kvdb is null.");
1076         return;
1077     }
1078 
1079     auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
1080     kvStore->SetMaxTimeStamp(maxTimeStamp);
1081     RefObject::DecObjRef(kvdb);
1082     return;
1083 }
1084 
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1085 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1086 {
1087     const auto &isRemote = syncData.isRemote;
1088     const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1089     auto &committedData = syncData.committedData;
1090     auto &entries = syncData.entries;
1091 
1092     // Put data. Including insert, update and delete.
1093     if (!isRemoveDeviceData) {
1094         if (committedData != nullptr) {
1095             int eventType = isRemote ? SQLITE_GENERAL_NS_SYNC_EVENT : SQLITE_GENERAL_NS_PUT_EVENT;
1096             CommitAndReleaseNotifyData(committedData, eventType);
1097         }
1098         return;
1099     }
1100 
1101     // Remove device data.
1102     if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) {
1103         return;
1104     }
1105     size_t totalSize = 0;
1106     for (auto iter = entries.begin(); iter != entries.end();) {
1107         auto &entry = *iter;
1108         if (committedData == nullptr) {
1109             committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1110             if (committedData == nullptr) {
1111                 LOGE("Alloc committed notify data failed.");
1112                 return;
1113             }
1114         }
1115         if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > DBConstant::MAX_VALUE_SIZE) {
1116             iter++;
1117             continue;
1118         }
1119         if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) {
1120             CommitAndReleaseNotifyData(committedData, SQLITE_GENERAL_NS_SYNC_EVENT);
1121             totalSize = 0;
1122             continue;
1123         }
1124         totalSize += (entry.key.size() + entry.value.size());
1125         committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1126         iter++;
1127     }
1128     if (committedData != nullptr) {
1129         CommitAndReleaseNotifyData(committedData, SQLITE_GENERAL_NS_SYNC_EVENT);
1130     }
1131     return;
1132 }
1133 
1134 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1135 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1136 {
1137     std::lock_guard<std::mutex> lock(subscribeMutex_);
1138     subscribeQuery_[subscribeId] = query;
1139 }
1140 }
1141