1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_engine.h"
17
18 #include <memory>
19
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "kvdb_manager.h"
24 #include "log_print.h"
25 #include "param_check_utils.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "single_ver_utils.h"
29 #include "sqlite_log_table_manager.h"
30 #include "sqlite_single_ver_database_upgrader.h"
31 #include "sqlite_single_ver_natural_store.h"
32 #include "sqlite_single_ver_schema_database_upgrader.h"
33
34 namespace DistributedDB {
SQLiteSingleVerStorageEngine()35 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
36 : executorState_(ExecutorState::INVALID),
37 cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
38 isCorrupted_(false),
39 isNeedUpdateSecOpt_(false),
40 maxValueSize_(DBConstant::MAX_VALUE_SIZE)
41 {}
42
~SQLiteSingleVerStorageEngine()43 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
44 {
45 }
46
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const47 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
48 {
49 return handle->MigrateLocalData();
50 }
51
EraseDeviceWaterMark(const std::set<std::string> & removeDevices,bool isNeedHash)52 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(const std::set<std::string> &removeDevices, bool isNeedHash)
53 {
54 auto kvdbManager = KvDBManager::GetInstance();
55 if (kvdbManager == nullptr) { // LCOV_EXCL_BR_LINE
56 return -E_INVALID_DB;
57 }
58 auto identifier = GetIdentifier();
59 auto kvdb = kvdbManager->FindKvDB(identifier);
60 if (kvdb == nullptr) { // LCOV_EXCL_BR_LINE
61 LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
62 return -E_INVALID_DB;
63 }
64
65 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
66 for (const auto &devId : removeDevices) {
67 int errCode = kvStore->EraseDeviceWaterMark(devId, isNeedHash);
68 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
69 RefObject::DecObjRef(kvdb);
70 return errCode;
71 }
72 }
73
74 RefObject::DecObjRef(kvdb);
75 return E_OK;
76 }
77
GetRemoveDataDevices(SQLiteSingleVerStorageExecutor * handle,const DataItem & item,std::set<std::string> & removeDevices,bool & isNeedHash) const78 int SQLiteSingleVerStorageEngine::GetRemoveDataDevices(SQLiteSingleVerStorageExecutor *handle, const DataItem &item,
79 std::set<std::string> &removeDevices, bool &isNeedHash) const
80 {
81 if (handle == nullptr) { // LCOV_EXCL_BR_LINE
82 return -E_INVALID_DB;
83 }
84 if (item.value.empty()) { // Device ID has been set to value in cache db
85 // Empty means remove all device data, get device id from meta key
86 // LCOV_EXCL_BR_LINE
87 int errCode = handle->GetExistsDevicesFromMeta(removeDevices);
88 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
89 LOGE("Get remove devices list from meta failed. err=%d", errCode);
90 return errCode;
91 }
92 isNeedHash = false;
93 } else {
94 std::string deviceName;
95 DBCommon::VectorToString(item.value, deviceName);
96 removeDevices.insert(deviceName);
97 }
98 return E_OK;
99 }
100
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)101 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
102 const std::vector<DataItem> &dataItems)
103 {
104 int errCode = E_OK;
105 for (const auto &dataItem : dataItems) {
106 if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
107 (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
108 bool isNeedHash = true;
109 std::set<std::string> removeDevices;
110 errCode = GetRemoveDataDevices(handle, dataItem, removeDevices, isNeedHash);
111 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
112 LOGE("Get remove device id failed. err=%d", errCode);
113 return errCode;
114 }
115
116 // sync module will use handle to fix watermark, if fix fail then migrate fail, not need hold write handle
117 errCode = ReleaseExecutor(handle);
118 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
119 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
120 return errCode;
121 }
122
123 errCode = EraseDeviceWaterMark(removeDevices, isNeedHash);
124 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
125 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
126 return errCode;
127 }
128
129 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
130 errCode));
131 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
132 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
133 return errCode;
134 }
135 }
136 }
137 return errCode;
138 }
139
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)140 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
141 NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
142 {
143 if (syncData.committedData == nullptr) {
144 syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
145 if (syncData.committedData == nullptr) {
146 LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
147 return -E_OUT_OF_MEMORY;
148 }
149 }
150 InitConflictNotifiedFlag(syncData.committedData);
151
152 std::vector<DataItem> dataItems;
153 uint64_t minVerIncurCacheDb = 0;
154 int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
155 if (errCode != E_OK) {
156 LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
157 return errCode;
158 }
159
160 if (minVerIncurCacheDb == 0) { // min version in cache db is 1
161 ++curMigrateVer;
162 return E_OK;
163 }
164
165 if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
166 curMigrateVer = minVerIncurCacheDb;
167 }
168
169 // Call the syncer module to erase the water mark.
170 errCode = EraseDeviceWaterMark(handle, dataItems);
171 if (errCode != E_OK) {
172 LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
173 return errCode;
174 }
175
176 // next version need process
177 LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
178 curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
179 errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
180 if (errCode != E_OK) {
181 LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
182 return errCode;
183 }
184
185 errCode = ReleaseHandleTransiently(handle, 2ULL, syncData); // temporary release handle 2ms
186 if (errCode != E_OK) {
187 return errCode;
188 }
189
190 return E_OK;
191 }
192
193 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime,NotifyMigrateSyncData & syncData)194 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime,
195 NotifyMigrateSyncData &syncData)
196 {
197 int errCode = ReleaseExecutor(handle);
198 if (errCode != E_OK) {
199 LOGE("release executor for reopen database! errCode = [%d]", errCode);
200 return errCode;
201 }
202
203 CommitNotifyForMigrateCache(syncData); // Trigger sync after release handle
204
205 std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
206 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
207 if (errCode != E_OK) {
208 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
209 return errCode;
210 }
211 return errCode;
212 }
213
AddSubscribeToMainDBInMigrate()214 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
215 {
216 LOGD("Add subscribe to mainDB from cache. %d", GetEngineState());
217 std::lock_guard<std::mutex> lock(subscribeMutex_);
218 if (subscribeQuery_.empty()) { // LCOV_EXCL_BR_LINE
219 return E_OK;
220 }
221 int errCode = E_OK;
222 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
223 if (errCode != E_OK || handle == nullptr) { // LCOV_EXCL_BR_LINE
224 LOGE("Get available executor for add subscribe failed. %d", errCode);
225 return errCode;
226 }
227 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
228 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
229 goto END;
230 }
231 for (auto item : subscribeQuery_) {
232 errCode = handle->AddSubscribeTrigger(item.second, item.first);
233 if (errCode != E_OK) {
234 LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
235 }
236 }
237 subscribeQuery_.clear();
238 // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
239 (void)handle->Commit();
240 END:
241 ReleaseExecutor(handle);
242 return errCode;
243 }
244
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)245 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
246 {
247 int errCode = E_OK;
248 if (handle == nullptr) {
249 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
250 if (errCode != E_OK) {
251 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
252 return errCode;
253 }
254 }
255
256 LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion());
257 uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
258 NotifyMigrateSyncData syncData;
259 auto kvdbManager = KvDBManager::GetInstance();
260 if (kvdbManager != nullptr) {
261 auto identifier = GetIdentifier();
262 auto kvdb = kvdbManager->FindKvDB(identifier);
263 if (kvdb != nullptr) {
264 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
265 syncData.isPermitForceWrite =
266 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
267 RefObject::DecObjRef(kvdb);
268 } else {
269 LOGE("[SingleVerEngine] kvdb is null.");
270 }
271 }
272 // cache atomic version represents version of cacheDb input next time
273 while (curMigrateVer < GetCacheRecordVersion()) {
274 errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
275 if (errCode != E_OK) {
276 LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
277 break;
278 }
279 if (!syncData.isRemote) {
280 isNeedTriggerSync = true;
281 }
282 }
283 if (syncData.committedData != nullptr) {
284 RefObject::DecObjRef(syncData.committedData);
285 syncData.committedData = nullptr;
286 }
287 // When finished Migrating sync data, will fix engine state
288 return errCode;
289 }
290
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)291 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
292 EngineState stateBeforeMigrate)
293 {
294 LOGD("Begin attach main db and cache db by executor!");
295 // Judge the file corresponding to db by the engine status and attach it to another file
296 int errCode = E_OK;
297 std::string attachAbsPath;
298 if (stateBeforeMigrate == EngineState::MAINDB) {
299 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
300 DBConstant::DB_EXTENSION;
301 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
302 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
303 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
304 DBConstant::DB_EXTENSION;
305 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
306 } else {
307 return -E_NOT_SUPPORT;
308 }
309 if (errCode != E_OK) {
310 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
311 return errCode;
312 }
313
314 uint64_t maxVersion = 0;
315 errCode = handle->GetMaxVersionInCacheDb(maxVersion);
316 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
317 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
318 }
319
320 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
321 return errCode;
322 }
323
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const324 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
325 {
326 LOGD("Begin attach main db and cache db by sqlite handle!");
327 // Judge the file corresponding to db by the engine status and attach it to another file
328 int errCode = E_OK;
329 std::string attachAbsPath;
330 if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
331 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
332 DBConstant::DB_EXTENSION;
333 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
334 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
335 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
336 DBConstant::DB_EXTENSION;
337 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
338 } else {
339 return -E_NOT_SUPPORT;
340 }
341 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
342 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
343 return errCode;
344 }
345
346 return errCode;
347 }
348
ReInit()349 int SQLiteSingleVerStorageEngine::ReInit()
350 {
351 return Init();
352 }
353
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)354 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
355 {
356 if (handle == nullptr) {
357 return E_OK;
358 }
359 StorageExecutor *databaseHandle = handle;
360 isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
361 Recycle(databaseHandle);
362 handle = nullptr;
363 if (isCorrupted_) {
364 LOGE("Database is corrupted or invalid passwd!");
365 return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
366 }
367 return E_OK;
368 }
369
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)370 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
371 EngineState stateBeforeMigrate)
372 {
373 LOGI("Begin to finish migrate and reinit db state!");
374 int errCode;
375 if (handle == nullptr) { // LCOV_EXCL_BR_LINE
376 return -E_INVALID_ARGS;
377 }
378
379 if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
380 sqlite3 *dbHandle = nullptr;
381 errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
382 if (errCode != E_OK) {
383 LOGE("Get Db handle failed! errCode = [%d]", errCode);
384 return errCode;
385 }
386
387 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
388 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
389 LOGE("Execute the SQLite detach failed:%d", errCode);
390 return errCode;
391 }
392 // delete cachedb
393 errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
394 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
395 LOGE("Remove files of cache database after detach:%d", errCode);
396 }
397
398 SetEngineState(EngineState::MAINDB);
399 return errCode;
400 }
401
402 errCode = ReleaseExecutor(handle);
403 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
404 LOGE("Release executor for reopen database! errCode = [%d]", errCode);
405 return errCode;
406 }
407
408 // close db for reinit this engine
409 Release();
410
411 // delete cache db
412 errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
413 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
414 LOGE("Remove files of cache database after release current db:%d", errCode);
415 return errCode;
416 }
417
418 // reInit, it will reset engine state
419 errCode = ReInit();
420 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
421 LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
422 return errCode;
423 }
424
425 return E_OK;
426 }
427
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)428 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
429 EngineState preMigrateState)
430 {
431 // after attach main and cache need change operate data sql, changing state forbid operate database
432 SetEngineState(EngineState::MIGRATING);
433
434 int errCode = E_OK;
435 // check if has been attach and attach cache and main for migrate
436 if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
437 errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
438 if (errCode != E_OK) {
439 LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
440 // For lock state open db, can not attach main and cache
441 return errCode;
442 }
443 } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
444 // Has been attach, maybe ever crashed, need update version
445 executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
446 uint64_t maxVersion = 0;
447 errCode = handle->GetMaxVersionInCacheDb(maxVersion);
448 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
449 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
450 }
451 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
452 } else {
453 return -E_UNEXPECTED_DATA;
454 }
455
456 return errCode;
457 }
458
ExecuteMigrate()459 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
460 {
461 EngineState preState = GetEngineState();
462 std::lock_guard<std::mutex> lock(migrateLock_);
463 if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
464 !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
465 DBConstant::DB_EXTENSION)) {
466 LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
467 return E_OK;
468 }
469
470 // Get write executor for migrate
471 int errCode = E_OK;
472 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
473 if (errCode != E_OK) {
474 LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
475 return errCode;
476 }
477
478 isMigrating_.store(true);
479 LOGD("Migrate start.");
480 bool isNeedTriggerSync = false;
481 errCode = InitExecuteMigrate(handle, preState);
482 if (errCode != E_OK) {
483 LOGE("Init migrate data fail, errCode = [%d]", errCode);
484 goto END;
485 }
486
487 LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
488 static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
489 // has been attached, Mark start of migration and it can migrate data
490 errCode = MigrateLocalData(handle);
491 if (errCode != E_OK) {
492 LOGE("Migrate local data fail, errCode = [%d]", errCode);
493 goto END;
494 }
495
496 errCode = MigrateSyncData(handle, isNeedTriggerSync);
497 if (errCode != E_OK) {
498 LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
499 goto END;
500 }
501
502 SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
503
504 // detach database and delete cachedb
505 errCode = FinishMigrateData(handle, preState);
506 if (errCode != E_OK) {
507 LOGE("Finish migrating data fail, errCode = [%d]", errCode);
508 goto END;
509 }
510
511 END: // after FinishMigrateData, it will reset engine state
512 // there is no need cover the errCode
513 EndMigrate(handle, preState, errCode, isNeedTriggerSync);
514 isMigrating_.store(false);
515 LOGD("Migrate stop.");
516 return errCode;
517 }
518
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)519 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
520 int errCode, bool isNeedTriggerSync)
521 {
522 LOGD("Finish migrating data! errCode = [%d]", errCode);
523 if (errCode != E_OK) {
524 SetEngineState(stateBeforeMigrate);
525 }
526 if (handle != nullptr) {
527 handle->ClearMigrateData();
528 }
529 errCode = ReleaseExecutor(handle);
530 if (errCode != E_OK) {
531 LOGE("release executor after migrating! errCode = [%d]", errCode);
532 }
533
534 errCode = AddSubscribeToMainDBInMigrate();
535 if (errCode != E_OK) {
536 LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
537 }
538
539 // Notify max timestamp offset for SyncEngine.
540 // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
541 RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
542 if (isNeedTriggerSync) {
543 commitNotifyFunc_(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT),
544 nullptr);
545 }
546 return;
547 }
548
IsEngineCorrupted() const549 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
550 {
551 return isCorrupted_;
552 }
553
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)554 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
555 {
556 auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
557 if (executor == nullptr) {
558 return executor;
559 }
560 executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
561 return executor;
562 }
563
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db)564 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
565 {
566 // Only could get the main database handle in the uninitialized and the main status.
567 if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
568 LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
569 return -E_EKEYREVOKED;
570 }
571
572 if (!option_.isMemDb) {
573 option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
574 DBConstant::DB_EXTENSION;
575 }
576
577 OpenDbProperties optionTemp = option_;
578 if (!isWrite) {
579 optionTemp.createIfNecessary = false;
580 }
581
582 int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
583 if (errCode != E_OK) {
584 if (errno == EKEYREVOKED) {
585 LOGI("Failed to open the main database for key revoked[%d]", errCode);
586 errCode = -E_EKEYREVOKED;
587 }
588 return errCode;
589 }
590
591 executorState_ = ExecutorState::MAINDB;
592 // Set the engine state to main status for that the main database is valid.
593 SetEngineState(EngineState::MAINDB);
594
595 if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
596 DBConstant::DB_EXTENSION)) {
597 // In status cacheDb crash
598 errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
599 if (errCode != E_OK) {
600 LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
601 return E_OK; // not care err to return, only use for print log
602 }
603 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
604 // cache and main existed together, can not read data, must execute migrate first
605 SetEngineState(EngineState::ATTACHING);
606 }
607
608 return errCode;
609 }
610
GetDbHandle(bool isWrite,const SecurityOption & secOpt,sqlite3 * & dbHandle)611 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
612 {
613 int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
614 LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]", isWrite,
615 secOpt.securityLabel, secOpt.securityFlag, hashIdentifier_.c_str(), errCode);
616 if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
617 return errCode;
618 }
619 std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
620 DBConstant::DB_EXTENSION;
621 if (!isWrite || GetEngineState() != EngineState::INVALID ||
622 OS::CheckPathExistence(cacheDbPath)) {
623 LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
624 isWrite, GetEngineState());
625 return -E_EKEYREVOKED;
626 }
627
628 errCode = GetCacheDbHandle(dbHandle);
629 if (errCode != E_OK) {
630 LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
631 return errCode;
632 }
633 SetEngineState(EngineState::CACHEDB);
634 executorState_ = ExecutorState::CACHEDB;
635
636 ResetCacheRecordVersion();
637 // Get handle means maindb file ekeyevoked, not need attach to
638 return errCode;
639 }
640
// SQL statements used to create the tables of the cache database.
namespace CacheDbSqls {
// Local data table; rows are keyed by hash_key.
const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS local_data("
    "key BLOB NOT NULL,"
    "value BLOB,"
    "timestamp INT,"
    "hash_key BLOB PRIMARY KEY NOT NULL,"
    "flag INT NOT NULL);";

// Sync data table; rows are keyed by (version, hash_key).
const std::string CREATE_CACHE_SYNC_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS sync_data("
    "key BLOB NOT NULL,"
    "value BLOB,"
    "timestamp INT NOT NULL,"
    "flag INT NOT NULL,"
    "device BLOB,"
    "ori_device BLOB,"
    "hash_key BLOB NOT NULL,"
    "w_timestamp INT,"
    "version INT NOT NULL,"
    "PRIMARY Key(version, hash_key));";
} // namespace CacheDbSqls
663
664 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
665 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)666 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
667 {
668 option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
669 DBConstant::DB_EXTENSION;
670 // creatTable
671 option_.sqls = {
672 CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
673 CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
674 };
675
676 if (!option_.createIfNecessary) {
677 std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
678 DBConstant::DB_EXTENSION;
679 if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
680 return -E_INVALID_DB;
681 }
682 }
683
684 OpenDbProperties option = option_; // copy for no change it
685 option.createIfNecessary = true;
686 int errCode = SQLiteUtils::OpenDatabase(option, db);
687 if (errCode != E_OK) {
688 LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
689 return errCode;
690 }
691 return errCode;
692 }
693
CheckDatabaseSecOpt(const SecurityOption & secOption) const694 void SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
695 {
696 if (!(secOption == option_.securityOpt) && (secOption.securityLabel > option_.securityOpt.securityLabel) &&
697 secOption.securityLabel != SecurityLabel::NOT_SET &&
698 option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
699 LOGW("[SQLiteSingleVerStorageEngine] SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]",
700 secOption.securityLabel, secOption.securityFlag, option_.securityOpt.securityLabel,
701 option_.securityOpt.securityFlag);
702 }
703 }
704
CreateNewDirsAndSetSecOpt() const705 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
706 {
707 LOGD("[SQLiteSingleVerStorageEngine] Begin to create new dirs and set security option");
708 return CreateNewDirsAndSetSecOption(option_);
709 }
710
GetExistedSecOption(SecurityOption & secOption) const711 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
712 {
713 LOGD("[SQLiteSingleVerStorageEngine] Try to get existed sec option");
714 return GetExistedSecOpt(option_, secOption);
715 }
716
ClearCorruptedFlag()717 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
718 {
719 isCorrupted_ = false;
720 }
721
PreCreateExecutor(bool isWrite,SecurityOption & existedSecOpt)722 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite, SecurityOption &existedSecOpt)
723 {
724 // Assume that create the write executor firstly and the write one we will not be released.
725 // If the write one would be released in the future, should take care the pass through.
726 if (!isWrite) {
727 return E_OK;
728 }
729
730 if (option_.isMemDb) {
731 return E_OK;
732 }
733
734 // check sqlite open ok
735 int errCode = CheckStoreStatus(option_);
736 if (errCode != E_OK) {
737 return errCode;
738 }
739
740 // Get the existed database secure option.
741 errCode = GetExistedSecOption(existedSecOpt);
742 if (errCode != E_OK) {
743 return errCode;
744 }
745
746 CheckDatabaseSecOpt(existedSecOpt);
747
748 // Judge whether need update the security option of the engine.
749 // Should update the security in the import or rekey scene(inner) or exist is not set.
750 if (IsUseExistedSecOption(existedSecOpt, option_.securityOpt)) {
751 option_.securityOpt = existedSecOpt;
752 } else {
753 isNeedUpdateSecOpt_ = true;
754 }
755
756 errCode = CreateNewDirsAndSetSecOpt();
757 if (errCode != E_OK) {
758 return errCode;
759 }
760
761 if (!isUpdated_) {
762 errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
763 if (errCode != E_OK) {
764 LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
765 return errCode;
766 }
767 }
768
769 return E_OK;
770 }
771
EndCreateExecutor(sqlite3 * db,SecurityOption existedSecOpt,bool isWrite,bool isDetachMeta)772 int SQLiteSingleVerStorageEngine::EndCreateExecutor(sqlite3 *db, SecurityOption existedSecOpt, bool isWrite,
773 bool isDetachMeta)
774 {
775 if (option_.isMemDb || !isWrite) {
776 return E_OK;
777 }
778
779 int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt, existedSecOpt,
780 isNeedUpdateSecOpt_);
781 if (errCode != E_OK) {
782 if (errCode == -E_NOT_SUPPORT) {
783 option_.securityOpt = SecurityOption();
784 errCode = E_OK;
785 }
786 LOGE("SetSecOption failed:%d", errCode);
787 return errCode;
788 }
789
790 // after setting secOption, the database file operation ends
791 // database create completed, delete the token
792 if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
793 OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
794 LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
795 return -E_SYSTEM_API_FAIL;
796 }
797 if (isDetachMeta) {
798 errCode = SQLiteUtils::ExecuteRawSQL(db, "DETACH 'meta'");
799 if (errCode != E_OK) {
800 LOGE("Detach meta db failed %d", errCode);
801 return errCode;
802 } else {
803 LOGI("Detach meta db success");
804 }
805 }
806 errCode = SqliteLogTableManager::CreateKvSyncLogTable(db);
807 if (errCode != E_OK) {
808 LOGE("[SQLiteSingleVerStorageEngine] create cloud log table failed, errCode = [%d]", errCode);
809 } else {
810 LOGI("[SQLiteSingleVerStorageEngine] create cloud log table success");
811 }
812 return errCode;
813 }
814
TryAttachMetaDb(const SecurityOption & existedSecOpt,sqlite3 * & dbHandle,bool & isAttachMeta,bool & isNeedDetachMeta)815 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(const SecurityOption &existedSecOpt, sqlite3 *&dbHandle,
816 bool &isAttachMeta, bool &isNeedDetachMeta)
817 {
818 bool isCurrentSESECE = ParamCheckUtils::IsS3SECEOpt(existedSecOpt);
819 bool isOpenSESECE = ParamCheckUtils::IsS3SECEOpt(option_.securityOpt);
820 // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
821 if ((!option_.isMemDb) && (isOpenSESECE || (isNeedUpdateSecOpt_ && isCurrentSESECE))) {
822 int errCode = AttachMetaDatabase(dbHandle, option_);
823 if (errCode != E_OK) {
824 (void)sqlite3_close_v2(dbHandle);
825 dbHandle = nullptr;
826 return errCode;
827 }
828 isAttachMeta = isOpenSESECE; // only open with S3 SECE need in attach mode
829 isNeedDetachMeta = !isOpenSESECE && isCurrentSESECE; // NOT S3 SECE no need meta.db
830 }
831 return E_OK;
832 }
833
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)834 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
835 {
836 SecurityOption existedSecOpt;
837 int errCode = PreCreateExecutor(isWrite, existedSecOpt);
838 if (errCode != E_OK) {
839 return errCode;
840 }
841
842 sqlite3 *dbHandle = nullptr;
843 errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
844 if (errCode != E_OK) {
845 return errCode;
846 }
847
848 bool isAttachMeta = false;
849 bool isDetachMeta = false;
850 errCode = TryAttachMetaDb(existedSecOpt, dbHandle, isAttachMeta, isDetachMeta);
851 if (errCode != E_OK) {
852 return errCode;
853 }
854
855 RegisterFunctionIfNeed(dbHandle);
856 errCode = Upgrade(dbHandle);
857 if (errCode != E_OK) {
858 (void)sqlite3_close_v2(dbHandle);
859 dbHandle = nullptr;
860 return errCode;
861 }
862
863 errCode = EndCreateExecutor(dbHandle, existedSecOpt, isWrite, isDetachMeta);
864 if (errCode != E_OK) {
865 LOGE("After create executor, set security option incomplete!");
866 (void)sqlite3_close_v2(dbHandle);
867 dbHandle = nullptr;
868 return errCode;
869 }
870
871 handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
872 if (handle == nullptr) {
873 LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
874 (void)sqlite3_close_v2(dbHandle);
875 dbHandle = nullptr;
876 return -E_OUT_OF_MEMORY;
877 }
878 if (isAttachMeta) {
879 SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
880 singleVerHandle->SetAttachMetaMode(isAttachMeta);
881 }
882 return E_OK;
883 }
884
Upgrade(sqlite3 * db)885 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
886 {
887 if (isUpdated_ || GetEngineState() == EngineState::CACHEDB) {
888 return E_OK;
889 }
890
891 std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
892 LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
893 if (option_.schema.empty()) {
894 upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
895 } else {
896 SchemaObject schema;
897 int errCode = schema.ParseFromSchemaString(option_.schema);
898 if (errCode != E_OK) {
899 LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
900 return errCode;
901 }
902 upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
903 option_.securityOpt, option_.isMemDb);
904 }
905
906 std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
907 std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::DB_EXTENSION;
908 SecurityOption secOpt = option_.securityOpt;
909 int errCode = E_OK;
910 if (isNeedUpdateSecOpt_) {
911 errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
912 if (errCode != E_OK) {
913 LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
914 if (errCode != -E_NOT_SUPPORT) {
915 return errCode;
916 }
917 secOpt = SecurityOption();
918 }
919 }
920
921 upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
922 upgrader->SetSubdir(option_.subdir);
923 errCode = upgrader->Upgrade();
924 if (errCode != E_OK) {
925 LOGE("Single ver database upgrade failed:%d", errCode);
926 return errCode;
927 }
928
929 LOGD("Finish upgrade single ver database!");
930 isUpdated_ = true; // Identification to avoid repeated upgrades
931 std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
932 isSchemaChanged_ = upgrader->IsValueNeedUpgrade();
933 return errCode;
934 }
935
936 // Attention: This function should be called before "Upgrade".
937 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const938 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
939 {
940 // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
941 // not a newly created database, the meta-Table should exist and can be accessed.
942 std::string schemaStr = option_.schema;
943 if (schemaStr.empty()) {
944 // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
945 int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
946 if (errCode != E_OK) {
947 LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
948 }
949 }
950 if (!schemaStr.empty()) {
951 // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
952 int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
953 if (errCode != E_OK) { // Not very likely
954 // Just warning, if no index had been or need to be created, then put or kv-get can still use.
955 LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
956 }
957 }
958
959 // This function is used to update meta_data in triggers when it's attached to mainDB
960 int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
961 if (errCode != E_OK) {
962 LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
963 }
964 }
965
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const966 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
967 {
968 int errCode;
969 LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
970 std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
971 DBConstant::SINGLE_VER_META_STORE + DBConstant::DB_EXTENSION;
972 // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
973 if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
974 errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
975 if (errCode != E_OK) {
976 return errCode;
977 }
978 }
979 CipherPassword passwd;
980 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
981 if (errCode != E_OK) {
982 LOGE("AttachNewDatabase fail, errCode = %d", errCode);
983 }
984 return errCode;
985 }
986
ResetCacheRecordVersion()987 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
988 {
989 (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
990 }
991
IncreaseCacheRecordVersion()992 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
993 {
994 (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
995 }
996
GetAndIncreaseCacheRecordVersion()997 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
998 {
999 return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1000 }
1001
GetCacheRecordVersion() const1002 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
1003 {
1004 return cacheRecordVersion_.load(std::memory_order_seq_cst);
1005 }
1006
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1007 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1008 int eventType) const
1009 {
1010 std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1011 if (commitNotifyFunc_ == nullptr) {
1012 LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1013 RefObject::DecObjRef(committedData);
1014 committedData = nullptr;
1015 return;
1016 }
1017 commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1018 committedData = nullptr;
1019 }
1020
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1021 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1022 {
1023 if (committedData == nullptr) {
1024 LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1025 return;
1026 }
1027 auto identifier = GetIdentifier();
1028 auto kvDBManager = KvDBManager::GetInstance();
1029 if (kvDBManager == nullptr) {
1030 LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvDBManager is null.");
1031 return;
1032 }
1033 auto kvdb = kvDBManager->FindKvDB(identifier);
1034 if (kvdb == nullptr) {
1035 LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1036 return;
1037 }
1038 unsigned int conflictFlag = 0;
1039 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1040 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1041 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1042 }
1043 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1044 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1045 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1046 }
1047 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1048 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1049 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_NATIVE_ALL);
1050 }
1051 RefObject::DecObjRef(kvdb);
1052 LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1053 committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1054 }
1055
SetMaxValueSize(uint32_t maxValueSize)1056 void SQLiteSingleVerStorageEngine::SetMaxValueSize(uint32_t maxValueSize)
1057 {
1058 LOGI("Set the max value size to %" PRIu32, maxValueSize);
1059 maxValueSize_ = maxValueSize;
1060 }
1061
GetMaxValueSize()1062 uint32_t SQLiteSingleVerStorageEngine::GetMaxValueSize()
1063 {
1064 return maxValueSize_;
1065 }
1066
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1067 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1068 {
1069 const auto &isRemote = syncData.isRemote;
1070 const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1071 auto &committedData = syncData.committedData;
1072 auto &entries = syncData.entries;
1073
1074 // Put data. Including insert, update and delete.
1075 if (!isRemoveDeviceData) { // LCOV_EXCL_BR_LINE
1076 if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1077 int eventType = static_cast<int>(isRemote ?
1078 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT :
1079 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT);
1080 CommitAndReleaseNotifyData(committedData, eventType);
1081 }
1082 return;
1083 }
1084
1085 // Remove device data.
1086 if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) { // LCOV_EXCL_BR_LINE
1087 return;
1088 }
1089 size_t totalSize = 0;
1090 for (auto iter = entries.begin(); iter != entries.end();) {
1091 auto &entry = *iter;
1092 if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1093 committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1094 if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1095 LOGE("Alloc committed notify data failed.");
1096 return;
1097 }
1098 }
1099 if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > maxValueSize_) { // LCOV_EXCL_BR_LINE
1100 iter++;
1101 continue;
1102 }
1103 if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) { // LCOV_EXCL_BR_LINE
1104 CommitAndReleaseNotifyData(committedData,
1105 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1106 totalSize = 0;
1107 continue;
1108 }
1109 totalSize += (entry.key.size() + entry.value.size());
1110 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1111 iter++;
1112 }
1113 if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1114 CommitAndReleaseNotifyData(committedData,
1115 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1116 }
1117 }
1118
1119 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1120 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1121 {
1122 std::lock_guard<std::mutex> lock(subscribeMutex_);
1123 subscribeQuery_[subscribeId] = query;
1124 }
1125
IsUseExistedSecOption(const SecurityOption & existedSecOpt,const SecurityOption & openSecOpt)1126 bool SQLiteSingleVerStorageEngine::IsUseExistedSecOption(const SecurityOption &existedSecOpt,
1127 const SecurityOption &openSecOpt)
1128 {
1129 if (isNeedUpdateSecOpt_) {
1130 return false;
1131 }
1132 if (existedSecOpt.securityLabel != openSecOpt.securityLabel) {
1133 return false;
1134 }
1135 return true;
1136 }
1137
UpgradeLocalMetaData()1138 int SQLiteSingleVerStorageEngine::UpgradeLocalMetaData()
1139 {
1140 std::function<int(void)> schemaChangedFunc = nullptr;
1141 {
1142 std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
1143 if (isSchemaChanged_) {
1144 schemaChangedFunc = schemaChangedFunc_;
1145 isSchemaChanged_ = false;
1146 }
1147 }
1148 if (schemaChangedFunc != nullptr) {
1149 return schemaChangedFunc();
1150 }
1151 return E_OK;
1152 }
1153 }
1154