1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_engine.h"
17
18 #include <memory>
19
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "db_constant.h"
23 #include "sqlite_single_ver_database_upgrader.h"
24 #include "sqlite_single_ver_natural_store.h"
25 #include "sqlite_single_ver_schema_database_upgrader.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "db_common.h"
29 #include "kvdb_manager.h"
30 #include "param_check_utils.h"
31
32 namespace DistributedDB {
33 namespace {
// Default cache record version: the constructor seeds cacheRecordVersion_ with it, and the
// "max version in cache db" lookups in this file fall back to it.
constexpr uint64_t CACHE_RECORD_DEFAULT_VERSION = 1;
GetPathSecurityOption(const std::string & filePath,SecurityOption & secOpt)35 int GetPathSecurityOption(const std::string &filePath, SecurityOption &secOpt)
36 {
37 return RuntimeContext::GetInstance()->GetSecurityOption(filePath, secOpt);
38 }
39
// Selects one of the per-store sub-directories resolved by GetDbDir().
enum class DbType {
    MAIN = 0,  // resolved to DBConstant::MAINDB_DIR
    META = 1,  // resolved to DBConstant::METADB_DIR
    CACHE = 2, // resolved to DBConstant::CACHEDB_DIR
};
45
GetDbDir(const std::string & subDir,DbType type)46 std::string GetDbDir(const std::string &subDir, DbType type)
47 {
48 static const std::map<DbType, std::string> dbDirDic {
49 { DbType::MAIN, DBConstant::MAINDB_DIR },
50 { DbType::META, DBConstant::METADB_DIR },
51 { DbType::CACHE, DBConstant::CACHEDB_DIR },
52 }; // for ensure static compilation order
53
54 if (dbDirDic.find(type) == dbDirDic.end()) {
55 return std::string();
56 }
57 return subDir + "/" + dbDirDic.at(type);
58 }
59 } // namespace
60
SQLiteSingleVerStorageEngine()61 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
62 : cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
63 executorState_(ExecutorState::INVALID),
64 isCorrupted_(false),
65 isNeedUpdateSecOpt_(false)
66 {}
67
~SQLiteSingleVerStorageEngine()68 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
69 {
70 }
71
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const72 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
73 {
74 return handle->MigrateLocalData();
75 }
76
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)77 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
78 const std::vector<DataItem> &dataItems)
79 {
80 int errCode = E_OK;
81 for (const auto &dataItem : dataItems) {
82 if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
83 (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
84 auto kvdbManager = KvDBManager::GetInstance();
85 if (kvdbManager == nullptr) {
86 return -E_INVALID_DB;
87 }
88
89 // sync module will use handle to fix water mark, if fix fail then migrate fail, not need hold write handle
90 errCode = ReleaseExecutor(handle);
91 if (errCode != E_OK) {
92 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
93 return errCode;
94 }
95
96 auto identifier = GetIdentifier();
97 auto kvdb = kvdbManager->FindKvDB(identifier);
98 if (kvdb == nullptr) {
99 LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
100 return -E_INVALID_DB;
101 }
102
103 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
104 errCode = kvStore->EraseDeviceWaterMark(dataItem.dev, false);
105 RefObject::DecObjRef(kvdb);
106 if (errCode != E_OK) {
107 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
108 return errCode;
109 }
110
111 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
112 errCode));
113 if (errCode != E_OK) {
114 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
115 return errCode;
116 }
117 }
118 }
119 return errCode;
120 }
121
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)122 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
123 NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
124 {
125 if (syncData.committedData == nullptr) {
126 syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
127 if (syncData.committedData == nullptr) {
128 LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
129 return -E_OUT_OF_MEMORY;
130 }
131 }
132 InitConflictNotifiedFlag(syncData.committedData);
133
134 std::vector<DataItem> dataItems;
135 uint64_t minVerIncurCacheDb = 0;
136 int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
137 if (errCode != E_OK) {
138 LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
139 return errCode;
140 }
141
142 if (minVerIncurCacheDb == 0) { // min version in cache db is 1
143 ++curMigrateVer;
144 return E_OK;
145 }
146
147 if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
148 curMigrateVer = minVerIncurCacheDb;
149 }
150
151 // Call the syncer module to erase the water mark.
152 errCode = EraseDeviceWaterMark(handle, dataItems);
153 if (errCode != E_OK) {
154 LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
155 return errCode;
156 }
157
158 // next version need process
159 LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
160 curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
161 errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
162 if (errCode != E_OK) {
163 LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
164 return errCode;
165 }
166
167 CommitNotifyForMigrateCache(syncData);
168
169 Timestamp timestamp = 0;
170 errCode = handle->GetMaxTimestampDuringMigrating(timestamp);
171 if (errCode == E_OK) {
172 SetMaxTimestamp(timestamp);
173 }
174
175 errCode = ReleaseHandleTransiently(handle, 2ull); // temporary release handle 2ms
176 if (errCode != E_OK) {
177 return errCode;
178 }
179
180 return E_OK;
181 }
182
183 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime)184 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime)
185 {
186 int errCode = ReleaseExecutor(handle);
187 if (errCode != E_OK) {
188 LOGE("release executor for reopen database! errCode = [%d]", errCode);
189 return errCode;
190 }
191
192 std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
193 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
194 if (errCode != E_OK) {
195 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
196 return errCode;
197 }
198 return errCode;
199 }
200
AddSubscribeToMainDBInMigrate()201 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
202 {
203 LOGD("Add subscribe to mainDB from cache. %d", engineState_);
204 std::lock_guard<std::mutex> lock(subscribeMutex_);
205 if (subscribeQuery_.empty()) {
206 return E_OK;
207 }
208 int errCode = E_OK;
209 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
210 if (errCode != E_OK || handle == nullptr) {
211 LOGE("Get available executor for add subscribe failed. %d", errCode);
212 return errCode;
213 }
214 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
215 if (errCode != E_OK) {
216 goto END;
217 }
218 for (auto item : subscribeQuery_) {
219 errCode = handle->AddSubscribeTrigger(item.second, item.first);
220 if (errCode != E_OK) {
221 LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
222 }
223 }
224 subscribeQuery_.clear();
225 // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
226 (void)handle->Commit();
227 END:
228 ReleaseExecutor(handle);
229 return errCode;
230 }
231
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)232 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
233 {
234 int errCode = E_OK;
235 if (handle == nullptr) {
236 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
237 if (errCode != E_OK) {
238 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
239 return errCode;
240 }
241 }
242
243 LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion() - 1);
244 uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
245 NotifyMigrateSyncData syncData;
246 auto kvdbManager = KvDBManager::GetInstance();
247 if (kvdbManager != nullptr) {
248 auto identifier = GetIdentifier();
249 auto kvdb = kvdbManager->FindKvDB(identifier);
250 if (kvdb != nullptr) {
251 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
252 syncData.isPermitForceWrite =
253 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
254 RefObject::DecObjRef(kvdb);
255 } else {
256 LOGE("[SingleVerEngine] kvdb is null.");
257 }
258 }
259 // cache atomic version represents version of cacheDb input next time
260 while (curMigrateVer < GetCacheRecordVersion()) {
261 errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
262 if (errCode != E_OK) {
263 LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
264 break;
265 }
266 if (!syncData.isRemote) {
267 isNeedTriggerSync = true;
268 }
269 }
270 if (syncData.committedData != nullptr) {
271 RefObject::DecObjRef(syncData.committedData);
272 syncData.committedData = nullptr;
273 }
274 // When finished Migrating sync data, will fix engine state
275 return errCode;
276 }
277
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)278 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
279 EngineState stateBeforeMigrate)
280 {
281 LOGD("Begin attach main db and cache db by executor!");
282 // Judge the file corresponding to db by the engine status and attach it to another file
283 int errCode = E_OK;
284 std::string attachAbsPath;
285 if (stateBeforeMigrate == EngineState::MAINDB) {
286 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
287 DBConstant::SQLITE_DB_EXTENSION;
288 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
289 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
290 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
291 DBConstant::SQLITE_DB_EXTENSION;
292 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
293 } else {
294 return -E_NOT_SUPPORT;
295 }
296 if (errCode != E_OK) {
297 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
298 return errCode;
299 }
300
301 uint64_t maxVersion = 0;
302 errCode = handle->GetMaxVersionInCacheDb(maxVersion);
303 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
304 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
305 }
306
307 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
308 return errCode;
309 }
310
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const311 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
312 {
313 LOGD("Begin attach main db and cache db by sqlite handle!");
314 // Judge the file corresponding to db by the engine status and attach it to another file
315 int errCode = E_OK;
316 std::string attachAbsPath;
317 if (stateBeforeMigrate == EngineState::MAINDB) {
318 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
319 DBConstant::SQLITE_DB_EXTENSION;
320 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
321 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
322 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
323 DBConstant::SQLITE_DB_EXTENSION;
324 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
325 } else {
326 return -E_NOT_SUPPORT;
327 }
328 if (errCode != E_OK) {
329 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
330 return errCode;
331 }
332
333 return errCode;
334 }
335
ReInit()336 int SQLiteSingleVerStorageEngine::ReInit()
337 {
338 return Init();
339 }
340
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)341 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
342 {
343 if (handle == nullptr) {
344 return E_OK;
345 }
346 StorageExecutor *databaseHandle = handle;
347 isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
348 Recycle(databaseHandle);
349 handle = nullptr;
350 if (isCorrupted_) {
351 LOGE("Database is corrupted!");
352 return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
353 }
354 return E_OK;
355 }
356
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)357 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
358 EngineState stateBeforeMigrate)
359 {
360 LOGI("Begin to finish migrate and reinit db state!");
361 int errCode;
362 if (handle == nullptr) {
363 return -E_INVALID_ARGS;
364 }
365
366 if (stateBeforeMigrate == EngineState::MAINDB) {
367 sqlite3 *dbHandle = nullptr;
368 errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
369 if (errCode != E_OK) {
370 LOGE("Get Db handle failed! errCode = [%d]", errCode);
371 return errCode;
372 }
373
374 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
375 if (errCode != E_OK) {
376 LOGE("Execute the SQLite detach failed:%d", errCode);
377 return errCode;
378 }
379 // delete cachedb
380 errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
381 if (errCode != E_OK) {
382 LOGE("Remove files of cache database after detach:%d", errCode);
383 }
384
385 SetEngineState(EngineState::MAINDB);
386 return errCode;
387 }
388
389 errCode = ReleaseExecutor(handle);
390 if (errCode != E_OK) {
391 LOGE("Release executor for reopen database! errCode = [%d]", errCode);
392 return errCode;
393 }
394
395 // close db for reinit this engine
396 Release();
397
398 // delete cache db
399 errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
400 if (errCode != E_OK) {
401 LOGE("Remove files of cache database after release current db:%d", errCode);
402 return errCode;
403 }
404
405 // reInit, it will reset engine state
406 errCode = ReInit();
407 if (errCode != E_OK) {
408 LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
409 return errCode;
410 }
411
412 return E_OK;
413 }
414
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)415 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
416 EngineState preMigrateState)
417 {
418 // after attach main and cache need change operate data sql, changing state forbid operate database
419 SetEngineState(EngineState::MIGRATING);
420
421 int errCode = E_OK;
422 // check if has been attach and attach cache and main for migrate
423 if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
424 errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
425 if (errCode != E_OK) {
426 LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
427 // For lock state open db, can not attach main and cache
428 return errCode;
429 }
430 } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
431 // Has been attach, maybe ever crashed, need update version
432 executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
433 uint64_t maxVersion = 0;
434 errCode = handle->GetMaxVersionInCacheDb(maxVersion);
435 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
436 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
437 }
438 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
439 } else {
440 return -E_UNEXPECTED_DATA;
441 }
442
443 return errCode;
444 }
445
ExecuteMigrate()446 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
447 {
448 EngineState preState = GetEngineState();
449 std::lock_guard<std::mutex> lock(migrateLock_);
450 if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
451 !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
452 DBConstant::SQLITE_DB_EXTENSION)) {
453 LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
454 return E_OK;
455 }
456
457 // Get write executor for migrate
458 int errCode = E_OK;
459 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
460 if (errCode != E_OK) {
461 LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
462 return errCode;
463 }
464
465 isMigrating_.store(true);
466 LOGD("Migrate start.");
467 bool isNeedTriggerSync = false;
468 errCode = InitExecuteMigrate(handle, preState);
469 if (errCode != E_OK) {
470 LOGE("Init migrate data fail, errCode = [%d]", errCode);
471 goto END;
472 }
473
474 LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
475 static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
476 // has been attached, Mark start of migration and it can migrate data
477 errCode = MigrateLocalData(handle);
478 if (errCode != E_OK) {
479 LOGE("Migrate local data fail, errCode = [%d]", errCode);
480 goto END;
481 }
482
483 errCode = MigrateSyncData(handle, isNeedTriggerSync);
484 if (errCode != E_OK) {
485 LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
486 goto END;
487 }
488
489 SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
490
491 // detach database and delete cachedb
492 errCode = FinishMigrateData(handle, preState);
493 if (errCode != E_OK) {
494 LOGE("Finish migrating data fail, errCode = [%d]", errCode);
495 goto END;
496 }
497
498 END: // after FinishMigrateData, it will reset engine state
499 // there is no need cover the errCode
500 EndMigrate(handle, preState, errCode, isNeedTriggerSync);
501 isMigrating_.store(false);
502 LOGD("Migrate stop.");
503 return errCode;
504 }
505
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)506 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
507 int errCode, bool isNeedTriggerSync)
508 {
509 LOGD("Finish migrating data! errCode = [%d]", errCode);
510 if (errCode != E_OK) {
511 SetEngineState(stateBeforeMigrate);
512 }
513 if (handle != nullptr) {
514 handle->ClearMigrateData();
515 }
516 errCode = ReleaseExecutor(handle);
517 if (errCode != E_OK) {
518 LOGE("release executor after migrating! errCode = [%d]", errCode);
519 }
520
521 errCode = AddSubscribeToMainDBInMigrate();
522 if (errCode != E_OK) {
523 LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
524 }
525 // Notify max timestamp offset for SyncEngine.
526 // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
527 RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
528 if (isNeedTriggerSync) {
529 commitNotifyFunc_(SQLITE_GENERAL_FINISH_MIGRATE_EVENT, nullptr);
530 }
531 return;
532 }
533
IsEngineCorrupted() const534 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
535 {
536 return isCorrupted_;
537 }
538
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)539 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
540 {
541 auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
542 if (executor == nullptr) {
543 return executor;
544 }
545 executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
546 return executor;
547 }
548
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)549 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
550 {
551 // Only could get the main database handle in the uninitialized and the main status.
552 if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
553 LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
554 return -E_EKEYREVOKED;
555 }
556
557 if (!option_.isMemDb) {
558 option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
559 DBConstant::SQLITE_DB_EXTENSION;
560 }
561
562 OpenDbProperties optionTemp = option_;
563 if (!isWrite) {
564 optionTemp.createIfNecessary = false;
565 }
566
567 int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
568 if (errCode != E_OK) {
569 if (errno == EKEYREVOKED) {
570 LOGI("Failed to open the main database for key revoked[%d]", errCode);
571 errCode = -E_EKEYREVOKED;
572 }
573 return errCode;
574 }
575
576 executorState_ = ExecutorState::MAINDB;
577 // Set the engine state to main status for that the main database is valid.
578 SetEngineState(EngineState::MAINDB);
579
580 if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
581 DBConstant::SQLITE_DB_EXTENSION)) {
582 // In status cacheDb crash
583 errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
584 if (errCode != E_OK) {
585 LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
586 return E_OK; // not care err to return, only use for print log
587 }
588 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
589 // cache and main existed together, can not read data, must execute migrate first
590 SetEngineState(EngineState::ATTACHING);
591 }
592
593 return errCode;
594 }
595
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)596 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
597 {
598 int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
599 LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]", isWrite,
600 secOpt.securityLabel, secOpt.securityFlag, DBCommon::TransferStringToHex(identifier_).c_str(), errCode);
601 if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
602 return errCode;
603 }
604
605 std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
606 DBConstant::SQLITE_DB_EXTENSION;
607 if (!isWrite || GetEngineState() != EngineState::INVALID ||
608 OS::CheckPathExistence(cacheDbPath)) {
609 LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
610 isWrite, GetEngineState());
611 return -E_EKEYREVOKED;
612 }
613
614 errCode = GetCacheDbHandle(dbHandle);
615 if (errCode != E_OK) {
616 LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
617 return errCode;
618 }
619 SetEngineState(CACHEDB);
620 executorState_ = ExecutorState::CACHEDB;
621
622 ResetCacheRecordVersion();
623 // Get handle means maindb file ekeyevoked, not need attach to
624 return errCode;
625 }
626
// Table definitions executed when the cache database is opened (see GetCacheDbHandle).
namespace CacheDbSqls {
// Local data of the cache db, keyed by hash_key.
const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS local_data("
    "key BLOB NOT NULL,"
    "value BLOB,"
    "timestamp INT,"
    "hash_key BLOB PRIMARY KEY NOT NULL,"
    "flag INT NOT NULL);";

// Sync data of the cache db, keyed by (version, hash_key).
const std::string CREATE_CACHE_SYNC_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS sync_data("
    "key BLOB NOT NULL,"
    "value BLOB,"
    "timestamp INT NOT NULL,"
    "flag INT NOT NULL,"
    "device BLOB,"
    "ori_device BLOB,"
    "hash_key BLOB NOT NULL,"
    "w_timestamp INT,"
    "version INT NOT NULL,"
    "PRIMARY Key(version, hash_key));";
} // namespace CacheDbSqls
649
650 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
651 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)652 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
653 {
654 option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
655 DBConstant::SQLITE_DB_EXTENSION;
656 // creatTable
657 option_.sqls = {
658 CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
659 CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
660 };
661
662 if (!option_.createIfNecessary) {
663 std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
664 DBConstant::SQLITE_DB_EXTENSION;
665 if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
666 return -E_INVALID_DB;
667 }
668 }
669
670 OpenDbProperties option = option_; // copy for no change it
671 option.createIfNecessary = true;
672 int errCode = SQLiteUtils::OpenDatabase(option, db);
673 if (errCode != E_OK) {
674 LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
675 return errCode;
676 }
677 return errCode;
678 }
679
CheckDatabaseSecOpt(const SecurityOption & secOption) const680 int SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
681 {
682 if (!(secOption == option_.securityOpt) &&
683 secOption.securityLabel != SecurityLabel::NOT_SET &&
684 option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
685 LOGE("SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]", secOption.securityLabel,
686 secOption.securityFlag, option_.securityOpt.securityLabel, option_.securityOpt.securityFlag);
687 return -E_SECURITY_OPTION_CHECK_ERROR;
688 }
689 return E_OK;
690 }
691
CreateNewDirsAndSetSecOpt() const692 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
693 {
694 std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
695 for (const auto &item : dbDir) {
696 if (OS::CheckPathExistence(option_.subdir + "/" + item)) {
697 continue;
698 }
699
700 // Dir and old db file not existed, it means that the database is newly created
701 // need create flag of database not incomplete
702 if (!OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
703 !OS::CheckPathExistence(option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE +
704 DBConstant::SQLITE_DB_EXTENSION) &&
705 OS::CreateFileByFileName(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
706 LOGE("Fail to create the token of database incompleted! errCode = [E_SYSTEM_API_FAIL]");
707 return -E_SYSTEM_API_FAIL;
708 }
709
710 if (DBCommon::CreateDirectory(option_.subdir + "/" + item) != E_OK) {
711 LOGE("Create sub-directory for single ver failed, errno:%d", errno);
712 return -E_SYSTEM_API_FAIL;
713 }
714
715 if (option_.securityOpt.securityLabel == NOT_SET) {
716 continue;
717 }
718
719 SecurityOption option = option_.securityOpt;
720 if (item == DBConstant::METADB_DIR) {
721 option.securityLabel = ((option_.securityOpt.securityLabel >= SecurityLabel::S2) ?
722 SecurityLabel::S2 : option_.securityOpt.securityLabel);
723 option.securityFlag = SecurityFlag::ECE;
724 }
725
726 int errCode = RuntimeContext::GetInstance()->SetSecurityOption(option_.subdir + "/" + item, option);
727 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
728 LOGE("Set the security option of sub-directory failed[%d]", errCode);
729 return errCode;
730 }
731 }
732 return E_OK;
733 }
734
GetExistedSecOption(SecurityOption & secOption) const735 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
736 {
737 // Check the existence of the database, include the origin database and the database in the 'main' directory.
738 auto mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
739 auto mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
740 auto origDbFilePath = option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
741 if (!OS::CheckPathExistence(origDbFilePath) && !OS::CheckPathExistence(mainDbFilePath)) {
742 secOption = option_.securityOpt;
743 return E_OK;
744 }
745
746 // the main database file has high priority of the security option.
747 int errCode;
748 if (OS::CheckPathExistence(mainDbFilePath)) {
749 errCode = GetPathSecurityOption(mainDbFilePath, secOption);
750 } else {
751 errCode = GetPathSecurityOption(origDbFilePath, secOption);
752 }
753 if (errCode != E_OK) {
754 secOption = SecurityOption();
755 if (errCode == -E_NOT_SUPPORT) {
756 return E_OK;
757 }
758 LOGE("Get the security option of the existed database failed.");
759 }
760 return errCode;
761 }
762
ClearCorruptedFlag()763 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
764 {
765 isCorrupted_ = false;
766 }
767
PreCreateExecutor(bool isWrite)768 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite)
769 {
770 // Assume that create the write executor firstly and the write one we will not be released.
771 // If the write one would be released in the future, should take care the pass through.
772 if (!isWrite) {
773 return E_OK;
774 }
775
776 if (option_.isMemDb) {
777 return E_OK;
778 }
779
780 // Get the existed database secure option.
781 SecurityOption existedSecOpt;
782 int errCode = GetExistedSecOption(existedSecOpt);
783 if (errCode != E_OK) {
784 return errCode;
785 }
786
787 errCode = CheckDatabaseSecOpt(existedSecOpt);
788 if (errCode != E_OK) {
789 return errCode;
790 }
791
792 // Judge whether need update the security option of the engine.
793 // Should update the security in the import or rekey scene(inner).
794 if (!isNeedUpdateSecOpt_) {
795 option_.securityOpt = existedSecOpt;
796 }
797
798 errCode = CreateNewDirsAndSetSecOpt();
799 if (errCode != E_OK) {
800 return errCode;
801 }
802
803 if (!isUpdated_) {
804 errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
805 if (errCode != E_OK) {
806 LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
807 return errCode;
808 }
809 }
810
811 return E_OK;
812 }
813
EndCreateExecutor(bool isWrite)814 int SQLiteSingleVerStorageEngine::EndCreateExecutor(bool isWrite)
815 {
816 if (option_.isMemDb || !isWrite) {
817 return E_OK;
818 }
819
820 int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt,
821 isNeedUpdateSecOpt_);
822 if (errCode != E_OK) {
823 if (errCode == -E_NOT_SUPPORT) {
824 option_.securityOpt = SecurityOption();
825 errCode = E_OK;
826 }
827 LOGE("SetSecOption failed:%d", errCode);
828 return errCode;
829 }
830
831 // after setting secOption, the database file operation ends
832 // database create completed, delete the token
833 if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
834 OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
835 LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
836 return -E_SYSTEM_API_FAIL;
837 }
838 return errCode;
839 }
840
TryAttachMetaDb(sqlite3 * & dbHandle,bool & isAttachMeta)841 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(sqlite3 *&dbHandle, bool &isAttachMeta)
842 {
843 // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
844 if ((!option_.isMemDb) && (ParamCheckUtils::IsS3SECEOpt(option_.securityOpt))) {
845 int errCode = AttachMetaDatabase(dbHandle, option_);
846 if (errCode != E_OK) {
847 (void)sqlite3_close_v2(dbHandle);
848 dbHandle = nullptr;
849 return errCode;
850 }
851 isAttachMeta = true;
852 }
853 return E_OK;
854 }
855
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)856 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
857 {
858 int errCode = PreCreateExecutor(isWrite);
859 if (errCode != E_OK) {
860 return errCode;
861 }
862
863 sqlite3 *dbHandle = nullptr;
864 errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
865 if (errCode != E_OK) {
866 return errCode;
867 }
868
869 bool isAttachMeta = false;
870 errCode = TryAttachMetaDb(dbHandle, isAttachMeta);
871 if (errCode != E_OK) {
872 return errCode;
873 }
874
875 RegisterFunctionIfNeed(dbHandle);
876 errCode = Upgrade(dbHandle);
877 if (errCode != E_OK) {
878 (void)sqlite3_close_v2(dbHandle);
879 dbHandle = nullptr;
880 return errCode;
881 }
882
883 errCode = EndCreateExecutor(isWrite);
884 if (errCode != E_OK) {
885 LOGE("After create executor, set security option incomplete!");
886 (void)sqlite3_close_v2(dbHandle);
887 dbHandle = nullptr;
888 return errCode;
889 }
890
891 handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
892 if (handle == nullptr) {
893 LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
894 (void)sqlite3_close_v2(dbHandle);
895 dbHandle = nullptr;
896 return -E_OUT_OF_MEMORY;
897 }
898 if (isAttachMeta) {
899 SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
900 singleVerHandle->SetAttachMetaMode(isAttachMeta);
901 }
902 return E_OK;
903 }
904
Upgrade(sqlite3 * db)905 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
906 {
907 if (isUpdated_ || GetEngineState() == CACHEDB) {
908 LOGI("Storage engine is in cache status or has been upgraded[%d]!", isUpdated_);
909 return E_OK;
910 }
911
912 std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
913 LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
914 if (option_.schema.empty()) {
915 upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
916 } else {
917 SchemaObject schema;
918 int errCode = schema.ParseFromSchemaString(option_.schema);
919 if (errCode != E_OK) {
920 LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
921 return errCode;
922 }
923 upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
924 option_.securityOpt, option_.isMemDb);
925 }
926
927 std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
928 std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
929 SecurityOption secOpt = option_.securityOpt;
930 int errCode = E_OK;
931 if (isNeedUpdateSecOpt_) {
932 errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
933 if (errCode != E_OK) {
934 LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
935 if (errCode != -E_NOT_SUPPORT) {
936 return errCode;
937 }
938 secOpt = SecurityOption();
939 }
940 }
941
942 upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
943 upgrader->SetSubdir(option_.subdir);
944 errCode = upgrader->Upgrade();
945 if (errCode != E_OK) {
946 LOGE("Single ver database upgrade failed:%d", errCode);
947 return errCode;
948 }
949
950 LOGD("Finish upgrade single ver database!");
951 isUpdated_ = true; // Identification to avoid repeated upgrades
952 return errCode;
953 }
954
955 // Attention: This function should be called before "Upgrade".
956 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const957 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
958 {
959 // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
960 // not a newly created database, the meta-Table should exist and can be accessed.
961 std::string schemaStr = option_.schema;
962 if (schemaStr.empty()) {
963 // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
964 int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
965 if (errCode != E_OK) {
966 LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
967 }
968 }
969 if (!schemaStr.empty()) {
970 // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
971 int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
972 if (errCode != E_OK) { // Not very likely
973 // Just warning, if no index had been or need to be created, then put or kv-get can still use.
974 LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
975 }
976 }
977
978 // This function is used to update meta_data in triggers when it's attached to mainDB
979 int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
980 if (errCode != E_OK) {
981 LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
982 }
983 }
984
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const985 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
986 {
987 int errCode;
988 LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
989 std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
990 DBConstant::SINGLE_VER_META_STORE + DBConstant::SQLITE_DB_EXTENSION;
991 // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
992 if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
993 errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
994 if (errCode != E_OK) {
995 return errCode;
996 }
997 }
998 CipherPassword passwd;
999 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
1000 if (errCode != E_OK) {
1001 LOGE("AttachNewDatabase fail, errCode = %d", errCode);
1002 }
1003 return errCode;
1004 }
1005
ResetCacheRecordVersion()1006 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
1007 {
1008 (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
1009 }
1010
IncreaseCacheRecordVersion()1011 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
1012 {
1013 (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1014 }
1015
GetAndIncreaseCacheRecordVersion()1016 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
1017 {
1018 return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1019 }
1020
GetCacheRecordVersion() const1021 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
1022 {
1023 return cacheRecordVersion_.load(std::memory_order_seq_cst);
1024 }
1025
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1026 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1027 int eventType) const
1028 {
1029 std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1030 if (commitNotifyFunc_ == nullptr) {
1031 LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1032 RefObject::DecObjRef(committedData);
1033 committedData = nullptr;
1034 return;
1035 }
1036 commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1037 committedData = nullptr;
1038 return;
1039 }
1040
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1041 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1042 {
1043 if (committedData == nullptr) {
1044 LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1045 return;
1046 }
1047 auto identifier = GetIdentifier();
1048 auto kvdb = KvDBManager::GetInstance()->FindKvDB(identifier);
1049 if (kvdb == nullptr) {
1050 LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1051 return;
1052 }
1053 unsigned int conflictFlag = 0;
1054 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1055 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1056 }
1057 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1058 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1059 }
1060 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1061 conflictFlag |= static_cast<unsigned>(SQLITE_GENERAL_NS_NATIVE_ALL);
1062 }
1063 RefObject::DecObjRef(kvdb);
1064 LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1065 committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1066 }
1067
SetMaxTimestamp(Timestamp maxTimestamp) const1068 void SQLiteSingleVerStorageEngine::SetMaxTimestamp(Timestamp maxTimestamp) const
1069 {
1070 auto kvdbManager = KvDBManager::GetInstance();
1071 if (kvdbManager == nullptr) {
1072 return;
1073 }
1074 auto identifier = GetIdentifier();
1075 auto kvdb = kvdbManager->FindKvDB(identifier);
1076 if (kvdb == nullptr) {
1077 LOGE("[SQLiteSingleVerStorageEngine::SetMaxTimestamp] kvdb is null.");
1078 return;
1079 }
1080
1081 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
1082 kvStore->SetMaxTimestamp(maxTimestamp);
1083 RefObject::DecObjRef(kvdb);
1084 return;
1085 }
1086
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1087 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1088 {
1089 const auto &isRemote = syncData.isRemote;
1090 const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1091 auto &committedData = syncData.committedData;
1092 auto &entries = syncData.entries;
1093
1094 // Put data. Including insert, update and delete.
1095 if (!isRemoveDeviceData) {
1096 if (committedData != nullptr) {
1097 int eventType = isRemote ? SQLITE_GENERAL_NS_SYNC_EVENT : SQLITE_GENERAL_NS_PUT_EVENT;
1098 CommitAndReleaseNotifyData(committedData, eventType);
1099 }
1100 return;
1101 }
1102
1103 // Remove device data.
1104 if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) {
1105 return;
1106 }
1107 size_t totalSize = 0;
1108 for (auto iter = entries.begin(); iter != entries.end();) {
1109 auto &entry = *iter;
1110 if (committedData == nullptr) {
1111 committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1112 if (committedData == nullptr) {
1113 LOGE("Alloc committed notify data failed.");
1114 return;
1115 }
1116 }
1117 if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > DBConstant::MAX_VALUE_SIZE) {
1118 iter++;
1119 continue;
1120 }
1121 if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) {
1122 CommitAndReleaseNotifyData(committedData, SQLITE_GENERAL_NS_SYNC_EVENT);
1123 totalSize = 0;
1124 continue;
1125 }
1126 totalSize += (entry.key.size() + entry.value.size());
1127 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1128 iter++;
1129 }
1130 if (committedData != nullptr) {
1131 CommitAndReleaseNotifyData(committedData, SQLITE_GENERAL_NS_SYNC_EVENT);
1132 }
1133 return;
1134 }
1135
1136 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1137 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1138 {
1139 std::lock_guard<std::mutex> lock(subscribeMutex_);
1140 subscribeQuery_[subscribeId] = query;
1141 }
1142 }
1143