1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_engine.h"
17
18 #include <memory>
19
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_errno.h"
23 #include "kvdb_manager.h"
24 #include "log_print.h"
25 #include "param_check_utils.h"
26 #include "platform_specific.h"
27 #include "runtime_context.h"
28 #include "sqlite_single_ver_database_upgrader.h"
29 #include "sqlite_single_ver_natural_store.h"
30 #include "sqlite_single_ver_schema_database_upgrader.h"
31
32 namespace DistributedDB {
33 namespace {
    // Initial value of the cache record version; also the fallback used in this file when the max version
    // stored in the cache db cannot be read or is smaller than this value.
    const uint64_t CACHE_RECORD_DEFAULT_VERSION = 1;
GetPathSecurityOption(const std::string & filePath,SecurityOption & secOpt)35 int GetPathSecurityOption(const std::string &filePath, SecurityOption &secOpt)
36 {
37 return RuntimeContext::GetInstance()->GetSecurityOption(filePath, secOpt);
38 }
39
40 enum class DbType : int32_t {
41 MAIN,
42 META,
43 CACHE
44 };
45
GetDbDir(const std::string & subDir,DbType type)46 std::string GetDbDir(const std::string &subDir, DbType type)
47 {
48 switch (type) {
49 case DbType::MAIN:
50 return subDir + "/" + DBConstant::MAINDB_DIR;
51 case DbType::META:
52 return subDir + "/" + DBConstant::METADB_DIR;
53 case DbType::CACHE:
54 return subDir + "/" + DBConstant::CACHEDB_DIR;
55 default:
56 break;
57 }
58 return "";
59 }
60 } // namespace
61
// Construct an engine with the cache record version at its default value, an INVALID executor state,
// no corruption recorded and no pending security-option update.
SQLiteSingleVerStorageEngine::SQLiteSingleVerStorageEngine()
    : cacheRecordVersion_(CACHE_RECORD_DEFAULT_VERSION),
      executorState_(ExecutorState::INVALID),
      isCorrupted_(false),
      isNeedUpdateSecOpt_(false)
{}
68
~SQLiteSingleVerStorageEngine()69 SQLiteSingleVerStorageEngine::~SQLiteSingleVerStorageEngine()
70 {
71 }
72
MigrateLocalData(SQLiteSingleVerStorageExecutor * handle) const73 int SQLiteSingleVerStorageEngine::MigrateLocalData(SQLiteSingleVerStorageExecutor *handle) const
74 {
75 return handle->MigrateLocalData();
76 }
77
EraseDeviceWaterMark(const std::set<std::string> & removeDevices,bool isNeedHash)78 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(const std::set<std::string> &removeDevices, bool isNeedHash)
79 {
80 auto kvdbManager = KvDBManager::GetInstance();
81 if (kvdbManager == nullptr) {
82 return -E_INVALID_DB;
83 }
84 auto identifier = GetIdentifier();
85 auto kvdb = kvdbManager->FindKvDB(identifier);
86 if (kvdb == nullptr) {
87 LOGE("[SingleVerEngine::EraseWaterMark] kvdb is null.");
88 return -E_INVALID_DB;
89 }
90
91 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
92 for (const auto &devId : removeDevices) {
93 int errCode = kvStore->EraseDeviceWaterMark(devId, isNeedHash);
94 if (errCode != E_OK) {
95 RefObject::DecObjRef(kvdb);
96 return errCode;
97 }
98 }
99
100 RefObject::DecObjRef(kvdb);
101 return E_OK;
102 }
103
GetRemoveDataDevices(SQLiteSingleVerStorageExecutor * handle,const DataItem & item,std::set<std::string> & removeDevices,bool & isNeedHash) const104 int SQLiteSingleVerStorageEngine::GetRemoveDataDevices(SQLiteSingleVerStorageExecutor *handle, const DataItem &item,
105 std::set<std::string> &removeDevices, bool &isNeedHash) const
106 {
107 if (handle == nullptr) {
108 return -E_INVALID_DB;
109 }
110 if (item.value.empty()) { // Device ID has been set to value in cache db
111 // Empty means remove all device data, get device id from meta key
112 int errCode = handle->GetExistsDevicesFromMeta(removeDevices);
113 if (errCode != E_OK) {
114 LOGE("Get remove devices list from meta failed. err=%d", errCode);
115 return errCode;
116 }
117 isNeedHash = false;
118 } else {
119 std::string deviceName;
120 DBCommon::VectorToString(item.value, deviceName);
121 removeDevices.insert(deviceName);
122 }
123 return E_OK;
124 }
125
EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor * & handle,const std::vector<DataItem> & dataItems)126 int SQLiteSingleVerStorageEngine::EraseDeviceWaterMark(SQLiteSingleVerStorageExecutor *&handle,
127 const std::vector<DataItem> &dataItems)
128 {
129 int errCode = E_OK;
130 for (const auto &dataItem : dataItems) {
131 if ((dataItem.flag & DataItem::REMOVE_DEVICE_DATA_FLAG) == DataItem::REMOVE_DEVICE_DATA_FLAG ||
132 (dataItem.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) {
133 bool isNeedHash = true;
134 std::set<std::string> removeDevices;
135 errCode = GetRemoveDataDevices(handle, dataItem, removeDevices, isNeedHash);
136 if (errCode != E_OK) {
137 LOGE("Get remove device id failed. err=%d", errCode);
138 return errCode;
139 }
140
141 // sync module will use handle to fix watermark, if fix fail then migrate fail, not need hold write handle
142 errCode = ReleaseExecutor(handle);
143 if (errCode != E_OK) {
144 LOGE("release executor for erase water mark! errCode = [%d]", errCode);
145 return errCode;
146 }
147
148 errCode = EraseDeviceWaterMark(removeDevices, isNeedHash);
149 if (errCode != E_OK) {
150 LOGE("EraseDeviceWaterMark failed when migrating, errCode = [%d]", errCode);
151 return errCode;
152 }
153
154 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM,
155 errCode));
156 if (errCode != E_OK) {
157 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
158 return errCode;
159 }
160 }
161 }
162 return errCode;
163 }
164
// Migrate the cached sync data of a single version into the main database.
// handle:        write executor used for the migration. It is released and re-acquired inside this call
//                (while erasing water marks and at the end), so the pointer may change.
// syncData:      notification payload; committedData is allocated here when it is still nullptr.
// curMigrateVer: in/out cursor of the version to migrate; it is advanced by one after a version has been
//                handed to the executor, or when the cache reports a minimum version of 0.
// Returns E_OK on success, -E_OUT_OF_MEMORY if the notify data cannot be allocated, otherwise the first
// error reported by a sub-step.
int SQLiteSingleVerStorageEngine::MigrateSyncDataByVersion(SQLiteSingleVerStorageExecutor *&handle,
    NotifyMigrateSyncData &syncData, uint64_t &curMigrateVer)
{
    if (syncData.committedData == nullptr) {
        syncData.committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
        if (syncData.committedData == nullptr) {
            LOGE("[SQLiteSingleVerStorageEngine::MigrateSyncData] committedData is null.");
            return -E_OUT_OF_MEMORY;
        }
    }
    InitConflictNotifiedFlag(syncData.committedData);

    std::vector<DataItem> dataItems;
    uint64_t minVerIncurCacheDb = 0;
    int errCode = handle->GetMinVersionCacheData(dataItems, minVerIncurCacheDb);
    if (errCode != E_OK) {
        LOGE("[MigrateSyncDataByVersion]Fail to get cur data in cache! err[%d]", errCode);
        return errCode;
    }

    // Valid versions start at 1, so 0 is treated as "nothing to migrate for this round"
    // (presumably no record was found - confirm against GetMinVersionCacheData).
    if (minVerIncurCacheDb == 0) { // min version in cache db is 1
        ++curMigrateVer;
        return E_OK;
    }

    // Follow the version actually present in the cache db instead of the expected cursor.
    if (minVerIncurCacheDb != curMigrateVer) { // double check for latest version is migrated
        curMigrateVer = minVerIncurCacheDb;
    }

    // Call the syncer module to erase the water mark.
    errCode = EraseDeviceWaterMark(handle, dataItems);
    if (errCode != E_OK) {
        LOGE("[MigrateSyncData] Erase water mark failed:%d", errCode);
        return errCode;
    }

    // next version need process
    LOGD("MigrateVer[%" PRIu64 "], minVer[%" PRIu64 "] maxVer[%" PRIu64 "]",
        curMigrateVer, minVerIncurCacheDb, GetCacheRecordVersion());
    // The current version is passed to the executor and the cursor moves on, even if the call fails.
    errCode = handle->MigrateSyncDataByVersion(curMigrateVer++, syncData, dataItems);
    if (errCode != E_OK) {
        LOGE("Migrate sync data fail and rollback, errCode = [%d]", errCode);
        return errCode;
    }

    // Best effort: a failure to read the max timestamp is not propagated.
    Timestamp timestamp = 0;
    errCode = handle->GetMaxTimestampDuringMigrating(timestamp);
    if (errCode == E_OK) {
        SetMaxTimestamp(timestamp);
    }

    errCode = ReleaseHandleTransiently(handle, 2ULL, syncData); // temporary release handle 2ms
    if (errCode != E_OK) {
        return errCode;
    }

    return E_OK;
}
223
224 // Temporary release handle for idleTime ms, avoid long-term blocking
ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor * & handle,uint64_t idleTime,NotifyMigrateSyncData & syncData)225 int SQLiteSingleVerStorageEngine::ReleaseHandleTransiently(SQLiteSingleVerStorageExecutor *&handle, uint64_t idleTime,
226 NotifyMigrateSyncData &syncData)
227 {
228 int errCode = ReleaseExecutor(handle);
229 if (errCode != E_OK) {
230 LOGE("release executor for reopen database! errCode = [%d]", errCode);
231 return errCode;
232 }
233
234 CommitNotifyForMigrateCache(syncData); // Trigger sync after release handle
235
236 std::this_thread::sleep_for(std::chrono::milliseconds(idleTime)); // Wait 2 ms to free this handle for put data
237 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
238 if (errCode != E_OK) {
239 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
240 return errCode;
241 }
242 return errCode;
243 }
244
AddSubscribeToMainDBInMigrate()245 int SQLiteSingleVerStorageEngine::AddSubscribeToMainDBInMigrate()
246 {
247 LOGD("Add subscribe to mainDB from cache. %d", engineState_);
248 std::lock_guard<std::mutex> lock(subscribeMutex_);
249 if (subscribeQuery_.empty()) {
250 return E_OK;
251 }
252 int errCode = E_OK;
253 auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
254 if (errCode != E_OK || handle == nullptr) {
255 LOGE("Get available executor for add subscribe failed. %d", errCode);
256 return errCode;
257 }
258 errCode = handle->StartTransaction(TransactType::IMMEDIATE);
259 if (errCode != E_OK) {
260 goto END;
261 }
262 for (auto item : subscribeQuery_) {
263 errCode = handle->AddSubscribeTrigger(item.second, item.first);
264 if (errCode != E_OK) {
265 LOGE("Add subscribe trigger failed: %d id: %s", errCode, item.first.c_str());
266 }
267 }
268 subscribeQuery_.clear();
269 // Not rollback even if some triggers add failed. Users don’t perceive errors, add triggers as much as possible
270 (void)handle->Commit();
271 END:
272 ReleaseExecutor(handle);
273 return errCode;
274 }
275
MigrateSyncData(SQLiteSingleVerStorageExecutor * & handle,bool & isNeedTriggerSync)276 int SQLiteSingleVerStorageEngine::MigrateSyncData(SQLiteSingleVerStorageExecutor *&handle, bool &isNeedTriggerSync)
277 {
278 int errCode = E_OK;
279 if (handle == nullptr) {
280 handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
281 if (errCode != E_OK) {
282 LOGE("Migrate sync data fail, Can not get available executor, errCode = [%d]", errCode);
283 return errCode;
284 }
285 }
286
287 LOGD("Begin migrate sync data, need migrate version[%" PRIu64 "]", GetCacheRecordVersion() - 1);
288 uint64_t curMigrateVer = 0; // The migration process is asynchronous and continuous
289 NotifyMigrateSyncData syncData;
290 auto kvdbManager = KvDBManager::GetInstance();
291 if (kvdbManager != nullptr) {
292 auto identifier = GetIdentifier();
293 auto kvdb = kvdbManager->FindKvDB(identifier);
294 if (kvdb != nullptr) {
295 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
296 syncData.isPermitForceWrite =
297 !(kvStore->GetDbProperties().GetBoolProp(KvDBProperties::SYNC_DUAL_TUPLE_MODE, false));
298 RefObject::DecObjRef(kvdb);
299 } else {
300 LOGE("[SingleVerEngine] kvdb is null.");
301 }
302 }
303 // cache atomic version represents version of cacheDb input next time
304 while (curMigrateVer < GetCacheRecordVersion()) {
305 errCode = MigrateSyncDataByVersion(handle, syncData, curMigrateVer);
306 if (errCode != E_OK) {
307 LOGE("Migrate version[%" PRIu64 "] failed! errCode = [%d]", curMigrateVer, errCode);
308 break;
309 }
310 if (!syncData.isRemote) {
311 isNeedTriggerSync = true;
312 }
313 }
314 if (syncData.committedData != nullptr) {
315 RefObject::DecObjRef(syncData.committedData);
316 syncData.committedData = nullptr;
317 }
318 // When finished Migrating sync data, will fix engine state
319 return errCode;
320 }
321
AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor * handle,EngineState stateBeforeMigrate)322 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(SQLiteSingleVerStorageExecutor *handle,
323 EngineState stateBeforeMigrate)
324 {
325 LOGD("Begin attach main db and cache db by executor!");
326 // Judge the file corresponding to db by the engine status and attach it to another file
327 int errCode = E_OK;
328 std::string attachAbsPath;
329 if (stateBeforeMigrate == EngineState::MAINDB) {
330 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
331 DBConstant::SQLITE_DB_EXTENSION;
332 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
333 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
334 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
335 DBConstant::SQLITE_DB_EXTENSION;
336 errCode = handle->AttachMainDbAndCacheDb(option_.cipherType, option_.passwd, attachAbsPath, stateBeforeMigrate);
337 } else {
338 return -E_NOT_SUPPORT;
339 }
340 if (errCode != E_OK) {
341 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
342 return errCode;
343 }
344
345 uint64_t maxVersion = 0;
346 errCode = handle->GetMaxVersionInCacheDb(maxVersion);
347 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
348 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
349 }
350
351 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
352 return errCode;
353 }
354
AttachMainDbAndCacheDb(sqlite3 * dbHandle,EngineState stateBeforeMigrate) const355 int SQLiteSingleVerStorageEngine::AttachMainDbAndCacheDb(sqlite3 *dbHandle, EngineState stateBeforeMigrate) const
356 {
357 LOGD("Begin attach main db and cache db by sqlite handle!");
358 // Judge the file corresponding to db by the engine status and attach it to another file
359 int errCode = E_OK;
360 std::string attachAbsPath;
361 if (stateBeforeMigrate == EngineState::MAINDB) {
362 attachAbsPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
363 DBConstant::SQLITE_DB_EXTENSION;
364 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "cache");
365 } else if (stateBeforeMigrate == EngineState::CACHEDB) {
366 attachAbsPath = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
367 DBConstant::SQLITE_DB_EXTENSION;
368 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option_.cipherType, option_.passwd, attachAbsPath, "maindb");
369 } else {
370 return -E_NOT_SUPPORT;
371 }
372 if (errCode != E_OK) {
373 LOGE("Attached database failed, errCode = [%d] engine state = [%d]", errCode, stateBeforeMigrate);
374 return errCode;
375 }
376
377 return errCode;
378 }
379
// Re-initialize the engine by running Init() again; returns whatever Init() returns.
int SQLiteSingleVerStorageEngine::ReInit()
{
    return Init();
}
384
ReleaseExecutor(SQLiteSingleVerStorageExecutor * & handle)385 int SQLiteSingleVerStorageEngine::ReleaseExecutor(SQLiteSingleVerStorageExecutor *&handle)
386 {
387 if (handle == nullptr) {
388 return E_OK;
389 }
390 StorageExecutor *databaseHandle = handle;
391 isCorrupted_ = isCorrupted_ || handle->GetCorruptedStatus();
392 Recycle(databaseHandle);
393 handle = nullptr;
394 if (isCorrupted_) {
395 LOGE("Database is corrupted or invalid passwd!");
396 return -E_INVALID_PASSWD_OR_CORRUPTED_DB; // Externally imperceptible, used to terminate migration
397 }
398 return E_OK;
399 }
400
// Finish a migration: drop the cache database and bring the engine back to a usable state.
// handle:             executor used during the migration; must not be nullptr. In the non-MAINDB path it
//                     is released (and therefore set to nullptr) before the engine is re-initialized.
// stateBeforeMigrate: engine state when the migration started.
//   - MAINDB: detach 'cache' from the connection, delete the cache db files and set the state to MAINDB.
//   - otherwise: release the executor, close the engine, delete the cache db files and ReInit().
// Returns E_OK on success, -E_INVALID_ARGS for a nullptr handle, otherwise the failing step's error.
int SQLiteSingleVerStorageEngine::FinishMigrateData(SQLiteSingleVerStorageExecutor *&handle,
    EngineState stateBeforeMigrate)
{
    LOGI("Begin to finish migrate and reinit db state!");
    int errCode;
    if (handle == nullptr) {
        return -E_INVALID_ARGS;
    }

    if (stateBeforeMigrate == EngineState::MAINDB) {
        sqlite3 *dbHandle = nullptr;
        errCode = handle->GetDbHandle(dbHandle); // use executor get sqlite3 handle to operating database
        if (errCode != E_OK) {
            LOGE("Get Db handle failed! errCode = [%d]", errCode);
            return errCode;
        }

        errCode = SQLiteUtils::ExecuteRawSQL(dbHandle, "DETACH 'cache'");
        if (errCode != E_OK) {
            LOGE("Execute the SQLite detach failed:%d", errCode);
            return errCode;
        }
        // delete cachedb
        errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
        if (errCode != E_OK) {
            LOGE("Remove files of cache database after detach:%d", errCode);
        }

        // The state is switched to MAINDB even when removing the cache files failed;
        // the removal error is still returned to the caller.
        SetEngineState(EngineState::MAINDB);
        return errCode;
    }

    errCode = ReleaseExecutor(handle);
    if (errCode != E_OK) {
        LOGE("Release executor for reopen database! errCode = [%d]", errCode);
        return errCode;
    }

    // close db for reinit this engine
    Release();

    // delete cache db
    errCode = DBCommon::RemoveAllFilesOfDirectory(GetDbDir(option_.subdir, DbType::CACHE), false);
    if (errCode != E_OK) {
        LOGE("Remove files of cache database after release current db:%d", errCode);
        return errCode;
    }

    // reInit, it will reset engine state
    errCode = ReInit();
    if (errCode != E_OK) {
        LOGE("Reinit failed when finish migrate data! please try reopen kvstore! errCode = [%d]", errCode);
        return errCode;
    }

    return E_OK;
}
458
InitExecuteMigrate(SQLiteSingleVerStorageExecutor * handle,EngineState preMigrateState)459 int SQLiteSingleVerStorageEngine::InitExecuteMigrate(SQLiteSingleVerStorageExecutor *handle,
460 EngineState preMigrateState)
461 {
462 // after attach main and cache need change operate data sql, changing state forbid operate database
463 SetEngineState(EngineState::MIGRATING);
464
465 int errCode = E_OK;
466 // check if has been attach and attach cache and main for migrate
467 if (executorState_ == ExecutorState::MAINDB || executorState_ == ExecutorState::CACHEDB) {
468 errCode = AttachMainDbAndCacheDb(handle, preMigrateState);
469 if (errCode != E_OK) {
470 LOGE("[ExeMigrate] Attach main db and cache db failed!, errCode = [%d]", errCode);
471 // For lock state open db, can not attach main and cache
472 return errCode;
473 }
474 } else if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE ||
475 // Has been attach, maybe ever crashed, need update version
476 executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
477 uint64_t maxVersion = 0;
478 errCode = handle->GetMaxVersionInCacheDb(maxVersion);
479 if (errCode != E_OK || maxVersion < CACHE_RECORD_DEFAULT_VERSION) {
480 maxVersion = CACHE_RECORD_DEFAULT_VERSION;
481 }
482 (void)cacheRecordVersion_.store(maxVersion + 1, std::memory_order_seq_cst);
483 } else {
484 return -E_UNEXPECTED_DATA;
485 }
486
487 return errCode;
488 }
489
// Run a complete cache-to-main migration: attach, migrate local data, migrate sync data, then detach
// and delete the cache db. Serialized by migrateLock_.
// Returns E_OK when the migration succeeded or was not needed (already migrating, engine not yet valid,
// or no cache db file), otherwise the error of the failing step. EndMigrate() runs on every path after
// the executor has been obtained.
int SQLiteSingleVerStorageEngine::ExecuteMigrate()
{
    // NOTE(review): preState is sampled before migrateLock_ is taken - confirm that a migration finishing
    // on another thread in between cannot leave this value stale in a harmful way.
    EngineState preState = GetEngineState();
    std::lock_guard<std::mutex> lock(migrateLock_);
    if (preState == EngineState::MIGRATING || preState == EngineState::INVALID ||
        !OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
        DBConstant::SQLITE_DB_EXTENSION)) {
        LOGD("[SqlSingleVerEngine] Being single ver migrating or never create db! engine state [%u]", preState);
        return E_OK;
    }

    // Get write executor for migrate
    int errCode = E_OK;
    auto handle = static_cast<SQLiteSingleVerStorageExecutor *>(FindExecutor(true, OperatePerm::NORMAL_PERM, errCode));
    if (errCode != E_OK) {
        LOGE("Migrate data fail, Can not get available executor, errCode = [%d]", errCode);
        return errCode;
    }

    isMigrating_.store(true);
    LOGD("Migrate start.");
    // Declared before the first goto so that no jump bypasses its initialization.
    bool isNeedTriggerSync = false;
    errCode = InitExecuteMigrate(handle, preState);
    if (errCode != E_OK) {
        LOGE("Init migrate data fail, errCode = [%d]", errCode);
        goto END;
    }

    LOGD("[SqlSingleVerEngine] Current engineState [%u] executorState [%u], begin to executing singleVer db migrate!",
        static_cast<unsigned>(preState), static_cast<unsigned>(executorState_));
    // has been attached, Mark start of migration and it can migrate data
    errCode = MigrateLocalData(handle);
    if (errCode != E_OK) {
        LOGE("Migrate local data fail, errCode = [%d]", errCode);
        goto END;
    }

    errCode = MigrateSyncData(handle, isNeedTriggerSync);
    if (errCode != E_OK) {
        LOGE("Migrate Sync data fail, errCode = [%d]", errCode);
        goto END;
    }

    SetEngineState(EngineState::ENGINE_BUSY); // temp forbid use handle and engine for detach and close executor

    // detach database and delete cachedb
    errCode = FinishMigrateData(handle, preState);
    if (errCode != E_OK) {
        LOGE("Finish migrating data fail, errCode = [%d]", errCode);
        goto END;
    }

