1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_executor.h"
17
18 #include <algorithm>
19
20 #include "log_print.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "parcel.h"
25 #include "runtime_context.h"
26 #include "sqlite_single_ver_storage_executor_sql.h"
27
28 namespace DistributedDB {
PrepareForSavingCacheData(SingleVerDataType type)29 int SQLiteSingleVerStorageExecutor::PrepareForSavingCacheData(SingleVerDataType type)
30 {
31 int errCode = -E_NOT_SUPPORT;
32 if (type == SingleVerDataType::LOCAL_TYPE) {
33 std::string insertLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
34 INSERT_LOCAL_SQL_FROM_CACHEHANDLE : INSERT_CACHE_LOCAL_SQL);
35 std::string updateLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
36 UPDATE_LOCAL_SQL_FROM_CACHEHANDLE : UPDATE_CACHE_LOCAL_SQL);
37 errCode = PrepareForSavingData(SELECT_CACHE_LOCAL_HASH_SQL, insertLocalSql, updateLocalSql,
38 saveLocalStatements_);
39 } else if (type == SingleVerDataType::SYNC_TYPE) {
40 std::string insertSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
41 INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL);
42 std::string updateSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
43 UPDATE_CACHE_SYNC_SQL_FROM_MAINHANDLE : UPDATE_CACHE_SYNC_SQL);
44 std::string selectSyncHashSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
45 SELECT_CACHE_SYNC_HASH_SQL_FROM_MAINHANDLE : SELECT_CACHE_SYNC_HASH_SQL);
46 errCode = PrepareForSavingData(selectSyncHashSql, insertSyncSql, updateSyncSql, saveSyncStatements_);
47 }
48 if (errCode != E_OK) {
49 LOGE("Prepare to save sync cache data failed:%d", errCode);
50 }
51 return CheckCorruptedStatus(errCode);
52 }
53
ResetForSavingCacheData(SingleVerDataType type)54 int SQLiteSingleVerStorageExecutor::ResetForSavingCacheData(SingleVerDataType type)
55 {
56 int errCode = E_OK;
57 if (type == SingleVerDataType::LOCAL_TYPE) {
58 SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
59 SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
60 SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
61 } else if (type == SingleVerDataType::SYNC_TYPE) {
62 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
63 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
64 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
65 }
66
67 return CheckCorruptedStatus(errCode);
68 }
69
ResetForMigrateCacheData()70 int SQLiteSingleVerStorageExecutor::ResetForMigrateCacheData()
71 {
72 int errCode = E_OK;
73 SQLiteUtils::ResetStatement(migrateSyncStatements_.insertStatement, false, errCode);
74 SQLiteUtils::ResetStatement(migrateSyncStatements_.updateStatement, false, errCode);
75 SQLiteUtils::ResetStatement(migrateSyncStatements_.queryStatement, false, errCode);
76
77 return CheckCorruptedStatus(errCode);
78 }
79
RemoveDeviceDataInCacheMode(const std::string & hashDev,bool isNeedNotify,uint64_t recordVersion) const80 int SQLiteSingleVerStorageExecutor::RemoveDeviceDataInCacheMode(const std::string &hashDev,
81 bool isNeedNotify, uint64_t recordVersion) const
82 {
83 // device name always hash string.
84 std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
85
86 Key hashKey;
87 int errCode = DBCommon::CalcValueHash(REMOVE_DEVICE_DATA_KEY, hashKey);
88 if (errCode != E_OK) {
89 return errCode;
90 }
91
92 DataItem dataItem;
93 dataItem.key = REMOVE_DEVICE_DATA_KEY;
94 dataItem.value = devVect;
95 if (isNeedNotify) {
96 dataItem.flag = DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG;
97 } else {
98 dataItem.flag = DataItem::REMOVE_DEVICE_DATA_FLAG;
99 }
100
101 sqlite3_stmt *statement = nullptr;
102 std::string sql = (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
103 INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL;
104 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
105 if (errCode != E_OK) {
106 goto ERROR;
107 }
108
109 errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
110 if (errCode != E_OK) {
111 goto ERROR;
112 }
113
114 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
115 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
116 LOGE("Failed to execute rm the device synced data:%d", errCode);
117 } else {
118 errCode = E_OK;
119 }
120
121 ERROR:
122 SQLiteUtils::ResetStatement(statement, true, errCode);
123 return CheckCorruptedStatus(errCode);
124 }
125
GetMinVersionCacheData(std::vector<DataItem> & dataItems,uint64_t & minVerIncurCacheDb) const126 int SQLiteSingleVerStorageExecutor::GetMinVersionCacheData(
127 std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const
128 {
129 std::string sql;
130 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
131 sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_MAINHANDLE;
132 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
133 sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_CACHEHANDLE;
134 } else {
135 return -E_INVALID_ARGS;
136 }
137
138 sqlite3_stmt *statement = nullptr;
139 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
140 if (errCode != E_OK) {
141 LOGE("GetStatement fail when get min version cache data! errCode = [%d]", errCode);
142 goto END;
143 }
144
145 errCode = GetAllDataItems(statement, dataItems, minVerIncurCacheDb, true);
146 if (errCode != E_OK) {
147 LOGE("Failed to get all the data items by the min version:[%d]", errCode);
148 }
149
150 END:
151 SQLiteUtils::ResetStatement(statement, true, errCode);
152 return CheckCorruptedStatus(errCode);
153 }
154
MigrateRmDevData(const DataItem & dataItem) const155 int SQLiteSingleVerStorageExecutor::MigrateRmDevData(const DataItem &dataItem) const
156 {
157 if (dataItem.key != REMOVE_DEVICE_DATA_KEY) {
158 LOGE("This item not means remove devices data, can not continue exe!");
159 return -E_INVALID_ARGS;
160 }
161
162 std::string sql;
163 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
164 sql = REMOVE_DEV_DATA_SQL;
165 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
166 sql = REMOVE_DEV_DATA_SQL_FROM_CACHEHANDLE;
167 } else {
168 return -E_INVALID_ARGS;
169 }
170
171 sqlite3_stmt *statement = nullptr;
172 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
173 if (errCode != E_OK) {
174 LOGE("GetStatement fail when remove device data migrating-data to main! errCode = [%d]", errCode);
175 return CheckCorruptedStatus(errCode);
176 }
177
178 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, dataItem.value, true);
179 if (errCode != E_OK) {
180 LOGE("[singerVerExecutor][MiRmData] Bind dev for sync data failed:%d", errCode);
181 goto END;
182 }
183
184 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
185 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
186 errCode = E_OK;
187 }
188 END:
189 SQLiteUtils::ResetStatement(statement, true, errCode);
190 return CheckCorruptedStatus(errCode);
191 }
192
AttachMainDbAndCacheDb(CipherType type,const CipherPassword & passwd,const std::string & attachDbAbsPath,EngineState engineState)193 int SQLiteSingleVerStorageExecutor::AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
194 const std::string &attachDbAbsPath, EngineState engineState)
195 {
196 std::string attachAsName;
197 if (engineState == EngineState::MAINDB) {
198 attachAsName = "cache";
199 } else if (engineState == EngineState::CACHEDB) {
200 attachAsName = "maindb";
201 } else if (engineState == EngineState::ATTACHING) {
202 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
203 return E_OK;
204 } else {
205 return -E_INVALID_ARGS;
206 }
207
208 int errCode = SQLiteUtils::AttachNewDatabase(dbHandle_, type, passwd, attachDbAbsPath, attachAsName);
209 if (errCode != E_OK) {
210 LOGE("handle attach to [%s] fail! errCode = [%d]", attachAsName.c_str(), errCode);
211 return CheckCorruptedStatus(errCode);
212 }
213
214 if (engineState == EngineState::MAINDB) {
215 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
216 } else if (engineState == EngineState::CACHEDB) {
217 executorState_ = ExecutorState::CACHE_ATTACH_MAIN;
218 } else {
219 return -E_INVALID_ARGS;
220 }
221 LOGD("[singleVerExecutor][attachDb] current engineState[%u], executorState[%u]", static_cast<unsigned>(engineState),
222 static_cast<unsigned>(executorState_));
223 return errCode;
224 }
225
GetMaxVersionInCacheDb(uint64_t & maxVersion) const226 int SQLiteSingleVerStorageExecutor::GetMaxVersionInCacheDb(uint64_t &maxVersion) const
227 {
228 sqlite3_stmt *statement = nullptr;
229 std::string sql;
230 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
231 sql = GET_MAX_VER_CACHEDATA_FROM_MAINHANDLE;
232 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
233 sql = GET_MAX_VER_CACHEDATA_FROM_CACHEHANDLE;
234 } else {
235 return -E_INVALID_ARGS;
236 }
237
238 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
239 if (errCode != E_OK) {
240 LOGE("GetStatement fail when get max version in cache db");
241 return CheckCorruptedStatus(errCode);
242 }
243
244 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
245 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
246 maxVersion = static_cast<uint64_t>(sqlite3_column_int64(statement, 0));
247 errCode = E_OK;
248 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
249 maxVersion = 0;
250 errCode = E_OK;
251 }
252 SQLiteUtils::ResetStatement(statement, true, errCode);
253 return CheckCorruptedStatus(errCode);
254 }
255
MigrateDataItem(DataItem & dataItem,const NotifyMigrateSyncData & syncData)256 int SQLiteSingleVerStorageExecutor::MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData)
257 {
258 // Put or delete. Prepare notify data here.
259 NotifyConflictAndObserverData notify;
260 notify.committedData = syncData.committedData;
261 int errCode = PutIntoConflictAndCommitForMigrateCache(dataItem, {dataItem.dev.empty(), dataItem.dev}, notify,
262 syncData.isPermitForceWrite);
263 if (errCode != E_OK) {
264 ResetForMigrateCacheData();
265 LOGE("PutIntoConflictAndCommitForMigrateCache failed, errCode = %d", errCode);
266 return errCode;
267 }
268 // after solving conflict, the item should not be saved into mainDB
269 if (notify.dataStatus.isDefeated) {
270 LOGD("Data status is defeated");
271 return errCode;
272 }
273 bool isUpdate = notify.dataStatus.preStatus != DataStatus::NOEXISTED;
274 sqlite3_stmt *statement = migrateSyncStatements_.GetDataSaveStatement(isUpdate);
275 if (statement == nullptr) {
276 LOGE("GetStatement fail when put migrating-data to main! ");
277 return -E_INVALID_ARGS;
278 }
279
280 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
281 errCode = EraseSyncData(dataItem.key);
282 goto END;
283 }
284
285 errCode = BindSavedSyncData(statement, dataItem, dataItem.hashKey, { dataItem.origDev, dataItem.dev }, isUpdate);
286 if (errCode != E_OK) {
287 goto END;
288 }
289
290 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
291 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
292 errCode = E_OK;
293 } else {
294 LOGD("StepWithRetry fail when put migrating-data to main!");
295 }
296 END:
297 ResetForMigrateCacheData();
298 return errCode;
299 }
300
CheckDataWithQuery(std::vector<DataItem> & dataItems)301 int SQLiteSingleVerStorageExecutor::CheckDataWithQuery(std::vector<DataItem> &dataItems)
302 {
303 int errCode = E_OK;
304 sqlite3_stmt *stmt = nullptr;
305 for (auto &item : dataItems) {
306 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) {
307 continue;
308 }
309 std::string sql;
310 DBCommon::VectorToString(item.value, sql);
311 if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
312 static const std::string SYNC_DATA_TABLE = "sync_data";
313 static const std::string SYNC_DATA_TABLE_MAIN = "maindb.sync_data";
314 std::string::size_type startPos = sql.find(SYNC_DATA_TABLE);
315 if (startPos != std::string::npos) {
316 sql.replace(startPos, SYNC_DATA_TABLE.length(), SYNC_DATA_TABLE_MAIN);
317 }
318 }
319 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
320 if (errCode != E_OK) {
321 LOGE("Get Check miss query data statement failed. %d", errCode);
322 return errCode;
323 }
324
325 errCode = CheckMissQueryDataItem(stmt, item.dev, item);
326 if (errCode != E_OK) {
327 LOGE("Check miss query data item failed. %d", errCode);
328 break;
329 }
330 SQLiteUtils::ResetStatement(stmt, true, errCode);
331 }
332 SQLiteUtils::ResetStatement(stmt, true, errCode);
333 return CheckCorruptedStatus(errCode);
334 }
335
MigrateDataItems(std::vector<DataItem> & dataItems,NotifyMigrateSyncData & syncData)336 int SQLiteSingleVerStorageExecutor::MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData)
337 {
338 syncData.isRemote = ((dataItems[0].flag & DataItem::LOCAL_FLAG) == 0);
339 syncData.isRemoveDeviceData = (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_FLAG) != 0 ||
340 (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) != 0;
341
342 int errCode = CheckDataWithQuery(dataItems);
343 if (errCode != E_OK) {
344 LOGE("Check migrate data with query failed! errCode = [%d]", errCode);
345 goto END;
346 }
347
348 for (auto &item : dataItems) {
349 // Remove device data owns one version itself.
350 // Get entry here. Prepare notify data in storageEngine.
351 if (syncData.isRemoveDeviceData) {
352 errCode = GetEntriesForNotifyRemoveDevData(item, syncData.entries);
353 if (errCode != E_OK) {
354 LOGE("Failed to get remove devices data");
355 return errCode;
356 }
357 errCode = MigrateRmDevData(item);
358 LOGI("[PutMigratingDataToMain]Execute remove devices data! errCode = [%d]", errCode);
359 if (errCode != E_OK) {
360 break;
361 }
362 continue;
363 }
364
365 if (item.neglect) { // Do not save this record if it is neglected
366 continue;
367 }
368
369 errCode = MigrateDataItem(item, syncData);
370 if (errCode != E_OK) {
371 LOGE("Migrate data item to main db failed! errCode = [%d]", errCode);
372 break;
373 }
374 }
375 END:
376 ResetForMigrateCacheData();
377 return CheckCorruptedStatus(errCode);
378 }
379
MigrateSyncDataByVersion(uint64_t recordVer,NotifyMigrateSyncData & syncData,std::vector<DataItem> & dataItems)380 int SQLiteSingleVerStorageExecutor::MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
381 std::vector<DataItem> &dataItems)
382 {
383 int errCode = StartTransaction(TransactType::IMMEDIATE);
384 if (errCode != E_OK) {
385 return errCode;
386 }
387
388 // Init migrate data.
389 errCode = InitMigrateData();
390 if (errCode != E_OK) {
391 LOGE("Init migrate data failed, errCode = [%d]", errCode);
392 goto END;
393 }
394
395 // fix dataItem timestamp for migrate
396 errCode = ProcessTimestampForSyncDataInCacheDB(dataItems);
397 if (errCode != E_OK) {
398 LOGE("Change the time stamp for migrate failed! errCode = [%d]", errCode);
399 goto END;
400 }
401
402 errCode = MigrateDataItems(dataItems, syncData);
403 if (errCode != E_OK) {
404 goto END;
405 }
406
407 // delete recordVersion data
408 errCode = DelCacheDbDataByVersion(recordVer);
409 if (errCode != E_OK) {
410 LOGE("Delete the migrated data in cacheDb! errCode = [%d]", errCode);
411 goto END;
412 }
413
414 errCode = Commit();
415 if (errCode != E_OK) {
416 LOGE("Commit data error and rollback, errCode = [%d]", errCode);
417 goto END;
418 }
419 return E_OK;
420 END:
421 Rollback();
422 return errCode;
423 }
424
DelCacheDbDataByVersion(uint64_t version) const425 int SQLiteSingleVerStorageExecutor::DelCacheDbDataByVersion(uint64_t version) const
426 {
427 std::string sql;
428 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
429 sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_MAINHANDLE;
430 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
431 sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_CACHEHANDLE;
432 } else {
433 return -E_INVALID_ARGS;
434 }
435
436 sqlite3_stmt *statement = nullptr;
437 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
438 if (errCode != E_OK) {
439 LOGE("GetStatement fail when delete cache data by version! errCode = [%d]", errCode);
440 return errCode;
441 }
442
443 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, static_cast<int64_t>(version));
444 if (errCode != E_OK) {
445 LOGE("[SingleVerExe] Bind destDbNickName error:[%d]", errCode);
446 goto END;
447 }
448
449 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
450 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
451 errCode = E_OK;
452 }
453
454 END:
455 SQLiteUtils::ResetStatement(statement, true, errCode);
456 return CheckCorruptedStatus(errCode);
457 }
458
VacuumLocalData() const459 int SQLiteSingleVerStorageExecutor::VacuumLocalData() const
460 {
461 std::string sql;
462 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
463 sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_MAINHANDLE;
464 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
465 sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_CACHEHANDLE;
466 } else {
467 return -E_INVALID_ARGS;
468 }
469
470 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
471 if (errCode != E_OK) {
472 LOGE("[SingleVerExe] vaccum local data failed: %d", errCode);
473 }
474
475 return CheckCorruptedStatus(errCode);
476 }
477
478 // The local table data is only for local reading and writing, which can be sensed by itself.
479 // The current migration process does not provide callback subscription function.
MigrateLocalData()480 int SQLiteSingleVerStorageExecutor::MigrateLocalData()
481 {
482 // Nick name "main" represent current database(dbhande) in sqlite grammar
483 std::string migrateLocaldataSql;
484 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
485 migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_MAINHANDLE;
486 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
487 migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_CACHEHANDLE;
488 } else {
489 return -E_INVALID_ARGS;
490 }
491
492 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, migrateLocaldataSql);
493 if (errCode != E_OK) {
494 LOGW("Failed to migrate the local data:%d", errCode);
495 return CheckCorruptedStatus(errCode);
496 }
497
498 return VacuumLocalData();
499 }
500
BindSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const501 int SQLiteSingleVerStorageExecutor::BindSyncDataInCacheMode(sqlite3_stmt *statement,
502 const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const
503 {
504 int errCode = BindPrimaryKeySyncDataInCacheMode(statement, hashKey, recordVersion);
505 if (errCode != E_OK) {
506 LOGE("Bind saved sync data primary key failed:%d", errCode);
507 return errCode;
508 }
509
510 // if delete flag is set, just use the hash key instead of the key
511 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
512 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_SYNC_KEY_INDEX, -1));
513 } else {
514 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_KEY_INDEX, dataItem.key, false);
515 }
516
517 if (errCode != E_OK) {
518 LOGE("Bind saved sync data key failed:%d", errCode);
519 return errCode;
520 }
521
522 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_VAL_INDEX, dataItem.value, true);
523 if (errCode != E_OK) {
524 LOGE("Bind saved sync data value failed:%d", errCode);
525 return errCode;
526 }
527
528 LOGD("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 ", version:%" PRIu64,
529 dataItem.writeTimestamp, dataItem.timestamp, dataItem.flag, recordVersion);
530 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_FLAG_INDEX,
531 static_cast<int64_t>(dataItem.flag));
532 if (errCode != E_OK) {
533 LOGE("Bind saved sync data flag failed:%d", errCode);
534 return errCode;
535 }
536 errCode = BindTimestampSyncDataInCacheMode(statement, dataItem);
537 if (errCode != E_OK) {
538 LOGE("Bind saved sync data time stamp failed:%d", errCode);
539 return errCode;
540 }
541 return BindDevSyncDataInCacheMode(statement, dataItem.origDev, dataItem.dev);
542 }
543
BindPrimaryKeySyncDataInCacheMode(sqlite3_stmt * statement,const Key & hashKey,uint64_t recordVersion) const544 int SQLiteSingleVerStorageExecutor::BindPrimaryKeySyncDataInCacheMode(
545 sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const
546 {
547 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_HASH_KEY_INDEX, hashKey, false);
548 if (errCode != E_OK) {
549 LOGE("Bind saved sync data hash key failed:%d", errCode);
550 return errCode;
551 }
552 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_VERSION_INDEX, recordVersion);
553 if (errCode != E_OK) {
554 LOGE("Bind saved sync data version failed:%d", errCode);
555 }
556 return errCode;
557 }
558
BindTimestampSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem) const559 int SQLiteSingleVerStorageExecutor::BindTimestampSyncDataInCacheMode(
560 sqlite3_stmt *statement, const DataItem &dataItem) const
561 {
562 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_STAMP_INDEX, dataItem.timestamp);
563 if (errCode != E_OK) {
564 LOGE("Bind saved sync data stamp failed:%d", errCode);
565 return errCode;
566 }
567
568 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_W_TIME_INDEX, dataItem.writeTimestamp);
569 if (errCode != E_OK) {
570 LOGE("Bind saved sync data write stamp failed:%d", errCode);
571 }
572 return errCode;
573 }
574
BindDevSyncDataInCacheMode(sqlite3_stmt * statement,const std::string & origDev,const std::string & deviceName) const575 int SQLiteSingleVerStorageExecutor::BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
576 const std::string &origDev, const std::string &deviceName) const
577 {
578 std::string devName = DBCommon::TransferHashString(deviceName);
579 std::vector<uint8_t> devVect(devName.begin(), devName.end());
580 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_DEV_INDEX, devVect, true);
581 if (errCode != E_OK) {
582 LOGE("Bind dev for sync data failed:%d", errCode);
583 return errCode;
584 }
585
586 std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
587 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_ORI_DEV_INDEX, origDevVect, true);
588 if (errCode != E_OK) {
589 LOGE("Bind orig dev for sync data failed:%d", errCode);
590 }
591 return errCode;
592 }
593
GetExpandedCheckSql(QueryObject query,DataItem & dataItem)594 int SQLiteSingleVerStorageExecutor::GetExpandedCheckSql(QueryObject query, DataItem &dataItem)
595 {
596 int errCode = E_OK;
597 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
598
599 std::string sql;
600 std::string expandedSql;
601 errCode = helper.GetSyncDataCheckSql(sql);
602 if (errCode != E_OK) {
603 LOGE("Get sync data check sql failed");
604 return errCode;
605 }
606 sqlite3_stmt *stmt = nullptr;
607 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
608 if (errCode != E_OK) {
609 LOGE("Get statement fail. %d", errCode);
610 return -E_INVALID_QUERY_FORMAT;
611 }
612
613 errCode = helper.BindSyncDataCheckStmt(stmt, dataItem.key);
614 if (errCode != E_OK) {
615 goto END;
616 }
617
618 errCode = SQLiteUtils::ExpandedSql(stmt, expandedSql);
619 if (errCode != E_OK) {
620 LOGE("Get expand sql fail. %d", errCode);
621 }
622 DBCommon::StringToVector(expandedSql, dataItem.value);
623 END:
624 SQLiteUtils::ResetStatement(stmt, true, errCode);
625 return errCode;
626 }
627
SaveSyncDataItemInCacheMode(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,uint64_t recordVersion,const QueryObject & query)628 int SQLiteSingleVerStorageExecutor::SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo,
629 Timestamp &maxStamp, uint64_t recordVersion, const QueryObject &query)
630 {
631 Key hashKey;
632 int errCode = E_OK;
633 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
634 hashKey = dataItem.key;
635 } else {
636 errCode = DBCommon::CalcValueHash(dataItem.key, hashKey);
637 if (errCode != E_OK) {
638 return errCode;
639 }
640 }
641
642 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
643 errCode = GetExpandedCheckSql(query, dataItem); // record check sql in value for miss query data
644 if (errCode != E_OK) {
645 LOGE("Get sync data check sql failed. %d", errCode);
646 return errCode;
647 }
648 }
649
650 std::string origDev = dataItem.origDev;
651 if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
652 origDev.clear();
653 }
654 dataItem.dev = deviceInfo.deviceName;
655 dataItem.origDev = origDev;
656 errCode = SaveSyncDataToCacheDatabase(dataItem, hashKey, recordVersion);
657 if (errCode == E_OK) {
658 maxStamp = std::max(dataItem.timestamp, maxStamp);
659 } else {
660 LOGE("Save sync data to db failed:%d", errCode);
661 }
662 return ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
663 }
664
SaveSyncDataToCacheDatabase(const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const665 int SQLiteSingleVerStorageExecutor::SaveSyncDataToCacheDatabase(const DataItem &dataItem,
666 const Key &hashKey, uint64_t recordVersion) const
667 {
668 auto statement = saveSyncStatements_.GetDataSaveStatement(false);
669 if (statement == nullptr) {
670 return -E_INVALID_ARGS;
671 }
672 int errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
673 if (errCode != E_OK) {
674 return errCode;
675 }
676
677 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
678 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
679 errCode = E_OK;
680 }
681 return errCode;
682 }
683
PutLocalDataToCacheDB(const LocalDataItem & dataItem) const684 int SQLiteSingleVerStorageExecutor::PutLocalDataToCacheDB(const LocalDataItem &dataItem) const
685 {
686 sqlite3_stmt *statement = nullptr;
687 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_CACHE_LOCAL_SQL, statement);
688 if (errCode != E_OK) {
689 goto ERROR;
690 }
691
692 errCode = BindLocalDataInCacheMode(statement, dataItem);
693 if (errCode != E_OK) {
694 goto ERROR;
695 }
696
697 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
698 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
699 errCode = E_OK;
700 }
701
702 ERROR:
703 SQLiteUtils::ResetStatement(statement, true, errCode);
704 return CheckCorruptedStatus(errCode);
705 }
706
BindLocalDataInCacheMode(sqlite3_stmt * statement,const LocalDataItem & dataItem) const707 int SQLiteSingleVerStorageExecutor::BindLocalDataInCacheMode(sqlite3_stmt *statement,
708 const LocalDataItem &dataItem) const
709 {
710 int errCode = SQLiteUtils::BindBlobToStatement(statement,
711 BIND_CACHE_LOCAL_HASH_KEY_INDEX, dataItem.hashKey, false);
712 if (errCode != E_OK) {
713 LOGE("[SingleVerExe][BindLocalData]Bind hash key error:%d", errCode);
714 return errCode;
715 }
716
717 // if delete flag is set, just use the hash key instead of the key
718 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
719 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_LOCAL_KEY_INDEX, -1));
720 } else {
721 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_KEY_INDEX, dataItem.key, false);
722 }
723
724 if (errCode != E_OK) {
725 LOGE("Bind saved sync data key failed:%d", errCode);
726 return errCode;
727 }
728
729 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_VAL_INDEX, dataItem.value, true);
730 if (errCode != E_OK) {
731 LOGE("[SingleVerExe][BindLocalData]Bind value error:%d", errCode);
732 return errCode;
733 }
734
735 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_TIMESTAMP_INDEX, dataItem.timestamp);
736 if (errCode != E_OK) {
737 LOGE("[SingleVerExe][BindLocalData]Bind timestamp error:%d", errCode);
738 return errCode;
739 }
740
741 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_FLAG_INDEX,
742 static_cast<int64_t>(dataItem.flag));
743 if (errCode != E_OK) {
744 LOGE("[SingleVerExe][BindLocalData]Bind local data flag failed:%d", errCode);
745 return errCode;
746 }
747
748 return E_OK;
749 }
750
PutIntoConflictAndCommitForMigrateCache(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)751 int SQLiteSingleVerStorageExecutor::PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem,
752 const DeviceInfo &deviceInfo, NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite)
753 {
754 int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
755 if (errCode != E_OK) {
756 errCode = (errCode == -E_NOT_FOUND ? E_OK : errCode);
757 if (errCode == -E_IGNORE_DATA) {
758 notify.dataStatus.isDefeated = true;
759 errCode = E_OK;
760 }
761 return errCode;
762 }
763
764 // If delete data, the key is empty.
765 if (isSyncMigrating_ && dataItem.key.empty()) {
766 dataItem.key = notify.getData.key;
767 }
768
769 PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, notify.committedData);
770 if (notify.dataStatus.isDefeated) {
771 LOGD("Data status is defeated:%d", errCode);
772 return ResetForMigrateCacheData();
773 }
774
775 PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, notify.hashKey, notify.committedData);
776 return ResetForMigrateCacheData();
777 }
778
GetMinTimestampInCacheDB(Timestamp & minStamp) const779 int SQLiteSingleVerStorageExecutor::GetMinTimestampInCacheDB(Timestamp &minStamp) const
780 {
781 if (dbHandle_ == nullptr) {
782 return E_OK;
783 }
784 std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
785 SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL :
786 SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL_FROM_MAINHANDLE);
787 sqlite3_stmt *statement = nullptr;
788 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
789 if (errCode != E_OK) {
790 goto ERROR;
791 }
792
793 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
794 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
795 minStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
796 LOGD("Min time stamp in cacheDB is %" PRIu64, minStamp);
797 errCode = E_OK;
798 } else {
799 LOGE("GetMinTimestampInCacheDB failed, errCode = %d.", errCode);
800 }
801
802 ERROR:
803 SQLiteUtils::ResetStatement(statement, true, errCode);
804 return errCode;
805 }
806
InitMigrateTimestampOffset()807 int SQLiteSingleVerStorageExecutor::InitMigrateTimestampOffset()
808 {
809 // Not first migrate, migrateTimeOffset_ has been set.
810 if (migrateTimeOffset_ != 0) {
811 return E_OK;
812 }
813
814 // Get min timestamp of local data in sync_data, cacheDB.
815 Timestamp minTimeInCache = 0;
816 int errCode = GetMinTimestampInCacheDB(minTimeInCache);
817 if (errCode != E_OK) {
818 return errCode;
819 }
820
821 // There is no native data in cacheDB, cannot get accurate migrateTimeOffset_ now.
822 if (minTimeInCache == 0) {
823 migrateTimeOffset_ = -1;
824 LOGI("Time offset during migrating is -1.");
825 return E_OK;
826 }
827
828 // Get max timestamp in mainDB.
829 Timestamp maxTimeInMain = 0;
830 InitCurrentMaxStamp(maxTimeInMain);
831
832 // Get timestamp offset between mainDB and cacheDB.
833 // The purpose of -1 is to ensure that the first data record in the original cacheDB is 1 greater than
834 // the last data record in the original mainDB after the migration.
835 migrateTimeOffset_ = minTimeInCache - maxTimeInMain - 1;
836 LOGI("Min timestamp in cacheDB is %" PRIu64 ", max timestamp in mainDB is %" PRIu64 ". Time offset during migrating"
837 " is %" PRId64 ".", minTimeInCache, maxTimeInMain, migrateTimeOffset_);
838 return E_OK;
839 }
840
ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> & dataItems)841 int SQLiteSingleVerStorageExecutor::ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems)
842 {
843 if (dataItems.empty()) {
844 LOGE("[SQLiteSingleVerStorageExecutor::ProcessTimestampForCacheDB] Invalid parameter : dataItems.");
845 return -E_INVALID_ARGS;
846 }
847
848 // Get the offset between the min timestamp in dataitems and max timestamp in mainDB.
849 int errCode = InitMigrateTimestampOffset();
850 if (errCode != E_OK) {
851 return errCode;
852 }
853
854 // Set real timestamp for DataItem in dataItems and get the max timestamp in these dataitems.
855 Timestamp maxTimeInDataItems = 0;
856 for (auto &item : dataItems) {
857 item.timestamp -= migrateTimeOffset_;
858 maxTimeInDataItems = std::max(maxTimeInDataItems, item.timestamp);
859 }
860
861 // Update max timestamp in mainDB.
862 maxTimestampInMainDB_ = maxTimeInDataItems;
863 return E_OK;
864 }
865
GetEntriesForNotifyRemoveDevData(const DataItem & item,std::vector<Entry> & entries) const866 int SQLiteSingleVerStorageExecutor::GetEntriesForNotifyRemoveDevData(const DataItem &item,
867 std::vector<Entry> &entries) const
868 {
869 // When removing device data, key is 'remove', value is device name.
870 if (item.key != REMOVE_DEVICE_DATA_KEY) {
871 LOGE("Invalid key. Can not notify remove device data.");
872 return -E_INVALID_ARGS;
873 }
874 if ((item.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == 0) {
875 LOGI("No need to notify remove device data.");
876 return E_OK;
877 }
878 entries.clear();
879 std::string dev;
880 DBCommon::VectorToString(item.value, dev);
881 return GetAllSyncedEntries(dev, entries);
882 }
883
InitMigrateData()884 int SQLiteSingleVerStorageExecutor::InitMigrateData()
885 {
886 // Sync_data already in migrating. Need not to init data.
887 if (isSyncMigrating_) {
888 return E_OK;
889 }
890 ClearMigrateData();
891 std::string querySQL;
892 std::string insertSQL;
893 std::string updateSQL;
894 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
895 querySQL = SELECT_SYNC_HASH_SQL;
896 insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_MAINHANDLE;
897 updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_MAINHANDLE;
898 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
899 querySQL = SELECT_MAIN_SYNC_HASH_SQL_FROM_CACHEHANDLE;
900 insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_CACHEHANDLE;
901 updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_CACHEHANDLE;
902 } else {
903 LOGE("[InitMigrateData] executor in an error state[%u]!", static_cast<unsigned>(executorState_));
904 return -E_INVALID_DB;
905 }
906 int errCode = PrepareForSavingData(querySQL, insertSQL, updateSQL, migrateSyncStatements_);
907 if (errCode != E_OK) {
908 LOGE("Prepare migrateSyncStatements_ fail, errCode = %d", errCode);
909 return errCode;
910 }
911 isSyncMigrating_ = true;
912 return errCode;
913 }
914
ClearMigrateData()915 void SQLiteSingleVerStorageExecutor::ClearMigrateData()
916 {
917 // Reset data.
918 migrateTimeOffset_ = 0;
919 maxTimestampInMainDB_ = 0;
920
921 // Reset statement.
922 int errCode = migrateSyncStatements_.ResetStatement();
923 if (errCode != E_OK) {
924 LOGE("Reset migrateSync Statements failed, errCode = %d", errCode);
925 }
926
927 isSyncMigrating_ = false;
928 }
929
GetMaxTimestampDuringMigrating(Timestamp & maxTimestamp) const930 int SQLiteSingleVerStorageExecutor::GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const
931 {
932 if (maxTimestampInMainDB_ == 0) {
933 return -E_NOT_INIT;
934 }
935 maxTimestamp = maxTimestampInMainDB_;
936 return E_OK;
937 }
938
DeleteMetaData(const std::vector<Key> & keys)939 int SQLiteSingleVerStorageExecutor::DeleteMetaData(const std::vector<Key> &keys)
940 {
941 sqlite3_stmt *statement = nullptr;
942 const std::string sql = attachMetaMode_ ? REMOVE_ATTACH_META_VALUE_SQL : REMOVE_META_VALUE_SQL;
943 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
944 if (errCode != E_OK) {
945 return errCode;
946 }
947
948 for (const auto &key : keys) {
949 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
950 if (errCode != E_OK) {
951 break;
952 }
953
954 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
955 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
956 break;
957 }
958 errCode = E_OK;
959 SQLiteUtils::ResetStatement(statement, false, errCode);
960 }
961
962 SQLiteUtils::ResetStatement(statement, true, errCode);
963 return CheckCorruptedStatus(errCode);
964 }
965
DeleteMetaDataByPrefixKey(const Key & keyPrefix)966 int SQLiteSingleVerStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix)
967 {
968 sqlite3_stmt *statement = nullptr;
969 const std::string sql = attachMetaMode_ ?
970 REMOVE_ATTACH_META_VALUE_BY_KEY_PREFIX_SQL : REMOVE_META_VALUE_BY_KEY_PREFIX_SQL;
971
972 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
973 if (errCode != E_OK) {
974 return errCode;
975 }
976
977 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
978 if (errCode == E_OK) {
979 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
980 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
981 errCode = E_OK;
982 }
983 }
984
985 SQLiteUtils::ResetStatement(statement, true, errCode);
986 return CheckCorruptedStatus(errCode);
987 }
988 } // namespace DistributedDB
989