1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_engine.h"
17
18 #include <memory>
19
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "db_constant.h"
23 #include "sqlite_single_ver_database_upgrader.h"
24 #include "sqlite_single_ver_natural_store.h"
25 #include "sqlite_single_ver_schema_database_upgrader.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "db_common.h"
29 #include "kvdb_manager.h"
30 #include "param_check_utils.h"
31
32 namespace DistributedDB {
33 namespace {
34 const uint64_t CACHE_RECORD_DEFAULT_VERSION = 1;
GetPathSecurityOption(const std::string & filePath,SecurityOption & secOpt)35 int GetPathSecurityOption(const std::string &filePath, SecurityOption &secOpt)
36 {
37 return RuntimeContext::GetInstance()->GetSecurityOption(filePath, secOpt);
38 }
39
40 enum class DbType {
41 MAIN,
42 META,
43 CACHE
44 };
45
GetDbDir(const std::string & subDir,DbType type)46 std::string GetDbDir(const std::string &subDir, DbType type)
47 {
48 static const std::map<DbType, std::string> dbDirDic {
49 { DbType::MAIN, DBConstant::MAINDB_DIR },
50 { DbType::META, DBConstant::METADB_DIR },
51 { DbType::CACHE, DBConstant::CACHEDB_DIR },
52 }; // for ensure static compilation order
53
54 if (dbDirDic.find(type) == dbDirDic.end()) {
55 return std::string();
56 }
57 return subDir + "/" + dbDirDic.at(type);
58 }
59 } // namespace
60
SQLiteSingleVerStorageEngine()61 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
62 : cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
63 executorState_(ExecutorState::INVALID),
64 isCorrupted_(false),
65 isNeedUpdateSecOpt_(false)
66 {}
67
~SQLiteSingleVerStorageEngine()68 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
69 {}
70
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const71 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
72 {
73 return handle->MigrateLocalData();
74 }
75
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)76 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
77 const std::vector<DataItem> &dataItems)
78 {
79 int errCode = E_OK;
80 for (const auto &dataItem : dataItems) {
81 if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
82 (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
83 auto kvdbManager = KvDBManager::GetInstance();
84 if (kvdbManager == nullptr) {
85 return -E_INVALID_DB;
86 }
87
88 // sync module will use handle to fix water mark, if fix fail then migrate fail, not need hold write handle
89 errCode = ReleaseExecutor(handle);
90 if (errCode != E_OK) {
91 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
92 return errCode;
93 }
94
95 auto identifier = GetIdentifier();
96 auto kvdb = kvdbManager->FindKvDB(identifier);
97 if (kvdb == nullptr) {
98 LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
99 return -E_INVALID_DB;
100 }
101
102 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
103 errCode = kvStore->EraseDeviceWaterMark(dataItem.dev, false);
104 RefObject::DecObjRef(kvdb);
105 if (errCode != E_OK) {
106 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
107 return errCode;
108 }
109
110 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
111 errCode));
112 if (errCode != E_OK) {
113 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
114 return errCode;
115 }
116 }
117 }
118 return errCode;
119 }
120
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)121 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
122 NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
123 {
124 if (syncData.committedData == nullptr) {
125 syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
126 if (syncData.committedData == nullptr) {
127 LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
128 return -E_OUT_OF_MEMORY;
129 }
130 }
131 InitConflictNotifiedFlag(syncData.committedData);
132
133 std::vector<DataItem> dataItems;
134 uint64_t minVerIncurCacheDb = 0;
135 int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
136 if (errCode != E_OK) {
137 LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
138 return errCode;
139 }
140
141 if (minVerIncurCacheDb == 0) { // min version in cache db is 1
142 ++curMigrateVer;
143 return E_OK;
144 }
145
146 if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
147 curMigrateVer = minVerIncurCacheDb;
148 }
149
150 // Call the syncer module to erase the water mark.
151 errCode = EraseDeviceWaterMark(handle, dataItems);
152 if (errCode != E_OK) {
153 LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
154 return errCode;
155 }
156
157 // next version need process
158 LOGD("MigrateVer[%llu], minVer[%llu] maxVer[%llu]", curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
159 errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
160 if (errCode != E_OK) {
161 LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
162 return errCode;
163 }
164
165 CommitNotifyForMigrateCache(syncData);
166
167 TimeStamp timestamp = 0;
168 errCode = handle->GetMaxTimeStampDuringMigrating(timestamp);
169 if (errCode == E_OK) {
170 SetMaxTimeStamp(timestamp);
171 }
172
173 errCode = ReleaseHandleTransiently(handle, 2ull); // temporary release handle 2ms
174 if (errCode != E_OK) {
175 return errCode;
176 }
177
178 return E_OK;
179 }
180
181 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime)182 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime)
183 {
184 int errCode = ReleaseExecutor(handle);
185 if (errCode != E_OK) {
186 LOGE("release executor for reopen database! errCode = [%d]", errCode);
187 return errCode;
188 }
189
190 std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
191 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
192 if (errCode != E_OK) {
193 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
194 return errCode;
195 }
196 return errCode;
197 }
198
AddSubscribeToMainDBInMigrate()199 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
200 {
201 LOGD("Add subscribe to mainDB from cache. %d", engineState_);
202 std::lock_guard<std::mutex> lock(subscribeMutex_);
203 if (subscribeQuery_.empty()) {
204 return E_OK;
205 }
206 int errCode = E_OK;
207 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
208 if (errCode != E_OK || handle == nullptr) {
209 LOGE("Get available executor for add subscribe failed. %d", errCode);
210 return errCode;
211 }
212 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
213 if (errCode != E_OK) {
214 goto END;
215 }
216 for (auto item : subscribeQuery_) {
217 errCode = handle->AddSubscribeTrigger(item.second, item.first);
218 if (errCode != E_OK) {
219 LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
220 }
221 }
222 subscribeQuery_.clear();
223 // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
224 (void)handle->Commit();
225 END:
226 ReleaseExecutor(handle);
227 return errCode;
228 }
229
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)230 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
231 {
232 int errCode = E_OK;
233 if (handle == nullptr) {
234 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
235 if (errCode != E_OK) {
236 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
237 return errCode;
238 }
239 }
240
241 LOGD("Begin migrate sync data, need migrate version[%llu]", GetCacheRecordVersion() - 1);
242 uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
243 NotifyMigrateSyncData syncData;
244 auto kvdbManager = KvDBManager::GetInstance();
245 if (kvdbManager != nullptr) {
246 auto identifier = GetIdentifier();
247 auto kvdb = kvdbManager->FindKvDB(identifier);
248 if (kvdb != nullptr) {
249 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
250 syncData.isPermitForceWrite =
251 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
252 RefObject::DecObjRef(kvdb);
253 } else {
254 LOGE("[SingleVerEngine] kvdb is null.");
255 }
256 }
257 // cache atomic version represents version of cacheDb input next time
258 while (curMigrateVer < GetCacheRecordVersion()) {
259 errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
260 if (errCode != E_OK) {
261 LOGE("Migrate version[%llu] failed! errCode = [%d]", curMigrateVer, errCode);
262 break;
263 }
264 if (!syncData.isRemote) {
265 isNeedTriggerSync = true;
266 }
267 }
268 if (syncData.committedData != nullptr) {
269 RefObject::DecObjRef(syncData.committedData);
270 syncData.committedData = nullptr;
271 }
272 // When finished Migrating sync data, will fix engine state
273 return errCode;
274 }
275
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)276 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
277 EngineState stateBeforeMigrate)
278 {
279 LOGD("Begin attach main db and cache db by executor!");
280 // Judge the file corresponding to db by the engine status and attach it to another file
281 int errCode = E_OK;
282 std::string attachAbsPath;
283 if (stateBeforeMigrate == EngineState::MAINDB) {
284 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
285 DBConstant::SQLITE_DB_EXTENSION;
286 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
287 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
288 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
289 DBConstant::SQLITE_DB_EXTENSION;
290 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
291 } else {
292 return -E_NOT_SUPPORT;
293 }
294 if (errCode != E_OK) {
295 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
296 return errCode;
297 }
298
299 uint64_t maxVersion = 0;
300 errCode = handle->GetMaxVersionIncacheDb(maxVersion);
301 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
302 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
303 }
304
305 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
306 return errCode;
307 }
308
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const309 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
310 {
311 LOGD("Begin attach main db and cache db by sqlite handle!");
312 // Judge the file corresponding to db by the engine status and attach it to another file
313 int errCode = E_OK;
314 std::string attachAbsPath;
315 if (stateBeforeMigrate == EngineState::MAINDB) {
316 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
317 DBConstant::SQLITE_DB_EXTENSION;
318 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
319 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
320 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
321 DBConstant::SQLITE_DB_EXTENSION;
322 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
323 } else {
324 return -E_NOT_SUPPORT;
325 }
326 if (errCode != E_OK) {
327 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
328 return errCode;
329 }
330
331 return errCode;
332 }
333
ReInit()334 int SQLiteSingleVerStorageEngine::ReInit()
335 {
336 return Init();
337 }
338
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)339 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
340 {
341 if (handle == nullptr) {
342 return E_OK;
343 }
344 StorageExecutor *databaseHandle = handle;
345 isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
346 Recycle(databaseHandle);
347 handle = nullptr;
348 if (isCorrupted_) {
349 LOGE("Database is corrupted!");
350 return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
351 }
352 return E_OK;
353 }
354
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)355 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
356 EngineState stateBeforeMigrate)
357 {
358 LOGI("Begin to finish migrate and reinit db state!");
359 int errCode;
360 if (handle == nullptr) {
361 return -E_INVALID_ARGS;
362 }
363
364 if (stateBeforeMigrate == EngineState::MAINDB) {
365 sqlite3 *dbHandle = nullptr;
366 errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
367 if (errCode != E_OK) {
368 LOGE("Get Db handle failed! errCode = [%d]", errCode);
369 return errCode;
370 }
371
372 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
373 if (errCode != E_OK) {
374 LOGE("Execute the SQLite detach failed:%d", errCode);
375 return errCode;
376 }
377 // delete cachedb
378 errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
379 if (errCode != E_OK) {
380 LOGE("Remove files of cache database after detach:%d", errCode);
381 }
382
383 SetEngineState(EngineState::MAINDB);
384 return errCode;
385 }
386
387 errCode = ReleaseExecutor(handle);
388 if (errCode != E_OK) {
389 LOGE("Release executor for reopen database! errCode = [%d]", errCode);
390 return errCode;
391 }
392
393 // close db for reinit this engine
394 Release();
395
396 // delete cache db
397 errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
398 if (errCode != E_OK) {
399 LOGE("Remove files of cache database after release current db:%d", errCode);
400 return errCode;
401 }
402
403 // reInit, it will reset engine state
404 errCode = ReInit();
405 if (errCode != E_OK) {
406 LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
407 return errCode;
408 }
409
410 return E_OK;
411 }
412
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)413 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
414 EngineState preMigrateState)
415 {
416 // after attach main and cache need change operate data sql, changing state forbid operate database
417 SetEngineState(EngineState::MIGRATING);
418
419 int errCode = E_OK;
420 // check if has been attach and attach cache and main for migrate
421 if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
422 errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
423 if (errCode != E_OK) {
424 LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
425 // For lock state open db, can not attach main and cache
426 return errCode;
427 }
428 } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
429 // Has been attach, maybe ever crashed, need update version
430 executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
431 uint64_t maxVersion = 0;
432 errCode = handle->GetMaxVersionIncacheDb(maxVersion);
433 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
434 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
435 }
436 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
437 } else {
438 return -E_UNEXPECTED_DATA;
439 }
440
441 return errCode;
442 }
443
ExecuteMigrate()444 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
445 {
446 EngineState preState = GetEngineState();
447 std::lock_guard<std::mutex> lock(migrateLock_);
448 if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
449 !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
450 DBConstant::SQLITE_DB_EXTENSION)) {
451 LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%d]", preState);
452 return E_OK;
453 }
454
455 // Get write executor for migrate
456 int errCode = E_OK;
457 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
458 if (errCode != E_OK) {
459 LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
460 return errCode;
461 }
462
463 isMigrating_.store(true);
464 LOGD("Migrate start.");
465 bool isNeedTriggerSync = false;
466 errCode = InitExecuteMigrate(handle, preState);
467 if (errCode != E_OK) {
468 LOGE("Init migrate data fail, errCode = [%d]", errCode);
469 goto END;
470 }
471
472 LOGD("[SqlSingleVerEngine] Current engineState [%d] executorState [%d], begin to executing singleVer db migrate!",
473 preState, executorState_);
474 // has been attached, Mark start of migration and it can migrate data
475 errCode = MigrateLocalData(handle);
476 if (errCode != E_OK) {
477 LOGE("Migrate local data fail, errCode = [%d]", errCode);
478 goto END;
479 }
480
481 errCode = MigrateSyncData(handle, isNeedTriggerSync);
482 if (errCode != E_OK) {
483 LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
484 goto END;
485 }
486
487 SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
488
489 // detach database and delete cachedb
490 errCode = FinishMigrateData(handle, preState);
491 if (errCode != E_OK) {
492 LOGE("Finish migrating data fail, errCode = [%d]", errCode);
493 goto END;
494 }
495
496 END: // after FinishMigrateData, it will reset engine state
497 // there is no need cover the errCode
498 EndMigrate(handle, preState, errCode, isNeedTriggerSync);
499 isMigrating_.store(false);
500 LOGD("Migrate stop.");
501 return errCode;
502 }
503
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)504 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
505 int errCode, bool isNeedTriggerSync)
506 {
507 LOGD("Finish migrating data! errCode = [%d]", errCode);
508 if (errCode != E_OK) {
509 SetEngineState(stateBeforeMigrate);
510 }
511 if (handle != nullptr) {
512 handle->ClearMigrateData();
513 }
514 errCode = ReleaseExecutor(handle);
515 if (errCode != E_OK) {
516 LOGE("release executor after migrating! errCode = [%d]", errCode);
517 }
518
519 errCode = AddSubscribeToMainDBInMigrate();
520 if (errCode != E_OK) {
521 LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
522 }
523 // Notify max timestamp offset for SyncEngine.
524 // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
525 RuntimeContext::GetInstance()->NotifyTimeStampChanged(0);
526 if (isNeedTriggerSync) {
527 commitNotifyFunc_(SQLITE_GENERAL_FINISH_MIGRATE_EVENT, nullptr);
528 }
529 return;
530 }
531
IsEngineCorrupted() const532 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
533 {
534 return isCorrupted_;
535 }
536
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)537 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
538 {
539 auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
540 if (executor == nullptr) {
541 return executor;
542 }
543 executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
544 return executor;
545 }
546
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)547 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
548 {
549 // Only could get the main database handle in the uninitialized and the main status.
550 if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
551 LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
552 return -E_EKEYREVOKED;
553 }
554
555 if (!option_.isMemDb) {
556 option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
557 DBConstant::SQLITE_DB_EXTENSION;
558 }
559
560 OpenDbProperties optionTemp = option_;
561 if (!isWrite) {
562 optionTemp.createIfNecessary = false;
563 }
564
565 int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
566 if (errCode != E_OK) {
567 if (errno == EKEYREVOKED) {
568 LOGI("Failed to open the main database for key revoked[%d]", errCode);
569 errCode = -E_EKEYREVOKED;
570 }
571 return errCode;
572 }
573
574 executorState_ = ExecutorState::MAINDB;
575 // Set the engine state to main status for that the main database is valid.
576 SetEngineState(EngineState::MAINDB);
577
578 if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
579 DBConstant::SQLITE_DB_EXTENSION)) {
580 // In status cacheDb crash
581 errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
582 if (errCode != E_OK) {
583 LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
584 return E_OK; // not care err to return, only use for print log
585 }
586 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
587 // cache and main existed together, can not read data, must execute migrate first
588 SetEngineState(EngineState::ATTACHING);
589 }
590
591 return errCode;
592 }
593
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)594 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
595 {
596 int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
597 LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]", isWrite,
598 secOpt.securityLabel, secOpt.securityFlag, DBCommon::TransferStringToHex(identifier_).c_str(), errCode);
599 if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
600 return errCode;
601 }
602
603 std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
604 DBConstant::SQLITE_DB_EXTENSION;
605 if (!isWrite || GetEngineState() != EngineState::INVALID ||
606 OS::CheckPathExistence(cacheDbPath)) {
607 LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
608 isWrite, GetEngineState());
609 return -E_EKEYREVOKED;
610 }
611
612 errCode = GetCacheDbHandle(dbHandle);
613 if (errCode != E_OK) {
614 LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
615 return errCode;
616 }
617 SetEngineState(CACHEDB);
618 executorState_ = ExecutorState::CACHEDB;
619
620 ResetCacheRecordVersion();
621 // Get handle means maindb file ekeyevoked, not need attach to
622 return errCode;
623 }
624
625 namespace CacheDbSqls {
626 const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
627 "CREATE TABLE IF NOT EXISTS local_data(" \
628 "key BLOB NOT NULL," \
629 "value BLOB," \
630 "timestamp INT," \
631 "hash_key BLOB PRIMARY KEY NOT NULL," \
632 "flag INT NOT NULL);";
633
634 const std::string CREATE_CACHE_SYNC_TABLE_SQL =
635 "CREATE TABLE IF NOT EXISTS sync_data(" \
636 "key BLOB NOT NULL," \
637 "value BLOB," \
638 "timestamp INT NOT NULL," \
639 "flag INT NOT NULL," \
640 "device BLOB," \
641 "ori_device BLOB," \
642 "hash_key BLOB NOT NULL," \
643 "w_timestamp INT," \
644 "version INT NOT NULL," \
645 "PRIMARY Key(version, hash_key));";
646 }
647
648 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
649 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)650 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
651 {
652 option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
653 DBConstant::SQLITE_DB_EXTENSION;
654 // creatTable
655 option_.sqls = {
656 CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
657 CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
658 };
659
660 if (!option_.createIfNecessary) {
661 std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
662 DBConstant::SQLITE_DB_EXTENSION;
663 if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
664 return -E_INVALID_DB;
665 }
666 }
667
668 OpenDbProperties option = option_; // copy for no change it
669 option.createIfNecessary = true;
670 int errCode = SQLiteUtils::OpenDatabase(option, db);
671 if (errCode != E_OK) {
672 LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
673 return errCode;
674 }
675 return errCode;
676 }
677
CheckDatabaseSecOpt(const SecurityOption & secOption) const678 int SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
679 {
680 if (!(secOption == option_.securityOpt) &&
681 secOption.securityLabel != SecurityLabel::NOT_SET &&
682 option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
683 LOGE("SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]", secOption.securityLabel,
684 secOption.securityFlag, option_.securityOpt.securityLabel, option_.securityOpt.securityFlag);
685 return -E_SECURITY_OPTION_CHECK_ERROR;
686 }
687 return E_OK;
688 }
689
CreateNewDirsAndSetSecOpt() const690 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
691 {
692 std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
693 for (const auto &item : dbDir) {
694 if (OS::CheckPathExistence(option_.subdir + "/" + item)) {
695 continue;
696 }
697
698 // Dir and old db file not existed, it means that the database is newly created
699 // need create flag of database not incomplete
700 if (!OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
701 !OS::CheckPathExistence(option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE +
702 DBConstant::SQLITE_DB_EXTENSION) &&
703 OS::CreateFileByFileName(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
704 LOGE("Fail to create the token of database incompleted! errCode = [E_SYSTEM_API_FAIL]");
705 return -E_SYSTEM_API_FAIL;
706 }
707
708 if (DBCommon::CreateDirectory(option_.subdir + "/" + item) != E_OK) {
709 LOGE("Create sub-directory for single ver failed, errno:%d", errno);
710 return -E_SYSTEM_API_FAIL;
711 }
712
713 if (option_.securityOpt.securityLabel == NOT_SET) {
714 continue;
715 }
716
717 SecurityOption option = option_.securityOpt;
718 if (item == DBConstant::METADB_DIR) {
719 option.securityLabel = ((option_.securityOpt.securityLabel >= SecurityLabel::S2) ?
720 SecurityLabel::S2 : option_.securityOpt.securityLabel);
721 option.securityFlag = SecurityFlag::ECE;
722 }
723
724 int errCode = RuntimeContext::GetInstance()->SetSecurityOption(option_.subdir + "/" + item, option);
725 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
726 LOGE("Set the security option of sub-directory failed[%d]", errCode);
727 return errCode;
728 }
729 }
730 return E_OK;
731 }
732
GetExistedSecOption(SecurityOption & secOption) const733 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
734 {
735 // Check the existence of the database, include the origin database and the database in the 'main' directory.
736 auto mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
737 auto mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
738 auto origDbFilePath = option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
739 if (!OS::CheckPathExistence(origDbFilePath) && !OS::CheckPathExistence(mainDbFilePath)) {
740 secOption = option_.securityOpt;
741 return E_OK;
742 }
743
744 // the main database file has high priority of the security option.
745 int errCode;
746 if (OS::CheckPathExistence(mainDbFilePath)) {
747 errCode = GetPathSecurityOption(mainDbFilePath, secOption);
748 } else {
749 errCode = GetPathSecurityOption(origDbFilePath, secOption);
750 }
751 if (errCode != E_OK) {
752 secOption = SecurityOption();
753 if (errCode == -E_NOT_SUPPORT) {
754 return E_OK;
755 }
756 LOGE("Get the security option of the existed database failed.");
757 }
758 return errCode;
759 }
760
ClearCorruptedFlag()761 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
762 {
763 isCorrupted_ = false;
764 }
765
PreCreateExecutor(bool isWrite)766 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite)
767 {
768 // Assume that create the write executor firstly and the write one we will not be released.
769 // If the write one would be released in the future, should take care the pass through.
770 if (!isWrite) {
771 return E_OK;
772 }
773
774 if (option_.isMemDb) {
775 return E_OK;
776 }
777
778 // Get the existed database secure option.
779 SecurityOption existedSecOpt;
780 int errCode = GetExistedSecOption(existedSecOpt);
781 if (errCode != E_OK) {
782 return errCode;
783 }
784
785 errCode = CheckDatabaseSecOpt(existedSecOpt);
786 if (errCode != E_OK) {
787 return errCode;
788 }
789
790 // Judge whether need update the security option of the engine.
791 // Should update the security in the import or rekey scene(inner).
792 if (!isNeedUpdateSecOpt_) {
793 option_.securityOpt = existedSecOpt;
794 }
795
796 errCode = CreateNewDirsAndSetSecOpt();
797 if (errCode != E_OK) {
798 return errCode;
799 }
800
801 if (!isUpdated_) {
802 errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
803 if (errCode != E_OK) {
804 LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
805 return errCode;
806 }
807 }
808
809 return E_OK;
810 }
811
EndCreateExecutor(bool isWrite)812 int SQLiteSingleVerStorageEngine::EndCreateExecutor(bool isWrite)
813 {
814 if (option_.isMemDb || !isWrite) {
815 return E_OK;
816 }
817
818 int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt,
819 isNeedUpdateSecOpt_);
820 if (errCode != E_OK) {
821 if (errCode == -E_NOT_SUPPORT) {
822 option_.securityOpt = SecurityOption();
823 errCode = E_OK;
824 }
825 LOGE("SetSecOption failed:%d", errCode);
826 return errCode;
827 }
828
829 // after setting secOption, the database file operation ends
830 // database create completed, delete the token
831 if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
832 OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
833 LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
834 return -E_SYSTEM_API_FAIL;
835 }
836 return errCode;
837 }
838
TryAttachMetaDb(sqlite3 * & dbHandle,bool & isAttachMeta)839 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(sqlite3 *&dbHandle, bool &isAttachMeta)
840 {
841 // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
842 if ((!option_.isMemDb) && (ParamCheckUtils::IsS3SECEOpt(option_.securityOpt))) {
843 int errCode = AttachMetaDatabase(dbHandle, option_);
844 if (errCode != E_OK) {
845 (void)sqlite3_close_v2(dbHandle);
846 dbHandle = nullptr;
847 return errCode;
848 }
849 isAttachMeta = true;
850 }
851 return E_OK;
852 }
853
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)854 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
855 {
856 int errCode = PreCreateExecutor(isWrite);
857 if (errCode != E_OK) {
858 return errCode;
859 }
860
861 sqlite3 *dbHandle = nullptr;
862 errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
863 if (errCode != E_OK) {
864 return errCode;
865 }
866
867 bool isAttachMeta = false;
868 errCode = TryAttachMetaDb(dbHandle, isAttachMeta);
869 if (errCode != E_OK) {
870 return errCode;
871 }
872
873 RegisterFunctionIfNeed(dbHandle);
874 errCode = Upgrade(dbHandle);
875 if (errCode != E_OK) {
876 (void)sqlite3_close_v2(dbHandle);
877 dbHandle = nullptr;
878 return errCode;
879 }
880
881 errCode = EndCreateExecutor(isWrite);
882 if (errCode != E_OK) {
883 LOGE("After create executor, set security option incomplete!");
884 (void)sqlite3_close_v2(dbHandle);
885 dbHandle = nullptr;
886 return errCode;
887 }
888
889 handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
890 if (handle == nullptr) {
891 LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
892 (void)sqlite3_close_v2(dbHandle);
893 dbHandle = nullptr;
894 return -E_OUT_OF_MEMORY;
895 }
896 if (isAttachMeta) {
897 SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
898 singleVerHandle->SetAttachMetaMode(isAttachMeta);
899 }
900 return E_OK;
901 }
902
Upgrade(sqlite3 * db)903 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
904 {
905 if (isUpdated_ || GetEngineState() == CACHEDB) {
906 LOGI("Storage engine is in cache status or has been upgraded[%d]!", isUpdated_);
907 return E_OK;
908 }
909
910 std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
911 LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
912 if (option_.schema.empty()) {
913 upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
914 } else {
915 SchemaObject schema;
916 int errCode = schema.ParseFromSchemaString(option_.schema);
917 if (errCode != E_OK) {
918 LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
919 return errCode;
920 }
921 upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
922 option_.securityOpt, option_.isMemDb);
923 }
924
925 std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
926 std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
927 SecurityOption secOpt = option_.securityOpt;
928 int errCode = E_OK;
929 if (isNeedUpdateSecOpt_) {
930 errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
931 if (errCode != E_OK) {
932 LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
933 if (errCode != -E_NOT_SUPPORT) {
934 return errCode;
935 }
936 secOpt = SecurityOption();
937 }
938 }
939
940 upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
941 upgrader->SetSubdir(option_.subdir);
942 errCode = upgrader->Upgrade();
943 if (errCode != E_OK) {
944 LOGE("Single ver database upgrade failed:%d", errCode);
945 return errCode;
946 }
947
948 LOGD("Finish upgrade single ver database!");
949 isUpdated_ = true; // Identification to avoid repeated upgrades
950 return errCode;
951 }
952
953 // Attention: This function should be called before "Upgrade".
954 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const955 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
956 {
957 // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
958 // not a newly created database, the meta-Table should exist and can be accessed.
959 std::string schemaStr = option_.schema;
960 if (schemaStr.empty()) {
961 // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
962 int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
963 if (errCode != E_OK) {
964 LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
965 }
966 }
967 if (!schemaStr.empty()) {
968 // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
969 int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
970 if (errCode != E_OK) { // Not very likely
971 // Just warning, if no index had been or need to be created, then put or kv-get can still use.
972 LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
973 }
974 }
975
976 // This function is used to update meta_data in triggers when it's attached to mainDB
977 int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
978 if (errCode != E_OK) {
979 LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
980 }
981 }
982
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const983 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
984 {
985 int errCode;
986 LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
987 std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
988 DBConstant::SINGLE_VER_META_STORE + DBConstant::SQLITE_DB_EXTENSION;
989 // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
990 if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
991 errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
992 if (errCode != E_OK) {
993 return errCode;
994 }
995 }
996 CipherPassword passwd;
997 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
998 if (errCode != E_OK) {
999 LOGE("AttachNewDatabase fail, errCode = %d", errCode);
1000 }
1001 return errCode;
1002 }
1003
ResetCacheRecordVersion()1004 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
1005 {
1006 (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
1007 }
1008
IncreaseCacheRecordVersion()1009 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
1010 {
1011 (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1012 }
1013
GetAndIncreaseCacheRecordVersion()1014 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
1015 {
1016 return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1017 }
1018
GetCacheRecordVersion() const1019 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
1020 {
1021 return cacheRecordVersion_.load(std::memory_order_seq_cst);
1022 }
1023
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1024 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1025 int eventType) const
1026 {
1027 std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1028 if (commitNotifyFunc_ == nullptr) {
1029 LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1030 RefObject::DecObjRef(committedData);
1031 committedData = nullptr;
1032 return;
1033 }
1034 commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1035 committedData = nullptr;
1036 return;
1037 }
1038
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1039 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1040 {
1041 if (committedData == nullptr) {
1042 LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1043 return;
1044 }
1045 auto identifier = GetIdentifier();
1046 auto kvdb = KvDBManager::GetInstance()->FindKvDB(identifier);
1047 if (kvdb == nullptr) {
1048 LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1049 return;
1050 }
1051 unsigned int conflictFlag = 0;
1052 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1053 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1054 }
1055 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1056 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1057 }
1058 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1059 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_NATIVE_ALL);
1060 }
1061 RefObject::DecObjRef(kvdb);
1062 LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1063 committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1064 }
1065
SetMaxTimeStamp(TimeStamp maxTimeStamp) const1066 void SQLiteSingleVerStorageEngine::SetMaxTimeStamp(TimeStamp maxTimeStamp) const
1067 {
1068 auto kvdbManager = KvDBManager::GetInstance();
1069 if (kvdbManager == nullptr) {
1070 return;
1071 }
1072 auto identifier = GetIdentifier();
1073 auto kvdb = kvdbManager->FindKvDB(identifier);
1074 if (kvdb == nullptr) {
1075 LOGE("[SQLiteSingleVerStorageEngine::SetMaxTimeStamp] kvdb is null.");
1076 return;
1077 }
1078
1079 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
1080 kvStore->SetMaxTimeStamp(maxTimeStamp);
1081 RefObject::DecObjRef(kvdb);
1082 return;
1083 }
1084
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1085 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1086 {
1087 const auto &isRemote = syncData.isRemote;
1088 const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1089 auto &committedData = syncData.committedData;
1090 auto &entries = syncData.entries;
1091
1092 // Put data. Including insert, update and delete.
1093 if (!isRemoveDeviceData) {
1094 if (committedData != nullptr) {
1095 int eventType = isRemote ? SQLITE_GENERAL_NS_SYNC_EVENT : SQLITE_GENERAL_NS_PUT_EVENT;
1096 CommitAndReleaseNotifyData(committedData, eventType);
1097 }
1098 return;
1099 }
1100
1101 // Remove device data.
1102 if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) {
1103 return;
1104 }
1105 size_t totalSize = 0;
1106 for (auto iter = entries.begin(); iter != entries.end();) {
1107 auto &entry = *iter;
1108 if (committedData == nullptr) {
1109 committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1110 if (committedData == nullptr) {
1111 LOGE("Alloc committed notify data failed.");
1112 return;
1113 }
1114 }
1115 if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > DBConstant::MAX_VALUE_SIZE) {
1116 iter++;
1117 continue;
1118 }
1119 if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) {
1120 CommitAndReleaseNotifyData(committedData, SQLITE_GENERAL_NS_SYNC_EVENT);
1121 totalSize = 0;
1122 continue;
1123 }
1124 totalSize += (entry.key.size() + entry.value.size());
1125 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1126 iter++;
1127 }
1128 if (committedData != nullptr) {
1129 CommitAndReleaseNotifyData(committedData, SQLITE_GENERAL_NS_SYNC_EVENT);
1130 }
1131 return;
1132 }
1133
1134 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1135 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1136 {
1137 std::lock_guard<std::mutex> lock(subscribeMutex_);
1138 subscribeQuery_[subscribeId] = query;
1139 }
1140 }
1141