END: // after FinishMigrateData, it will reset engine state
    // there is no need cover the errCode
    // EndMigrate takes errCode by value, so the value returned below is the one set above.
    EndMigrate(handle, preState, errCode, isNeedTriggerSync);
    isMigrating_.store(false);
    LOGD("Migrate stop.");
    return errCode;
}
549
EndMigrate(SQLiteSingleVerStorageExecutor * & handle,EngineState stateBeforeMigrate,int errCode,bool isNeedTriggerSync)550 void SQLiteSingleVerStorageEngine::EndMigrate(SQLiteSingleVerStorageExecutor *&handle, EngineState stateBeforeMigrate,
551 int errCode, bool isNeedTriggerSync)
552 {
553 LOGD("Finish migrating data! errCode = [%d]", errCode);
554 if (errCode != E_OK) {
555 SetEngineState(stateBeforeMigrate);
556 }
557 if (handle != nullptr) {
558 handle->ClearMigrateData();
559 }
560 errCode = ReleaseExecutor(handle);
561 if (errCode != E_OK) {
562 LOGE("release executor after migrating! errCode = [%d]", errCode);
563 }
564
565 errCode = AddSubscribeToMainDBInMigrate();
566 if (errCode != E_OK) {
567 LOGE("Add subscribe trigger after migrate sync data failed: %d", errCode);
568 }
569
570 // Notify max timestamp offset for SyncEngine.
571 // When time change offset equals 0, SyncEngine can adjust local time offset according to max timestamp.
572 RuntimeContext::GetInstance()->NotifyTimestampChanged(0);
573 if (isNeedTriggerSync) {
574 commitNotifyFunc_(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_FINISH_MIGRATE_EVENT),
575 nullptr);
576 }
577 return;
578 }
579
// Report the corrupted flag, which ReleaseExecutor() sets when a released executor reports corruption
// and ClearCorruptedFlag() resets.
bool SQLiteSingleVerStorageEngine::IsEngineCorrupted() const
{
    return isCorrupted_;
}
584
NewSQLiteStorageExecutor(sqlite3 * dbHandle,bool isWrite,bool isMemDb)585 StorageExecutor *SQLiteSingleVerStorageEngine::NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb)
586 {
587 auto executor = new (std::nothrow) SQLiteSingleVerStorageExecutor(dbHandle, isWrite, isMemDb, executorState_);
588 if (executor == nullptr) {
589 return executor;
590 }
591 executor->SetConflictResolvePolicy(option_.conflictReslovePolicy);
592 return executor;
593 }
594
// Open the main database and, when a cache db file also exists, attach it to the same connection.
// isWrite: read handles never create the database (createIfNecessary is forced to false on a copy
//          of the options).
// db:      receives the opened sqlite connection.
// Returns -E_EKEYREVOKED when the engine state does not allow opening the main db or when the open
// failed with errno EKEYREVOKED; otherwise the open result. A failed attach of the cache db is only
// logged and E_OK is returned.
int SQLiteSingleVerStorageEngine::TryToOpenMainDatabase(bool isWrite, sqlite3 *&db)
{
    // Only could get the main database handle in the uninitialized and the main status.
    if (GetEngineState() != EngineState::INVALID && GetEngineState() != EngineState::MAINDB) {
        LOGE("[SQLiteSinStoreEng][GetMainHandle] Can only create new handle for state[%d]", GetEngineState());
        return -E_EKEYREVOKED;
    }

    // A memory database keeps the uri it already has; only file databases get the main db path.
    if (!option_.isMemDb) {
        option_.uri = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
            DBConstant::SQLITE_DB_EXTENSION;
    }

    OpenDbProperties optionTemp = option_;
    if (!isWrite) {
        optionTemp.createIfNecessary = false;
    }

    int errCode = SQLiteUtils::OpenDatabase(optionTemp, db);
    if (errCode != E_OK) {
        // Map a revoked key (reported through errno) to the dedicated error code.
        if (errno == EKEYREVOKED) {
            LOGI("Failed to open the main database for key revoked[%d]", errCode);
            errCode = -E_EKEYREVOKED;
        }
        return errCode;
    }

    executorState_ = ExecutorState::MAINDB;
    // Set the engine state to main status for that the main database is valid.
    SetEngineState(EngineState::MAINDB);

    if (OS::CheckPathExistence(GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
        DBConstant::SQLITE_DB_EXTENSION)) {
        // In status cacheDb crash
        errCode = AttachMainDbAndCacheDb(db, EngineState::MAINDB);
        if (errCode != E_OK) {
            LOGE("[SingleVerEngine][GetMain] Attach main db and cache db failed!, errCode = [%d]", errCode);
            return E_OK; // not care err to return, only use for print log
        }
        executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
        // cache and main existed together, can not read data, must execute migrate first
        SetEngineState(EngineState::ATTACHING);
    }

    return errCode;
}
641
// Obtain a sqlite connection for a new executor.
// The main database is tried first. Only when that fails with -E_EKEYREVOKED and
// ParamCheckUtils::IsS3SECEOpt(secOpt) holds is a cache database created instead, and only for a write
// handle on an engine that is still INVALID and has no cache db file yet.
// isWrite:  whether the handle is a write handle.
// secOpt:   security option of the store, used for logging and for the fallback decision.
// dbHandle: receives the opened connection.
// Returns the result of opening the main db, -E_EKEYREVOKED when the cache fallback is not allowed,
// or the result of opening the cache db.
int SQLiteSingleVerStorageEngine::GetDbHandle(bool isWrite, const SecurityOption &secOpt, sqlite3 *&dbHandle)
{
    int errCode = TryToOpenMainDatabase(isWrite, dbHandle);
    LOGD("Finish to open the main database, write[%d], label[%d], flag[%d], id[%.6s], errCode[%d]", isWrite,
        secOpt.securityLabel, secOpt.securityFlag, DBCommon::TransferStringToHex(identifier_).c_str(), errCode);
    if (!(ParamCheckUtils::IsS3SECEOpt(secOpt) && errCode == -E_EKEYREVOKED)) {
        return errCode;
    }

    std::string cacheDbPath = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
        DBConstant::SQLITE_DB_EXTENSION;
    if (!isWrite || GetEngineState() != EngineState::INVALID ||
        OS::CheckPathExistence(cacheDbPath)) {
        LOGI("[SQLiteSingleStorageEng][GetDbHandle] Only use for first create cache db! [%d] [%d]",
            isWrite, GetEngineState());
        return -E_EKEYREVOKED;
    }

    errCode = GetCacheDbHandle(dbHandle);
    if (errCode != E_OK) {
        LOGE("singleVerStorageEngine::GetDbHandle get cache handle fail! errCode = [%d]", errCode);
        return errCode;
    }
    SetEngineState(EngineState::CACHEDB);
    executorState_ = ExecutorState::CACHEDB;

    ResetCacheRecordVersion();
    // Getting the cache handle means the main db file's key is revoked, so there is nothing to attach to.
    return errCode;
}
672
// Schema of the cache database. Adjacent string literals are concatenated by the compiler,
// so each statement below is a single SQL string.
namespace CacheDbSqls {
    // Local data cached while the main database is unavailable; one row per hashed key.
    const std::string CREATE_CACHE_LOCAL_TABLE_SQL =
        "CREATE TABLE IF NOT EXISTS local_data("
            "key BLOB NOT NULL,"
            "value BLOB,"
            "timestamp INT,"
            "hash_key BLOB PRIMARY KEY NOT NULL,"
            "flag INT NOT NULL);";

