1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_engine.h"
17
18 #include <memory>
19
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "kvdb_manager.h"
24 #include "log_print.h"
25 #include "param_check_utils.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "single_ver_utils.h"
29 #include "sqlite_log_table_manager.h"
30 #include "sqlite_single_ver_database_upgrader.h"
31 #include "sqlite_single_ver_natural_store.h"
32 #include "sqlite_single_ver_schema_database_upgrader.h"
33
34 namespace DistributedDB {
SQLiteSingleVerStorageEngine()35 SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
36 : executorState_(ExecutorState::INVALID),
37 cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
38 isCorrupted_(false),
39 isNeedUpdateSecOpt_(false),
40 maxValueSize_(DBConstant::MAX_VALUE_SIZE)
41 {}
42
~SQLiteSingleVerStorageEngine()43 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
44 {
45 }
46
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const47 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
48 {
49 return handle->MigrateLocalData();
50 }
51
EraseDeviceWaterMark(const std::set<std::string> & removeDevices,bool isNeedHash)52 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(const std::set<std::string> &removeDevices, bool isNeedHash)
53 {
54 auto kvdbManager = KvDBManager::GetInstance();
55 if (kvdbManager == nullptr) { // LCOV_EXCL_BR_LINE
56 return -E_INVALID_DB;
57 }
58 auto identifier = GetIdentifier();
59 auto kvdb = kvdbManager->FindKvDB(identifier);
60 if (kvdb == nullptr) { // LCOV_EXCL_BR_LINE
61 LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
62 return -E_INVALID_DB;
63 }
64
65 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
66 for (const auto &devId : removeDevices) {
67 int errCode = kvStore->EraseDeviceWaterMark(devId, isNeedHash);
68 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
69 RefObject::DecObjRef(kvdb);
70 return errCode;
71 }
72 }
73
74 RefObject::DecObjRef(kvdb);
75 return E_OK;
76 }
77
GetRemoveDataDevices(SQLiteSingleVerStorageExecutor * handle,const DataItem & item,std::set<std::string> & removeDevices,bool & isNeedHash) const78 int SQLiteSingleVerStorageEngine::GetRemoveDataDevices(SQLiteSingleVerStorageExecutor *handle, const DataItem &item,
79 std::set<std::string> &removeDevices, bool &isNeedHash) const
80 {
81 if (handle == nullptr) { // LCOV_EXCL_BR_LINE
82 return -E_INVALID_DB;
83 }
84 if (item.value.empty()) { // Device ID has been set to value in cache db
85 // Empty means remove all device data, get device id from meta key
86 // LCOV_EXCL_BR_LINE
87 int errCode = handle->GetExistsDevicesFromMeta(removeDevices);
88 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
89 LOGE("Get remove devices list from meta failed. err=%d", errCode);
90 return errCode;
91 }
92 isNeedHash = false;
93 } else {
94 std::string deviceName;
95 DBCommon::VectorToString(item.value, deviceName);
96 removeDevices.insert(deviceName);
97 }
98 return E_OK;
99 }
100
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)101 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
102 const std::vector<DataItem> &dataItems)
103 {
104 int errCode = E_OK;
105 for (const auto &dataItem : dataItems) {
106 if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
107 (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
108 bool isNeedHash = true;
109 std::set<std::string> removeDevices;
110 errCode = GetRemoveDataDevices(handle, dataItem, removeDevices, isNeedHash);
111 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
112 LOGE("Get remove device id failed. err=%d", errCode);
113 return errCode;
114 }
115
116 // sync module will use handle to fix watermark, if fix fail then migrate fail, not need hold write handle
117 errCode = ReleaseExecutor(handle);
118 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
119 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
120 return errCode;
121 }
122
123 errCode = EraseDeviceWaterMark(removeDevices, isNeedHash);
124 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
125 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
126 return errCode;
127 }
128
129 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
130 errCode));
131 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
132 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
133 return errCode;
134 }
135 }
136 }
137 return errCode;
138 }
139
MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor * & handle,NotifyMigrateSyncData & syncData,uint64_t & curMigrateVer)140 int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
141 NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
142 {
143 if (syncData.committedData == nullptr) {
144 syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
145 if (syncData.committedData == nullptr) {
146 LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
147 return -E_OUT_OF_MEMORY;
148 }
149 }
150 InitConflictNotifiedFlag(syncData.committedData);
151
152 std::vector<DataItem> dataItems;
153 uint64_t minVerIncurCacheDb = 0;
154 if (handle == nullptr) {
155 LOGE("[MigrateSyncDataByVersion] handle is nullptr.");
156 return -E_INVALID_DB;
157 }
158 int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
159 if (errCode != E_OK) {
160 LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
161 return errCode;
162 }
163
164 if (minVerIncurCacheDb == 0) { // min version in cache db is 1
165 ++curMigrateVer;
166 return E_OK;
167 }
168
169 if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
170 curMigrateVer = minVerIncurCacheDb;
171 }
172
173 // Call the syncer module to erase the water mark.
174 errCode = EraseDeviceWaterMark(handle, dataItems);
175 if (errCode != E_OK) {
176 LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
177 return errCode;
178 }
179
180 // next version need process
181 LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
182 curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
183 errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
184 if (errCode != E_OK) {
185 LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
186 return errCode;
187 }
188
189 errCode = ReleaseHandleTransiently(handle, 2ULL, syncData); // temporary release handle 2ms
190 if (errCode != E_OK) {
191 return errCode;
192 }
193
194 return E_OK;
195 }
196
197 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime,NotifyMigrateSyncData & syncData)198 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime,
199 NotifyMigrateSyncData &syncData)
200 {
201 int errCode = ReleaseExecutor(handle);
202 if (errCode != E_OK) {
203 LOGE("release executor for reopen database! errCode = [%d]", errCode);
204 return errCode;
205 }
206
207 CommitNotifyForMigrateCache(syncData); // Trigger sync after release handle
208
209 std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
210 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
211 if (errCode != E_OK) {
212 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
213 return errCode;
214 }
215 return errCode;
216 }
217
AddSubscribeToMainDBInMigrate()218 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
219 {
220 LOGD("Add subscribe to mainDB from cache. %d", GetEngineState());
221 std::lock_guard<std::mutex> lock(subscribeMutex_);
222 if (subscribeQuery_.empty()) { // LCOV_EXCL_BR_LINE
223 return E_OK;
224 }
225 int errCode = E_OK;
226 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
227 if (errCode != E_OK || handle == nullptr) { // LCOV_EXCL_BR_LINE
228 LOGE("Get available executor for add subscribe failed. %d", errCode);
229 return errCode;
230 }
231 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
232 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
233 goto END;
234 }
235 for (auto item : subscribeQuery_) {
236 errCode = handle->AddSubscribeTrigger(item.second, item.first);
237 if (errCode != E_OK) {
238 LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
239 }
240 }
241 subscribeQuery_.clear();
242 // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
243 (void)handle->Commit();
244 END:
245 ReleaseExecutor(handle);
246 return errCode;
247 }
248
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)249 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
250 {
251 int errCode = E_OK;
252 if (handle == nullptr) {
253 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
254 if (errCode != E_OK) {
255 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
256 return errCode;
257 }
258 }
259
260 LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion());
261 uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
262 NotifyMigrateSyncData syncData;
263 auto kvdbManager = KvDBManager::GetInstance();
264 if (kvdbManager != nullptr) {
265 auto identifier = GetIdentifier();
266 auto kvdb = kvdbManager->FindKvDB(identifier);
267 if (kvdb != nullptr) {
268 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
269 syncData.isPermitForceWrite =
270 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
271 RefObject::DecObjRef(kvdb);
272 } else {
273 LOGE("[SingleVerEngine] kvdb is null.");
274 }
275 }
276 // cache atomic version represents version of cacheDb input next time
277 while (curMigrateVer < GetCacheRecordVersion()) {
278 errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
279 if (errCode != E_OK) {
280 LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
281 break;
282 }
283 if (!syncData.isRemote) {
284 isNeedTriggerSync = true;
285 }
286 }
287 if (syncData.committedData != nullptr) {
288 RefObject::DecObjRef(syncData.committedData);
289 syncData.committedData = nullptr;
290 }
291 // When finished Migrating sync data, will fix engine state
292 return errCode;
293 }
294
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)295 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
296 EngineState stateBeforeMigrate)
297 {
298 LOGD("Begin attach main db and cache db by executor!");
299 // Judge the file corresponding to db by the engine status and attach it to another file
300 int errCode = E_OK;
301 std::string attachAbsPath;
302 if (handle == nullptr) {
303 LOGE("[AttachMainDbAndCacheDb] handle is nullptr.");
304 return -E_INVALID_DB;
305 }
306 if (stateBeforeMigrate == EngineState::MAINDB) {
307 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
308 DBConstant::DB_EXTENSION;
309 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
310 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
311 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
312 DBConstant::DB_EXTENSION;
313 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
314 } else {
315 return -E_NOT_SUPPORT;
316 }
317 if (errCode != E_OK) {
318 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
319 return errCode;
320 }
321
322 uint64_t maxVersion = 0;
323 errCode = handle->GetMaxVersionInCacheDb(maxVersion);
324 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
325 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
326 }
327
328 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
329 return errCode;
330 }
331
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const332 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
333 {
334 LOGD("Begin attach main db and cache db by sqlite handle!");
335 // Judge the file corresponding to db by the engine status and attach it to another file
336 int errCode = E_OK;
337 std::string attachAbsPath;
338 if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
339 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
340 DBConstant::DB_EXTENSION;
341 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
342 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
343 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
344 DBConstant::DB_EXTENSION;
345 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
346 } else {
347 return -E_NOT_SUPPORT;
348 }
349 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
350 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
351 return errCode;
352 }
353
354 return errCode;
355 }
356
ReInit()357 int SQLiteSingleVerStorageEngine::ReInit()
358 {
359 return Init();
360 }
361
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)362 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
363 {
364 if (handle == nullptr) {
365 return E_OK;
366 }
367 StorageExecutor *databaseHandle = handle;
368 isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
369 Recycle(databaseHandle);
370 handle = nullptr;
371 if (isCorrupted_) {
372 LOGE("Database is corrupted or invalid passwd!");
373 return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
374 }
375 return E_OK;
376 }
377
FinishMigrateData(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate)378 int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
379 EngineState stateBeforeMigrate)
380 {
381 LOGI("Begin to finish migrate and reinit db state!");
382 int errCode;
383 if (handle == nullptr) { // LCOV_EXCL_BR_LINE
384 return -E_INVALID_ARGS;
385 }
386
387 if (stateBeforeMigrate == EngineState::MAINDB) { // LCOV_EXCL_BR_LINE
388 sqlite3 *dbHandle = nullptr;
389 errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
390 if (errCode != E_OK) {
391 LOGE("Get Db handle failed! errCode = [%d]", errCode);
392 return errCode;
393 }
394
395 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
396 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
397 LOGE("Execute the SQLite detach failed:%d", errCode);
398 return errCode;
399 }
400 // delete cachedb
401 errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
402 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
403 LOGE("Remove files of cache database after detach:%d", errCode);
404 }
405
406 SetEngineState(EngineState::MAINDB);
407 return errCode;
408 }
409
410 errCode = ReleaseExecutor(handle);
411 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
412 LOGE("Release executor for reopen database! errCode = [%d]", errCode);
413 return errCode;
414 }
415
416 // close db for reinit this engine
417 Release();
418
419 // delete cache db
420 errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
421 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
422 LOGE("Remove files of cache database after release current db:%d", errCode);
423 return errCode;
424 }
425
426 // reInit, it will reset engine state
427 errCode = ReInit();
428 if (errCode != E_OK) { // LCOV_EXCL_BR_LINE
429 LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
430 return errCode;
431 }
432
433 return E_OK;
434 }
435
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)436 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
437 EngineState preMigrateState)
438 {
439 // after attach main and cache need change operate data sql, changing state forbid operate database
440 SetEngineState(EngineState::MIGRATING);
441
442 int errCode = E_OK;
443 // check if has been attach and attach cache and main for migrate
444 if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
445 errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
446 if (errCode != E_OK) {
447 LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
448 // For lock state open db, can not attach main and cache
449 return errCode;
450 }
451 } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
452 // Has been attach, maybe ever crashed, need update version
453 executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
454 uint64_t maxVersion = 0;
455 errCode = handle->GetMaxVersionInCacheDb(maxVersion);
456 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
457 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
458 }
459 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
460 } else {
461 return -E_UNEXPECTED_DATA;
462 }
463
464 return errCode;
465 }
466
ExecuteMigrate()467 int SQLiteSingleVerStorageEngine::ExecuteMigrate()
468 {
469 EngineState preState = GetEngineState();
470 std::lock_guard<std::mutex> lock(migrateLock_);
471 if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
472 !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
473 DBConstant::DB_EXTENSION)) {
474 LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
475 return E_OK;
476 }
477
478 // Get write executor for migrate
479 int errCode = E_OK;
480 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
481 if (errCode != E_OK) {
482 LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
483 return errCode;
484 }
485
486 isMigrating_.store(true);
487 LOGD("Migrate start.");
488 bool isNeedTriggerSync = false;
489 errCode = InitExecuteMigrate(handle, preState);
490 if (errCode != E_OK) {
491 LOGE("Init migrate data fail, errCode = [%d]", errCode);
492 goto END;
493 }
494
495 LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
496 static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
497 // has been attached, Mark start of migration and it can migrate data
498 errCode = MigrateLocalData(handle);
499 if (errCode != E_OK) {
500 LOGE("Migrate local data fail, errCode = [%d]", errCode);
501 goto END;
502 }
503
504 errCode = MigrateSyncData(handle, isNeedTriggerSync);
505 if (errCode != E_OK) {
506 LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
507 goto END;
508 }
509
510 SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor
511
512 // detach database and delete cachedb
513 errCode = FinishMigrateData(handle, preState);
514 if (errCode != E_OK) {
515 LOGE("Finish migrating data fail, errCode = [%d]", errCode);
516 goto END;
517 }
518
519 END: // after FinishMigrateData, it will reset engine state
520 // there is no need cover the errCode
521 EndMigrate(handle, preState, errCode, isNeedTriggerSync);
522 isMigrating_.store(false);
523 LOGD("Migrate stop.");
524 return errCode;
525 }
526
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)527 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
528 int errCode, bool isNeedTriggerSync)
529 {
530 LOGD("Finish migrating data! errCode = [%d]", errCode);
531 if (errCode != E_OK) {
532 SetEngineState(stateBeforeMigrate);
533 }
534 if (handle != nullptr) {
535 handle->ClearMigrateData();
536 }
537 errCode = ReleaseExecutor(handle);
538 if (errCode != E_OK) {
539 LOGE("release executor after migrating! errCode = [%d]", errCode);
540 }
541
542 errCode = AddSubscribeToMainDBInMigrate();
543 if (errCode != E_OK) {
544 LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
545 }
546
547 // Notify max timestamp offset for SyncEngine.
548 // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
549 RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
550 if (isNeedTriggerSync) {
551 commitNotifyFunc_(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT),
552 nullptr);
553 }
554 return;
555 }
556
IsEngineCorrupted() const557 bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
558 {
559 return isCorrupted_;
560 }
561
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)562 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
563 {
564 auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
565 if (executor == nullptr) {
566 return executor;
567 }
568 executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
569 return executor;
570 }
571
TryToOpenMainDatabase(bool isWrite,sqlite3 * & db,OpenDbProperties & option)572 int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db, OpenDbProperties &option)
573 {
574 // Only could get the main database handle in the uninitialized and the main status.
575 if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
576 LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
577 return -E_EKEYREVOKED;
578 }
579
580 if (!option.isMemDb) {
581 option.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
582 DBConstant::DB_EXTENSION;
583 SetUri(option.uri);
584 }
585
586 if (!isWrite) {
587 option.createIfNecessary = false;
588 SetCreateIfNecessary(option.createIfNecessary);
589 }
590
591 int errCode = SQLiteUtils::OpenDatabase(option, db);
592 if (errCode != E_OK) {
593 if (errno == EKEYREVOKED) {
594 LOGI("Failed to open the main database for key revoked[%d]", errCode);
595 errCode = -E_EKEYREVOKED;
596 }
597 return errCode;
598 }
599
600 executorState_ = ExecutorState::MAINDB;
601 // Set the engine state to main status for that the main database is valid.
602 SetEngineState(EngineState::MAINDB);
603
604 if (OS::CheckPathExistence(GetDbDir(option.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
605 DBConstant::DB_EXTENSION)) {
606 // In status cacheDb crash
607 errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
608 if (errCode != E_OK) {
609 LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
610 return E_OK; // not care err to return, only use for print log
611 }
612 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
613 // cache and main existed together, can not read data, must execute migrate first
614 SetEngineState(EngineState::ATTACHING);
615 }
616
617 return errCode;
618 }
619
GetDbHandle(bool isWrite,sqlite3 * & dbHandle,OpenDbProperties & option)620 int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, sqlite3 *&dbHandle, OpenDbProperties &option)
621 {
622 int errCode = TryToOpenMainDatabase(isWrite, dbHandle, option);
623 const auto &secOpt = option.securityOpt;
624 LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]", isWrite,
625 secOpt.securityLabel, secOpt.securityFlag, hashIdentifier_.c_str(), errCode);
626 if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
627 return errCode;
628 }
629 std::string cacheDbPath = GetDbDir(option.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
630 DBConstant::DB_EXTENSION;
631 if (!isWrite || GetEngineState() != EngineState::INVALID ||
632 OS::CheckPathExistence(cacheDbPath)) {
633 LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
634 isWrite, GetEngineState());
635 return -E_EKEYREVOKED;
636 }
637
638 errCode = GetCacheDbHandle(dbHandle, option);
639 if (errCode != E_OK) {
640 LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
641 return errCode;
642 }
643 SetEngineState(EngineState::CACHEDB);
644 executorState_ = ExecutorState::CACHEDB;
645
646 ResetCacheRecordVersion();
647 // Get handle means maindb file ekeyevoked, not need attach to
648 return errCode;
649 }
650
// SQL statements that create the tables of the cache database.
namespace CacheDbSqls {
// Local data table, one row per hash_key.
const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS local_data("
        "key BLOB NOT NULL,"
        "value BLOB,"
        "timestamp INT,"
        "hash_key BLOB PRIMARY KEY NOT NULL,"
        "flag INT NOT NULL);";

// Sync data table, one row per (version, hash_key).
const std::string CREATE_CACHE_SYNC_TABLE_SQL =
    "CREATE TABLE IF NOT EXISTS sync_data("
        "key BLOB NOT NULL,"
        "value BLOB,"
        "timestamp INT NOT NULL,"
        "flag INT NOT NULL,"
        "device BLOB,"
        "ori_device BLOB,"
        "hash_key BLOB NOT NULL,"
        "w_timestamp INT,"
        "version INT NOT NULL,"
        "PRIMARY Key(version, hash_key));";
} // namespace CacheDbSqls
673
674 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
675 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db,OpenDbProperties & option)676 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db, OpenDbProperties &option)
677 {
678 option.uri = GetDbDir(option.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
679 DBConstant::DB_EXTENSION;
680 SetUri(option.uri);
681 // creatTable
682 option.sqls = {CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL, CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL};
683 SetSQL(option.sqls);
684
685 if (!option.createIfNecessary) {
686 std::string mainDbPath = GetDbDir(option.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
687 DBConstant::DB_EXTENSION;
688 if (!OS::CheckPathExistence(mainDbPath)) { // Whether to create a cacheDb is based on whether the mainDb exists
689 return -E_INVALID_DB;
690 }
691 }
692
693 option.createIfNecessary = true;
694 int errCode = SQLiteUtils::OpenDatabase(option, db);
695 if (errCode != E_OK) {
696 LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
697 return errCode;
698 }
699 return errCode;
700 }
701
CheckDatabaseSecOpt(const SecurityOption & secOption) const702 void SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
703 {
704 if (!(secOption == option_.securityOpt) && (secOption.securityLabel > option_.securityOpt.securityLabel) &&
705 secOption.securityLabel != SecurityLabel::NOT_SET &&
706 option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
707 LOGW("[SQLiteSingleVerStorageEngine] SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]",
708 secOption.securityLabel, secOption.securityFlag, option_.securityOpt.securityLabel,
709 option_.securityOpt.securityFlag);
710 }
711 }
712
CreateNewDirsAndSetSecOpt() const713 int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
714 {
715 LOGD("[SQLiteSingleVerStorageEngine] Begin to create new dirs and set security option");
716 return CreateNewDirsAndSetSecOption(option_);
717 }
718
GetExistedSecOption(SecurityOption & secOption) const719 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
720 {
721 LOGD("[SQLiteSingleVerStorageEngine] Try to get existed sec option");
722 return GetExistedSecOpt(option_, secOption);
723 }
724
ClearCorruptedFlag()725 void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
726 {
727 isCorrupted_ = false;
728 }
729
PreCreateExecutor(bool isWrite,SecurityOption & existedSecOpt,OpenDbProperties & option)730 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite, SecurityOption &existedSecOpt,
731 OpenDbProperties &option)
732 {
733 // Assume that create the write executor firstly and the write one we will not be released.
734 // If the write one would be released in the future, should take care the pass through.
735 if (!isWrite) {
736 return E_OK;
737 }
738
739 if (option.isMemDb) {
740 return E_OK;
741 }
742
743 // check sqlite open ok
744 int errCode = CheckStoreStatus(option);
745 if (errCode != E_OK) {
746 return errCode;
747 }
748
749 // Get the existed database secure option.
750 errCode = GetExistedSecOption(existedSecOpt);
751 if (errCode != E_OK) {
752 return errCode;
753 }
754
755 CheckDatabaseSecOpt(existedSecOpt);
756
757 // Judge whether need update the security option of the engine.
758 // Should update the security in the import or rekey scene(inner) or exist is not set.
759 if (IsUseExistedSecOption(existedSecOpt, option.securityOpt)) {
760 option.securityOpt = existedSecOpt;
761 SetSecurityOption(existedSecOpt);
762 } else {
763 isNeedUpdateSecOpt_ = true;
764 }
765
766 errCode = CreateNewDirsAndSetSecOpt();
767 if (errCode != E_OK) {
768 return errCode;
769 }
770
771 if (!isUpdated_) {
772 errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option.subdir, option);
773 if (errCode != E_OK) {
774 LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
775 return errCode;
776 }
777 }
778
779 return E_OK;
780 }
781
EndCreateExecutor(sqlite3 * db,SecurityOption existedSecOpt,bool isWrite,bool isDetachMeta,OpenDbProperties & option)782 int SQLiteSingleVerStorageEngine::EndCreateExecutor(sqlite3 *db, SecurityOption existedSecOpt, bool isWrite,
783 bool isDetachMeta, OpenDbProperties &option)
784 {
785 if (option.isMemDb || !isWrite) {
786 return E_OK;
787 }
788
789 int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option.subdir, option.securityOpt, existedSecOpt,
790 isNeedUpdateSecOpt_);
791 if (errCode != E_OK) {
792 if (errCode == -E_NOT_SUPPORT) {
793 option.securityOpt = SecurityOption();
794 SetSecurityOption(option.securityOpt);
795 errCode = E_OK;
796 }
797 LOGE("SetSecOption failed:%d", errCode);
798 return errCode;
799 }
800
801 // after setting secOption, the database file operation ends
802 // database create completed, delete the token
803 if (OS::CheckPathExistence(option.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
804 OS::RemoveFile(option.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
805 LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
806 return -E_SYSTEM_API_FAIL;
807 }
808 if (isDetachMeta) {
809 errCode = SQLiteUtils::ExecuteRawSQL(db, "DETACH 'meta'");
810 if (errCode != E_OK) {
811 LOGE("Detach meta db failed %d", errCode);
812 return errCode;
813 } else {
814 LOGI("Detach meta db success");
815 }
816 }
817 errCode = SqliteLogTableManager::CreateKvSyncLogTable(db);
818 if (errCode != E_OK) {
819 LOGE("[SqlSinEngine] create cloud log table failed, errCode = [%d]", errCode);
820 } else {
821 LOGI("[SqlSinEngine] create cloud log table success");
822 }
823 return errCode;
824 }
825
TryAttachMetaDb(const SecurityOption & existedSecOpt,sqlite3 * & dbHandle,bool & isAttachMeta,bool & isNeedDetachMeta,OpenDbProperties & option)826 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(const SecurityOption &existedSecOpt, sqlite3 *&dbHandle,
827 bool &isAttachMeta, bool &isNeedDetachMeta, OpenDbProperties &option)
828 {
829 bool isCurrentSESECE = ParamCheckUtils::IsS3SECEOpt(existedSecOpt);
830 bool isOpenSESECE = ParamCheckUtils::IsS3SECEOpt(option.securityOpt);
831 // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
832 if ((!option.isMemDb) && (isOpenSESECE || (isNeedUpdateSecOpt_ && isCurrentSESECE))) {
833 int errCode = AttachMetaDatabase(dbHandle, option);
834 if (errCode != E_OK) {
835 (void)sqlite3_close_v2(dbHandle);
836 dbHandle = nullptr;
837 return errCode;
838 }
839 isAttachMeta = isOpenSESECE; // only open with S3 SECE need in attach mode
840 isNeedDetachMeta = !isOpenSESECE && isCurrentSESECE; // NOT S3 SECE no need meta.db
841 }
842 return E_OK;
843 }
844
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)845 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
846 {
847 SecurityOption existedSecOpt;
848 auto option = GetOption();
849 int errCode = PreCreateExecutor(isWrite, existedSecOpt, option);
850 if (errCode != E_OK) {
851 return errCode;
852 }
853
854 sqlite3 *dbHandle = nullptr;
855 errCode = GetDbHandle(isWrite, dbHandle, option);
856 if (errCode != E_OK) {
857 return errCode;
858 }
859
860 bool isAttachMeta = false;
861 bool isDetachMeta = false;
862 errCode = TryAttachMetaDb(existedSecOpt, dbHandle, isAttachMeta, isDetachMeta, option);
863 if (errCode != E_OK) {
864 return errCode;
865 }
866
867 RegisterFunctionIfNeed(dbHandle, option);
868 errCode = UpgradeInner(dbHandle, option);
869 if (errCode != E_OK) {
870 (void)sqlite3_close_v2(dbHandle);
871 dbHandle = nullptr;
872 return errCode;
873 }
874
875 errCode = EndCreateExecutor(dbHandle, existedSecOpt, isWrite, isDetachMeta, option);
876 if (errCode != E_OK) {
877 LOGE("After create executor, set security option incomplete!");
878 (void)sqlite3_close_v2(dbHandle);
879 dbHandle = nullptr;
880 return errCode;
881 }
882
883 handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option.isMemDb);
884 if (handle == nullptr) {
885 LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
886 (void)sqlite3_close_v2(dbHandle);
887 dbHandle = nullptr;
888 return -E_OUT_OF_MEMORY;
889 }
890 if (isAttachMeta) {
891 SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
892 singleVerHandle->SetAttachMetaMode(isAttachMeta);
893 }
894 return E_OK;
895 }
896
UpgradeInner(sqlite3 * db,const OpenDbProperties & option)897 int SQLiteSingleVerStorageEngine::UpgradeInner(sqlite3 *db, const OpenDbProperties &option)
898 {
899 if (isUpdated_ || GetEngineState() == EngineState::CACHEDB) {
900 return E_OK;
901 }
902
903 std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
904 LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option.schema.size());
905 if (option.schema.empty()) {
906 upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option.securityOpt, option.isMemDb);
907 } else {
908 SchemaObject schema;
909 int errCode = schema.ParseFromSchemaString(option.schema);
910 if (errCode != E_OK) {
911 LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
912 return errCode;
913 }
914 upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
915 option.securityOpt, option.isMemDb);
916 }
917
918 std::string mainDbDir = GetDbDir(option.subdir, DbType::MAIN);
919 std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::DB_EXTENSION;
920 SecurityOption secOpt = option.securityOpt;
921 int errCode = E_OK;
922 if (isNeedUpdateSecOpt_) {
923 errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
924 if (errCode != E_OK) {
925 LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
926 if (errCode != -E_NOT_SUPPORT) {
927 return errCode;
928 }
929 secOpt = SecurityOption();
930 }
931 }
932
933 upgrader->SetMetaUpgrade(secOpt, option.securityOpt, option.subdir);
934 upgrader->SetSubdir(option.subdir);
935 errCode = upgrader->Upgrade();
936 if (errCode != E_OK) {
937 LOGE("Single ver database upgrade failed:%d", errCode);
938 return errCode;
939 }
940
941 LOGD("Finish upgrade single ver database!");
942 isUpdated_ = true; // Identification to avoid repeated upgrades
943 std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
944 isSchemaChanged_ = upgrader->IsValueNeedUpgrade();
945 return errCode;
946 }
947
948 // Attention: This function should be called before "Upgrade".
949 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle,const OpenDbProperties & option) const950 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle, const OpenDbProperties &option) const
951 {
952 // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
953 // not a newly created database, the meta-Table should exist and can be accessed.
954 std::string schemaStr = option.schema;
955 if (schemaStr.empty()) {
956 // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
957 int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
958 if (errCode != E_OK) {
959 LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
960 }
961 }
962 if (!schemaStr.empty()) {
963 // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
964 int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
965 if (errCode != E_OK) { // Not very likely
966 // Just warning, if no index had been or need to be created, then put or kv-get can still use.
967 LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
968 }
969 }
970
971 // This function is used to update meta_data in triggers when it's attached to mainDB
972 int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
973 if (errCode != E_OK) {
974 LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
975 }
976 }
977
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const978 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
979 {
980 int errCode;
981 LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
982 std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
983 DBConstant::SINGLE_VER_META_STORE + DBConstant::DB_EXTENSION;
984 // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
985 if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
986 errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
987 if (errCode != E_OK) {
988 return errCode;
989 }
990 }
991 CipherPassword passwd;
992 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
993 if (errCode != E_OK) {
994 LOGE("AttachNewDatabase fail, errCode = %d", errCode);
995 }
996 return errCode;
997 }
998
ResetCacheRecordVersion()999 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
1000 {
1001 (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
1002 }
1003
IncreaseCacheRecordVersion()1004 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
1005 {
1006 (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1007 }
1008
GetAndIncreaseCacheRecordVersion()1009 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
1010 {
1011 return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1012 }
1013
GetCacheRecordVersion() const1014 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
1015 {
1016 return cacheRecordVersion_.load(std::memory_order_seq_cst);
1017 }
1018
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1019 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1020 int eventType) const
1021 {
1022 std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1023 if (commitNotifyFunc_ == nullptr) {
1024 LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1025 RefObject::DecObjRef(committedData);
1026 committedData = nullptr;
1027 return;
1028 }
1029 commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1030 committedData = nullptr;
1031 }
1032
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1033 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1034 {
1035 if (committedData == nullptr) {
1036 LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1037 return;
1038 }
1039 auto identifier = GetIdentifier();
1040 auto kvDBManager = KvDBManager::GetInstance();
1041 if (kvDBManager == nullptr) {
1042 LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvDBManager is null.");
1043 return;
1044 }
1045 auto kvdb = kvDBManager->FindKvDB(identifier);
1046 if (kvdb == nullptr) {
1047 LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1048 return;
1049 }
1050 unsigned int conflictFlag = 0;
1051 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1052 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1053 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1054 }
1055 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1056 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1057 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1058 }
1059 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1060 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1061 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_NATIVE_ALL);
1062 }
1063 RefObject::DecObjRef(kvdb);
1064 LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1065 committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1066 }
1067
SetMaxValueSize(uint32_t maxValueSize)1068 void SQLiteSingleVerStorageEngine::SetMaxValueSize(uint32_t maxValueSize)
1069 {
1070 if (maxValueSize_ != maxValueSize) {
1071 LOGI("Set the max value size to %" PRIu32, maxValueSize);
1072 }
1073 maxValueSize_ = maxValueSize;
1074 }
1075
GetMaxValueSize()1076 uint32_t SQLiteSingleVerStorageEngine::GetMaxValueSize()
1077 {
1078 return maxValueSize_;
1079 }
1080
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1081 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1082 {
1083 const auto &isRemote = syncData.isRemote;
1084 const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1085 auto &committedData = syncData.committedData;
1086 auto &entries = syncData.entries;
1087
1088 // Put data. Including insert, update and delete.
1089 if (!isRemoveDeviceData) { // LCOV_EXCL_BR_LINE
1090 if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1091 int eventType = static_cast<int>(isRemote ?
1092 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT :
1093 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT);
1094 CommitAndReleaseNotifyData(committedData, eventType);
1095 }
1096 return;
1097 }
1098
1099 // Remove device data.
1100 if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) { // LCOV_EXCL_BR_LINE
1101 return;
1102 }
1103 size_t totalSize = 0;
1104 for (auto iter = entries.begin(); iter != entries.end();) {
1105 auto &entry = *iter;
1106 if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1107 committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1108 if (committedData == nullptr) { // LCOV_EXCL_BR_LINE
1109 LOGE("Alloc committed notify data failed.");
1110 return;
1111 }
1112 }
1113 if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > maxValueSize_) { // LCOV_EXCL_BR_LINE
1114 iter++;
1115 continue;
1116 }
1117 if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) { // LCOV_EXCL_BR_LINE
1118 CommitAndReleaseNotifyData(committedData,
1119 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1120 totalSize = 0;
1121 continue;
1122 }
1123 totalSize += (entry.key.size() + entry.value.size());
1124 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1125 iter++;
1126 }
1127 if (committedData != nullptr) { // LCOV_EXCL_BR_LINE
1128 CommitAndReleaseNotifyData(committedData,
1129 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1130 }
1131 }
1132
1133 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1134 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1135 {
1136 std::lock_guard<std::mutex> lock(subscribeMutex_);
1137 subscribeQuery_[subscribeId] = query;
1138 }
1139
IsUseExistedSecOption(const SecurityOption & existedSecOpt,const SecurityOption & openSecOpt)1140 bool SQLiteSingleVerStorageEngine::IsUseExistedSecOption(const SecurityOption &existedSecOpt,
1141 const SecurityOption &openSecOpt)
1142 {
1143 if (isNeedUpdateSecOpt_) {
1144 return false;
1145 }
1146 if (existedSecOpt.securityLabel != openSecOpt.securityLabel) {
1147 return false;
1148 }
1149 return true;
1150 }
1151
UpgradeLocalMetaData()1152 int SQLiteSingleVerStorageEngine::UpgradeLocalMetaData()
1153 {
1154 std::function<int(void)> schemaChangedFunc = nullptr;
1155 {
1156 std::unique_lock<std::shared_mutex> lock(schemaChangedMutex_);
1157 if (isSchemaChanged_) {
1158 schemaChangedFunc = schemaChangedFunc_;
1159 isSchemaChanged_ = false;
1160 }
1161 }
1162 if (schemaChangedFunc != nullptr) {
1163 return schemaChangedFunc();
1164 }
1165 return E_OK;
1166 }
1167 }
1168