    // Sync data cached per record version; (version, hash_key) identifies a row.
    const std::string CREATE_CACHE_SYNC_TABLE_SQL =
        "CREATE TABLE IF NOT EXISTS sync_data("
            "key BLOB NOT NULL,"
            "value BLOB,"
            "timestamp INT NOT NULL,"
            "flag INT NOT NULL,"
            "device BLOB,"
            "ori_device BLOB,"
            "hash_key BLOB NOT NULL,"
            "w_timestamp INT,"
            "version INT NOT NULL,"
            "PRIMARY Key(version, hash_key));";
}
695
696 // Warning: Use error passwd create cache database can not check, it will create error passwd cache db,
697 // And make migrate data failed! This cache db will not be open correctly.
GetCacheDbHandle(sqlite3 * & db)698 int SQLiteSingleVerStorageEngine::GetCacheDbHandle(sqlite3 *&db)
699 {
700 option_.uri = GetDbDir(option_.subdir, DbType::CACHE) + "/" + DBConstant::SINGLE_VER_CACHE_STORE +
701 DBConstant::SQLITE_DB_EXTENSION;
702 // creatTable
703 option_.sqls = {
704 CacheDbSqls::CREATE_CACHE_LOCAL_TABLE_SQL,
705 CacheDbSqls::CREATE_CACHE_SYNC_TABLE_SQL
706 };
707
708 if (!option_.createIfNecessary) {
709 std::string mainDbPtah = GetDbDir(option_.subdir, DbType::MAIN) + "/" + DBConstant::SINGLE_VER_DATA_STORE +
710 DBConstant::SQLITE_DB_EXTENSION;
711 if (!OS::CheckPathExistence(mainDbPtah)) { // Whether to create a cacheDb is based on whether the mainDb exists
712 return -E_INVALID_DB;
713 }
714 }
715
716 OpenDbProperties option = option_; // copy for no change it
717 option.createIfNecessary = true;
718 int errCode = SQLiteUtils::OpenDatabase(option, db);
719 if (errCode != E_OK) {
720 LOGE("Get CacheDb handle failed, errCode = [%d], errno = [%d]", errCode, errno);
721 return errCode;
722 }
723 return errCode;
724 }
725
CheckDatabaseSecOpt(const SecurityOption & secOption) const726 int SQLiteSingleVerStorageEngine::CheckDatabaseSecOpt(const SecurityOption &secOption) const
727 {
728 if (!(secOption == option_.securityOpt) &&
729 secOption.securityLabel != SecurityLabel::NOT_SET &&
730 option_.securityOpt.securityLabel != SecurityLabel::NOT_SET) {
731 LOGE("SecurityOption mismatch, existed:[%d-%d] vs input:[%d-%d]", secOption.securityLabel,
732 secOption.securityFlag, option_.securityOpt.securityLabel, option_.securityOpt.securityFlag);
733 return -E_SECURITY_OPTION_CHECK_ERROR;
734 }
735 return E_OK;
736 }
737
// Create the main, meta and cache sub-directories that do not exist yet and apply the security option
// to each newly created one.
// Returns E_OK on success, -E_SYSTEM_API_FAIL when the incomplete token or a directory cannot be
// created, or the error of SetSecurityOption (-E_NOT_SUPPORT from it is tolerated).
int SQLiteSingleVerStorageEngine::CreateNewDirsAndSetSecOpt() const
{
    std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
    for (const auto &item : dbDir) {
        // Existing directories are left alone, including their security option.
        if (OS::CheckPathExistence(option_.subdir + "/" + item)) {
            continue;
        }

        // Dir and old db file not existed, it means that the database is newly created
        // need create flag of database not incomplete
        // The token file is only created (last operand) when neither the token nor the old-layout db exists.
        if (!OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
            !OS::CheckPathExistence(option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE +
            DBConstant::SQLITE_DB_EXTENSION) &&
            OS::CreateFileByFileName(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
            LOGE("Fail to create the token of database incompleted! errCode = [E_SYSTEM_API_FAIL]");
            return -E_SYSTEM_API_FAIL;
        }

        if (DBCommon::CreateDirectory(option_.subdir + "/" + item) != E_OK) {
            LOGE("Create sub-directory for single ver failed, errno:%d", errno);
            return -E_SYSTEM_API_FAIL;
        }

        // Nothing to apply when no security label was requested.
        if (option_.securityOpt.securityLabel == NOT_SET) {
            continue;
        }

        // The meta directory is capped at label S2 and always uses the ECE flag.
        SecurityOption option = option_.securityOpt;
        if (item == DBConstant::METADB_DIR) {
            option.securityLabel = ((option_.securityOpt.securityLabel >= SecurityLabel::S2) ?
                SecurityLabel::S2 : option_.securityOpt.securityLabel);
            option.securityFlag = SecurityFlag::ECE;
        }

        int errCode = RuntimeContext::GetInstance()->SetSecurityOption(option_.subdir + "/" + item, option);
        if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
            LOGE("Set the security option of sub-directory failed[%d]", errCode);
            return errCode;
        }
    }
    return E_OK;
}
780
GetExistedSecOption(SecurityOption & secOption) const781 int SQLiteSingleVerStorageEngine::GetExistedSecOption(SecurityOption &secOption) const
782 {
783 // Check the existence of the database, include the origin database and the database in the 'main' directory.
784 auto mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
785 auto mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
786 auto origDbFilePath = option_.subdir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
787 if (!OS::CheckPathExistence(origDbFilePath) && !OS::CheckPathExistence(mainDbFilePath)) {
788 secOption = option_.securityOpt;
789 return E_OK;
790 }
791
792 // the main database file has high priority of the security option.
793 int errCode;
794 if (OS::CheckPathExistence(mainDbFilePath)) {
795 errCode = GetPathSecurityOption(mainDbFilePath, secOption);
796 } else {
797 errCode = GetPathSecurityOption(origDbFilePath, secOption);
798 }
799 if (errCode != E_OK) {
800 secOption = SecurityOption();
801 if (errCode == -E_NOT_SUPPORT) {
802 return E_OK;
803 }
804 LOGE("Get the security option of the existed database failed.");
805 }
806 return errCode;
807 }
808
// Reset the corrupted flag, so that ReleaseExecutor() stops reporting corruption until an executor
// reports it again.
void SQLiteSingleVerStorageEngine::ClearCorruptedFlag()
{
    isCorrupted_ = false;
}
813
PreCreateExecutor(bool isWrite)814 int SQLiteSingleVerStorageEngine::PreCreateExecutor(bool isWrite)
815 {
816 // Assume that create the write executor firstly and the write one we will not be released.
817 // If the write one would be released in the future, should take care the pass through.
818 if (!isWrite) {
819 return E_OK;
820 }
821
822 if (option_.isMemDb) {
823 return E_OK;
824 }
825
826 // Get the existed database secure option.
827 SecurityOption existedSecOpt;
828 int errCode = GetExistedSecOption(existedSecOpt);
829 if (errCode != E_OK) {
830 return errCode;
831 }
832
833 errCode = CheckDatabaseSecOpt(existedSecOpt);
834 if (errCode != E_OK) {
835 return errCode;
836 }
837
838 // Judge whether need update the security option of the engine.
839 // Should update the security in the import or rekey scene(inner).
840 if (!isNeedUpdateSecOpt_) {
841 option_.securityOpt = existedSecOpt;
842 }
843
844 errCode = CreateNewDirsAndSetSecOpt();
845 if (errCode != E_OK) {
846 return errCode;
847 }
848
849 if (!isUpdated_) {
850 errCode = SQLiteSingleVerDatabaseUpgrader::TransferDatabasePath(option_.subdir, option_);
851 if (errCode != E_OK) {
852 LOGE("[PreCreateExecutor] Transfer Db file path failed[%d].", errCode);
853 return errCode;
854 }
855 }
856
857 return E_OK;
858 }
859
EndCreateExecutor(bool isWrite)860 int SQLiteSingleVerStorageEngine::EndCreateExecutor(bool isWrite)
861 {
862 if (option_.isMemDb || !isWrite) {
863 return E_OK;
864 }
865
866 int errCode = SQLiteSingleVerDatabaseUpgrader::SetSecOption(option_.subdir, option_.securityOpt,
867 isNeedUpdateSecOpt_);
868 if (errCode != E_OK) {
869 if (errCode == -E_NOT_SUPPORT) {
870 option_.securityOpt = SecurityOption();
871 errCode = E_OK;
872 }
873 LOGE("SetSecOption failed:%d", errCode);
874 return errCode;
875 }
876
877 // after setting secOption, the database file operation ends
878 // database create completed, delete the token
879 if (OS::CheckPathExistence(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) &&
880 OS::RemoveFile(option_.subdir + DBConstant::PATH_POSTFIX_DB_INCOMPLETE) != E_OK) {
881 LOGE("Finish to create the complete database, but delete token fail! errCode = [E_SYSTEM_API_FAIL]");
882 return -E_SYSTEM_API_FAIL;
883 }
884 return errCode;
885 }
886
TryAttachMetaDb(sqlite3 * & dbHandle,bool & isAttachMeta)887 int SQLiteSingleVerStorageEngine::TryAttachMetaDb(sqlite3 *&dbHandle, bool &isAttachMeta)
888 {
889 // attach or not depend on its true secOpt, but it's not permit while option_.secOpt different from true secOpt
890 if ((!option_.isMemDb) && (ParamCheckUtils::IsS3SECEOpt(option_.securityOpt))) {
891 int errCode = AttachMetaDatabase(dbHandle, option_);
892 if (errCode != E_OK) {
893 (void)sqlite3_close_v2(dbHandle);
894 dbHandle = nullptr;
895 return errCode;
896 }
897 isAttachMeta = true;
898 }
899 return E_OK;
900 }
901
CreateNewExecutor(bool isWrite,StorageExecutor * & handle)902 int SQLiteSingleVerStorageEngine::CreateNewExecutor(bool isWrite, StorageExecutor *&handle)
903 {
904 int errCode = PreCreateExecutor(isWrite);
905 if (errCode != E_OK) {
906 return errCode;
907 }
908
909 sqlite3 *dbHandle = nullptr;
910 errCode = GetDbHandle(isWrite, option_.securityOpt, dbHandle);
911 if (errCode != E_OK) {
912 return errCode;
913 }
914
915 bool isAttachMeta = false;
916 errCode = TryAttachMetaDb(dbHandle, isAttachMeta);
917 if (errCode != E_OK) {
918 return errCode;
919 }
920
921 RegisterFunctionIfNeed(dbHandle);
922 errCode = Upgrade(dbHandle);
923 if (errCode != E_OK) {
924 (void)sqlite3_close_v2(dbHandle);
925 dbHandle = nullptr;
926 return errCode;
927 }
928
929 errCode = EndCreateExecutor(isWrite);
930 if (errCode != E_OK) {
931 LOGE("After create executor, set security option incomplete!");
932 (void)sqlite3_close_v2(dbHandle);
933 dbHandle = nullptr;
934 return errCode;
935 }
936
937 handle = NewSQLiteStorageExecutor(dbHandle, isWrite, option_.isMemDb);
938 if (handle == nullptr) {
939 LOGE("New SQLiteStorageExecutor[%d] for the pool failed.", isWrite);
940 (void)sqlite3_close_v2(dbHandle);
941 dbHandle = nullptr;
942 return -E_OUT_OF_MEMORY;
943 }
944 if (isAttachMeta) {
945 SQLiteSingleVerStorageExecutor *singleVerHandle = static_cast<SQLiteSingleVerStorageExecutor *>(handle);
946 singleVerHandle->SetAttachMetaMode(isAttachMeta);
947 }
948 return E_OK;
949 }
950
Upgrade(sqlite3 * db)951 int SQLiteSingleVerStorageEngine::Upgrade(sqlite3 *db)
952 {
953 if (isUpdated_ || GetEngineState() == EngineState::CACHEDB) {
954 LOGI("Storage engine is in cache status or has been upgraded[%d]!", isUpdated_);
955 return E_OK;
956 }
957
958 std::unique_ptr<SQLiteSingleVerDatabaseUpgrader> upgrader;
959 LOGD("[SqlSingleEngine][Upgrade] NewSchemaStrSize=%zu", option_.schema.size());
960 if (option_.schema.empty()) {
961 upgrader = std::make_unique<SQLiteSingleVerDatabaseUpgrader>(db, option_.securityOpt, option_.isMemDb);
962 } else {
963 SchemaObject schema;
964 int errCode = schema.ParseFromSchemaString(option_.schema);
965 if (errCode != E_OK) {
966 LOGE("Upgrader failed while parsing the origin schema:%d", errCode);
967 return errCode;
968 }
969 upgrader = std::make_unique<SQLiteSingleVerSchemaDatabaseUpgrader>(db, schema,
970 option_.securityOpt, option_.isMemDb);
971 }
972
973 std::string mainDbDir = GetDbDir(option_.subdir, DbType::MAIN);
974 std::string mainDbFilePath = mainDbDir + "/" + DBConstant::SINGLE_VER_DATA_STORE + DBConstant::SQLITE_DB_EXTENSION;
975 SecurityOption secOpt = option_.securityOpt;
976 int errCode = E_OK;
977 if (isNeedUpdateSecOpt_) {
978 errCode = GetPathSecurityOption(mainDbFilePath, secOpt);
979 if (errCode != E_OK) {
980 LOGI("[SingleVerStorageEngine::Upgrade] Failed to get the path security option, errCode = [%d]", errCode);
981 if (errCode != -E_NOT_SUPPORT) {
982 return errCode;
983 }
984 secOpt = SecurityOption();
985 }
986 }
987
988 upgrader->SetMetaUpgrade(secOpt, option_.securityOpt, option_.subdir);
989 upgrader->SetSubdir(option_.subdir);
990 errCode = upgrader->Upgrade();
991 if (errCode != E_OK) {
992 LOGE("Single ver database upgrade failed:%d", errCode);
993 return errCode;
994 }
995
996 LOGD("Finish upgrade single ver database!");
997 isUpdated_ = true; // Identification to avoid repeated upgrades
998 return errCode;
999 }
1000
1001 // Attention: This function should be called before "Upgrade".
1002 // Attention: This function should be called for each executor on the sqlite3 handle that the executor binds to.
RegisterFunctionIfNeed(sqlite3 * dbHandle) const1003 void SQLiteSingleVerStorageEngine::RegisterFunctionIfNeed(sqlite3 *dbHandle) const
1004 {
1005 // This function should accept a sqlite3 handle with no perception of database classification. That is, if it is
1006 // not a newly created database, the meta-Table should exist and can be accessed.
1007 std::string schemaStr = option_.schema;
1008 if (schemaStr.empty()) {
1009 // If schema from GetKvStore::Option is empty, we have to try to load it from database. ReadOnly mode if exist;
1010 int errCode = SQLiteUtils::GetSchema(dbHandle, schemaStr);
1011 if (errCode != E_OK) {
1012 LOGD("[SqlSinEngine] Can't get schema from db[%d], maybe it is just created or not a schema-db.", errCode);
1013 }
1014 }
1015 if (!schemaStr.empty()) {
1016 // This must be a Schema-Database, if it is Json-Schema, the Register will do nothing and return E_OK
1017 int errCode = SQLiteUtils::RegisterFlatBufferFunction(dbHandle, schemaStr);
1018 if (errCode != E_OK) { // Not very likely
1019 // Just warning, if no index had been or need to be created, then put or kv-get can still use.
1020 LOGW("[SqlSinEngine] RegisterFlatBufferExtractFunction fail, errCode = %d", errCode);
1021 }
1022 }
1023
1024 // This function is used to update meta_data in triggers when it's attached to mainDB
1025 int errCode = SQLiteUtils::RegisterMetaDataUpdateFunction(dbHandle);
1026 if (errCode != E_OK) {
1027 LOGW("[SqlSinEngine] RegisterMetaDataUpdateFunction fail, errCode = %d", errCode);
1028 }
1029 }
1030
AttachMetaDatabase(sqlite3 * dbHandle,const OpenDbProperties & option) const1031 int SQLiteSingleVerStorageEngine::AttachMetaDatabase(sqlite3 *dbHandle, const OpenDbProperties &option) const
1032 {
1033 int errCode;
1034 LOGD("SQLiteSingleVerStorageEngine begin attach metaDb!");
1035 std::string metaDbPath = option.subdir + "/" + DBConstant::METADB_DIR + "/" +
1036 DBConstant::SINGLE_VER_META_STORE + DBConstant::SQLITE_DB_EXTENSION;
1037 // attach metaDb may failed while createIfNecessary is false, here need to create metaDb first.
1038 if (!option.createIfNecessary && !OS::CheckPathExistence(metaDbPath)) {
1039 errCode = SQLiteUtils::CreateMetaDatabase(metaDbPath);
1040 if (errCode != E_OK) {
1041 return errCode;
1042 }
1043 }
1044 CipherPassword passwd;
1045 errCode = SQLiteUtils::AttachNewDatabase(dbHandle, option.cipherType, passwd, metaDbPath, "meta");
1046 if (errCode != E_OK) {
1047 LOGE("AttachNewDatabase fail, errCode = %d", errCode);
1048 }
1049 return errCode;
1050 }
1051
ResetCacheRecordVersion()1052 void SQLiteSingleVerStorageEngine::ResetCacheRecordVersion()
1053 {
1054 (void)cacheRecordVersion_.store(CACHE_RECORD_DEFAULT_VERSION, std::memory_order_seq_cst);
1055 }
1056
IncreaseCacheRecordVersion()1057 void SQLiteSingleVerStorageEngine::IncreaseCacheRecordVersion()
1058 {
1059 (void)cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1060 }
1061
GetAndIncreaseCacheRecordVersion()1062 uint64_t SQLiteSingleVerStorageEngine::GetAndIncreaseCacheRecordVersion()
1063 {
1064 return cacheRecordVersion_.fetch_add(1, std::memory_order_seq_cst);
1065 }
1066
GetCacheRecordVersion() const1067 uint64_t SQLiteSingleVerStorageEngine::GetCacheRecordVersion() const
1068 {
1069 return cacheRecordVersion_.load(std::memory_order_seq_cst);
1070 }
1071
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,int eventType) const1072 void SQLiteSingleVerStorageEngine::CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData *&committedData,
1073 int eventType) const
1074 {
1075 std::shared_lock<std::shared_mutex> lock(notifyMutex_);
1076 if (commitNotifyFunc_ == nullptr) {
1077 LOGE("commitNotifyFunc_ is nullptr, can't notify now.");
1078 RefObject::DecObjRef(committedData);
1079 committedData = nullptr;
1080 return;
1081 }
1082 commitNotifyFunc_(eventType, static_cast<KvDBCommitNotifyFilterAbleData *>(committedData));
1083 committedData = nullptr;
1084 }
1085
InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData * & committedData) const1086 void SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag(SingleVerNaturalStoreCommitNotifyData *&committedData) const
1087 {
1088 if (committedData == nullptr) {
1089 LOGI("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] committedData is null.");
1090 return;
1091 }
1092 auto identifier = GetIdentifier();
1093 auto kvdb = KvDBManager::GetInstance()->FindKvDB(identifier);
1094 if (kvdb == nullptr) {
1095 LOGE("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] kvdb is null.");
1096 return;
1097 }
1098 unsigned int conflictFlag = 0;
1099 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1100 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ONLY) != 0) {
1101 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ONLY);
1102 }
1103 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1104 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_FOREIGN_KEY_ORIG) != 0) {
1105 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_FOREIGN_KEY_ORIG);
1106 }
1107 if (static_cast<GenericKvDB *>(kvdb)->GetRegisterFunctionCount(
1108 RegisterFuncType::CONFLICT_SINGLE_VERSION_NS_NATIVE_ALL) != 0) {
1109 conflictFlag |= static_cast<unsigned>(SQLiteGeneralNSConflictType::SQLITE_GENERAL_NS_NATIVE_ALL);
1110 }
1111 RefObject::DecObjRef(kvdb);
1112 LOGD("[SQLiteSingleVerStorageEngine::InitConflictNotifiedFlag] conflictFlag Flag: %u", conflictFlag);
1113 committedData->SetConflictedNotifiedFlag(static_cast<int>(conflictFlag));
1114 }
1115
SetMaxTimestamp(Timestamp maxTimestamp) const1116 void SQLiteSingleVerStorageEngine::SetMaxTimestamp(Timestamp maxTimestamp) const
1117 {
1118 auto kvdbManager = KvDBManager::GetInstance();
1119 if (kvdbManager == nullptr) {
1120 return;
1121 }
1122 auto identifier = GetIdentifier();
1123 auto kvdb = kvdbManager->FindKvDB(identifier);
1124 if (kvdb == nullptr) {
1125 LOGE("[SQLiteSingleVerStorageEngine::SetMaxTimestamp] kvdb is null.");
1126 return;
1127 }
1128
1129 auto kvStore = static_cast<SQLiteSingleVerNaturalStore *>(kvdb);
1130 kvStore->SetMaxTimestamp(maxTimestamp);
1131 RefObject::DecObjRef(kvdb);
1132 }
1133
CommitNotifyForMigrateCache(NotifyMigrateSyncData & syncData) const1134 void SQLiteSingleVerStorageEngine::CommitNotifyForMigrateCache(NotifyMigrateSyncData &syncData) const
1135 {
1136 const auto &isRemote = syncData.isRemote;
1137 const auto &isRemoveDeviceData = syncData.isRemoveDeviceData;
1138 auto &committedData = syncData.committedData;
1139 auto &entries = syncData.entries;
1140
1141 // Put data. Including insert, update and delete.
1142 if (!isRemoveDeviceData) {
1143 if (committedData != nullptr) {
1144 int eventType = static_cast<int>(isRemote ?
1145 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT :
1146 SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT);
1147 CommitAndReleaseNotifyData(committedData, eventType);
1148 }
1149 return;
1150 }
1151
1152 // Remove device data.
1153 if (entries.empty() || entries.size() > MAX_TOTAL_NOTIFY_ITEM_SIZE) {
1154 return;
1155 }
1156 size_t totalSize = 0;
1157 for (auto iter = entries.begin(); iter != entries.end();) {
1158 auto &entry = *iter;
1159 if (committedData == nullptr) {
1160 committedData = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData();
1161 if (committedData == nullptr) {
1162 LOGE("Alloc committed notify data failed.");
1163 return;
1164 }
1165 }
1166 if (entry.key.size() > DBConstant::MAX_KEY_SIZE || entry.value.size() > DBConstant::MAX_VALUE_SIZE) {
1167 iter++;
1168 continue;
1169 }
1170 if (entry.key.size() + entry.value.size() + totalSize > MAX_TOTAL_NOTIFY_DATA_SIZE) {
1171 CommitAndReleaseNotifyData(committedData,
1172 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1173 totalSize = 0;
1174 continue;
1175 }
1176 totalSize += (entry.key.size() + entry.value.size());
1177 committedData->InsertCommittedData(std::move(entry), DataType::DELETE, false);
1178 iter++;
1179 }
1180 if (committedData != nullptr) {
1181 CommitAndReleaseNotifyData(committedData,
1182 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_SYNC_EVENT));
1183 }
1184 }
1185
1186 // Cache subscribe when engine state is CACHE mode, and its will be applied at the beginning of migrate.
CacheSubscribe(const std::string & subscribeId,const QueryObject & query)1187 void SQLiteSingleVerStorageEngine::CacheSubscribe(const std::string &subscribeId, const QueryObject &query)
1188 {
1189 std::lock_guard<std::mutex> lock(subscribeMutex_);
1190 subscribeQuery_[subscribeId] = query;
1191 }
1192 }
1193