1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_executor.h"
17
18 #include <algorithm>
19
20 #include "log_print.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "parcel.h"
25 #include "runtime_context.h"
26 #include "sqlite_single_ver_storage_executor_sql.h"
27
28 namespace DistributedDB {
PrepareForSavingCacheData(SingleVerDataType type)29 int SQLiteSingleVerStorageExecutor::PrepareForSavingCacheData(SingleVerDataType type)
30 {
31 int errCode = -E_NOT_SUPPORT;
32 if (type == SingleVerDataType::LOCAL_TYPE) {
33 std::string insertLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
34 INSERT_LOCAL_SQL_FROM_CACHEHANDLE : INSERT_CACHE_LOCAL_SQL);
35 std::string updateLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
36 UPDATE_LOCAL_SQL_FROM_CACHEHANDLE : UPDATE_CACHE_LOCAL_SQL);
37 errCode = PrepareForSavingData(SELECT_CACHE_LOCAL_HASH_SQL, insertLocalSql, updateLocalSql,
38 saveLocalStatements_);
39 } else if (type == SingleVerDataType::SYNC_TYPE) {
40 std::string insertSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
41 INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL);
42 std::string updateSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
43 UPDATE_CACHE_SYNC_SQL_FROM_MAINHANDLE : UPDATE_CACHE_SYNC_SQL);
44 std::string selectSyncHashSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
45 SELECT_CACHE_SYNC_HASH_SQL_FROM_MAINHANDLE : SELECT_CACHE_SYNC_HASH_SQL);
46 errCode = PrepareForSavingData(selectSyncHashSql, insertSyncSql, updateSyncSql, saveSyncStatements_);
47 }
48 if (errCode != E_OK) {
49 LOGE("Prepare to save sync cache data failed:%d", errCode);
50 }
51 return CheckCorruptedStatus(errCode);
52 }
53
ResetForSavingCacheData(SingleVerDataType type)54 int SQLiteSingleVerStorageExecutor::ResetForSavingCacheData(SingleVerDataType type)
55 {
56 int errCode = E_OK;
57 if (type == SingleVerDataType::LOCAL_TYPE) {
58 SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
59 SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
60 SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
61 } else if (type == SingleVerDataType::SYNC_TYPE) {
62 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
63 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
64 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
65 }
66
67 return CheckCorruptedStatus(errCode);
68 }
69
ResetForMigrateCacheData()70 int SQLiteSingleVerStorageExecutor::ResetForMigrateCacheData()
71 {
72 int errCode = E_OK;
73 SQLiteUtils::ResetStatement(migrateSyncStatements_.insertStatement, false, errCode);
74 SQLiteUtils::ResetStatement(migrateSyncStatements_.updateStatement, false, errCode);
75 SQLiteUtils::ResetStatement(migrateSyncStatements_.queryStatement, false, errCode);
76
77 return CheckCorruptedStatus(errCode);
78 }
79
RemoveDeviceDataInCacheMode(const std::string & deviceName,bool isNeedNotify,uint64_t recordVersion) const80 int SQLiteSingleVerStorageExecutor::RemoveDeviceDataInCacheMode(const std::string &deviceName,
81 bool isNeedNotify, uint64_t recordVersion) const
82 {
83 // Transfer the device name.
84 std::string devName = DBCommon::TransferHashString(deviceName);
85 std::vector<uint8_t> devVect(devName.begin(), devName.end());
86
87 Key hashKey;
88 int errCode = DBCommon::CalcValueHash(REMOVE_DEVICE_DATA_KEY, hashKey);
89 if (errCode != E_OK) {
90 return errCode;
91 }
92
93 DataItem dataItem;
94 dataItem.key = REMOVE_DEVICE_DATA_KEY;
95 dataItem.value = devVect;
96 if (isNeedNotify) {
97 dataItem.flag = DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG;
98 } else {
99 dataItem.flag = DataItem::REMOVE_DEVICE_DATA_FLAG;
100 }
101
102 sqlite3_stmt *statement = nullptr;
103 std::string sql = (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
104 INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL;
105 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
106 if (errCode != E_OK) {
107 goto ERROR;
108 }
109
110 errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
111 if (errCode != E_OK) {
112 goto ERROR;
113 }
114
115 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
116 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
117 LOGE("Failed to execute rm the device synced data:%d", errCode);
118 } else {
119 errCode = E_OK;
120 }
121
122 ERROR:
123 SQLiteUtils::ResetStatement(statement, true, errCode);
124 return CheckCorruptedStatus(errCode);
125 }
126
GetMinVersionCacheData(std::vector<DataItem> & dataItems,uint64_t & minVerIncurCacheDb) const127 int SQLiteSingleVerStorageExecutor::GetMinVersionCacheData(
128 std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const
129 {
130 std::string sql;
131 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
132 sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_MAINHANDLE;
133 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
134 sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_CACHEHANDLE;
135 } else {
136 return -E_INVALID_ARGS;
137 }
138
139 sqlite3_stmt *statement = nullptr;
140 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
141 if (errCode != E_OK) {
142 LOGE("GetStatement fail when get min version cache data! errCode = [%d]", errCode);
143 goto END;
144 }
145
146 errCode = GetAllDataItems(statement, dataItems, minVerIncurCacheDb, true);
147 if (errCode != E_OK) {
148 LOGE("Failed to get all the data items by the min version:[%d]", errCode);
149 }
150
151 END:
152 SQLiteUtils::ResetStatement(statement, true, errCode);
153 return CheckCorruptedStatus(errCode);
154 }
155
MigrateRmDevData(const DataItem & dataItem) const156 int SQLiteSingleVerStorageExecutor::MigrateRmDevData(const DataItem &dataItem) const
157 {
158 if (dataItem.key != REMOVE_DEVICE_DATA_KEY) {
159 LOGE("This item not means remove devices data, can not continue exe!");
160 return -E_INVALID_ARGS;
161 }
162
163 std::string sql;
164 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
165 sql = REMOVE_DEV_DATA_SQL;
166 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
167 sql = REMOVE_DEV_DATA_SQL_FROM_CACHEHANDLE;
168 } else {
169 return -E_INVALID_ARGS;
170 }
171
172 sqlite3_stmt *statement = nullptr;
173 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
174 if (errCode != E_OK) {
175 LOGE("GetStatement fail when remove device data migrating-data to main! errCode = [%d]", errCode);
176 return CheckCorruptedStatus(errCode);
177 }
178
179 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, dataItem.value, true);
180 if (errCode != E_OK) {
181 LOGE("[singerVerExecutor][MiRmData] Bind dev for sync data failed:%d", errCode);
182 goto END;
183 }
184
185 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
186 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
187 errCode = E_OK;
188 }
189 END:
190 SQLiteUtils::ResetStatement(statement, true, errCode);
191 return CheckCorruptedStatus(errCode);
192 }
193
AttachMainDbAndCacheDb(CipherType type,const CipherPassword & passwd,const std::string & attachDbAbsPath,EngineState engineState)194 int SQLiteSingleVerStorageExecutor::AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
195 const std::string &attachDbAbsPath, EngineState engineState)
196 {
197 std::string attachAsName;
198 if (engineState == EngineState::MAINDB) {
199 attachAsName = "cache";
200 } else if (engineState == EngineState::CACHEDB) {
201 attachAsName = "maindb";
202 } else if (engineState == EngineState::ATTACHING) {
203 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
204 return E_OK;
205 } else {
206 return -E_INVALID_ARGS;
207 }
208
209 int errCode = SQLiteUtils::AttachNewDatabase(dbHandle_, type, passwd, attachDbAbsPath, attachAsName);
210 if (errCode != E_OK) {
211 LOGE("handle attach to [%s] fail! errCode = [%d]", attachAsName.c_str(), errCode);
212 return CheckCorruptedStatus(errCode);
213 }
214
215 if (engineState == EngineState::MAINDB) {
216 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
217 } else if (engineState == EngineState::CACHEDB) {
218 executorState_ = ExecutorState::CACHE_ATTACH_MAIN;
219 } else {
220 return -E_INVALID_ARGS;
221 }
222 LOGD("[singleVerExecutor][attachDb] current engineState[%d], executorState[%d]", engineState, executorState_);
223
224 return errCode;
225 }
226
GetMaxVersionIncacheDb(uint64_t & maxVersion) const227 int SQLiteSingleVerStorageExecutor::GetMaxVersionIncacheDb(uint64_t &maxVersion) const
228 {
229 sqlite3_stmt *statement = nullptr;
230 std::string sql;
231 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
232 sql = GET_MAX_VER_CACHEDATA_FROM_MAINHANDLE;
233 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
234 sql = GET_MAX_VER_CACHEDATA_FROM_CACHEHANDLE;
235 } else {
236 return -E_INVALID_ARGS;
237 }
238
239 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
240 if (errCode != E_OK) {
241 LOGE("GetStatement fail when get max version in cache db! errCode = [%d]", errCode);
242 goto END;
243 }
244
245 do {
246 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
247 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
248 maxVersion = static_cast<uint64_t>(sqlite3_column_int64(statement, 0));
249 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
250 errCode = E_OK;
251 break;
252 } else {
253 LOGE("SQLite step failed:%d", errCode);
254 break;
255 }
256 } while (true);
257
258 END:
259 SQLiteUtils::ResetStatement(statement, true, errCode);
260 return CheckCorruptedStatus(errCode);
261 }
262
MigrateDataItem(DataItem & dataItem,NotifyMigrateSyncData & syncData)263 int SQLiteSingleVerStorageExecutor::MigrateDataItem(DataItem &dataItem, NotifyMigrateSyncData &syncData)
264 {
265 // Put or delete. Prepare notify data here.
266 NotifyConflictAndObserverData notify;
267 notify.committedData = syncData.committedData;
268 int errCode = PutIntoConflictAndCommitForMigrateCache(dataItem, {dataItem.dev.empty(), dataItem.dev}, notify,
269 syncData.isPermitForceWrite);
270 if (errCode != E_OK) {
271 ResetForMigrateCacheData();
272 LOGE("PutIntoConflictAndCommitForMigrateCache failed, errCode = %d", errCode);
273 return errCode;
274 }
275 // after solving conflict, the item should not be saved into mainDB
276 if (notify.dataStatus.isDefeated) {
277 LOGD("Data status is defeated:%d", errCode);
278 return errCode;
279 }
280 bool isUpdate = notify.dataStatus.preStatus != DataStatus::NOEXISTED;
281 sqlite3_stmt *statement = migrateSyncStatements_.GetDataSaveStatement(isUpdate);
282 if (statement == nullptr) {
283 LOGE("GetStatement fail when put migrating-data to main! ");
284 return -E_INVALID_ARGS;
285 }
286
287 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
288 errCode = EraseSyncData(dataItem.key);
289 goto END;
290 }
291
292 errCode = BindSavedSyncData(statement, dataItem, dataItem.hashKey, { dataItem.origDev, dataItem.dev }, isUpdate);
293 if (errCode != E_OK) {
294 goto END;
295 }
296
297 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
298 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
299 errCode = E_OK;
300 } else {
301 LOGD("StepWithRetry fail when put migrating-data to main!");
302 }
303 END:
304 ResetForMigrateCacheData();
305 return errCode;
306 }
307
CheckDataWithQuery(std::vector<DataItem> & dataItems)308 int SQLiteSingleVerStorageExecutor::CheckDataWithQuery(std::vector<DataItem> &dataItems)
309 {
310 int errCode = E_OK;
311 sqlite3_stmt *stmt = nullptr;
312 for (auto &item : dataItems) {
313 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) {
314 continue;
315 }
316 std::string sql;
317 DBCommon::VectorToString(item.value, sql);
318 if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
319 static const std::string SYNC_DATA_TABLE = "sync_data";
320 static const std::string SYNC_DATA_TABLE_MAIN = "maindb.sync_data";
321 std::string::size_type startPos = sql.find(SYNC_DATA_TABLE);
322 if (startPos != std::string::npos) {
323 sql.replace(startPos, SYNC_DATA_TABLE.length(), SYNC_DATA_TABLE_MAIN);
324 }
325 }
326 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
327 if (errCode != E_OK) {
328 LOGE("Get Check miss query data statement failed. %d", errCode);
329 return errCode;
330 }
331
332 errCode = CheckMissQueryDataItem(stmt, item.dev, item);
333 if (errCode != E_OK) {
334 LOGE("Check miss query data item failed. %d", errCode);
335 break;
336 }
337 SQLiteUtils::ResetStatement(stmt, true, errCode);
338 }
339 SQLiteUtils::ResetStatement(stmt, true, errCode);
340 return CheckCorruptedStatus(errCode);
341 }
342
MigrateDataItems(std::vector<DataItem> & dataItems,NotifyMigrateSyncData & syncData)343 int SQLiteSingleVerStorageExecutor::MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData)
344 {
345 syncData.isRemote = ((dataItems[0].flag & DataItem::LOCAL_FLAG) == 0);
346 syncData.isRemoveDeviceData = (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_FLAG) != 0 ||
347 (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) != 0;
348
349 int errCode = CheckDataWithQuery(dataItems);
350 if (errCode != E_OK) {
351 LOGE("Check migrate data with query failed! errCode = [%d]", errCode);
352 goto END;
353 }
354
355 for (auto &item : dataItems) {
356 // Remove device data owns one version itself.
357 // Get entry here. Prepare notify data in storageEngine.
358 if (syncData.isRemoveDeviceData) {
359 errCode = GetEntriesForNotifyRemoveDevData(item, syncData.entries);
360 if (errCode != E_OK) {
361 LOGE("Failed to get remove devices data");
362 return errCode;
363 }
364 errCode = MigrateRmDevData(item);
365 LOGI("[PutMigratingDataToMain]Execute remove devices data! errCode = [%d]", errCode);
366 if (errCode != E_OK) {
367 break;
368 }
369 continue;
370 }
371
372 if (item.neglect) { // Do not save this record if it is neglected
373 continue;
374 }
375
376 errCode = MigrateDataItem(item, syncData);
377 if (errCode != E_OK) {
378 LOGE("Migrate data item to main db failed! errCode = [%d]", errCode);
379 break;
380 }
381 }
382 END:
383 ResetForMigrateCacheData();
384 return CheckCorruptedStatus(errCode);
385 }
386
MigrateSyncDataByVersion(uint64_t recordVer,NotifyMigrateSyncData & syncData,std::vector<DataItem> & dataItems)387 int SQLiteSingleVerStorageExecutor::MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
388 std::vector<DataItem> &dataItems)
389 {
390 int errCode = StartTransaction(TransactType::IMMEDIATE);
391 if (errCode != E_OK) {
392 return errCode;
393 }
394
395 // Init migrate data.
396 errCode = InitMigrateData();
397 if (errCode != E_OK) {
398 LOGE("Init migrate data failed, errCode = [%d]", errCode);
399 goto END;
400 }
401
402 // fix dataItem timestamp for migrate
403 errCode = ProcessTimeStampForSyncDataInCacheDB(dataItems);
404 if (errCode != E_OK) {
405 LOGE("Chang the time stamp for migrate failed! errCode = [%d]", errCode);
406 goto END;
407 }
408
409 errCode = MigrateDataItems(dataItems, syncData);
410 if (errCode != E_OK) {
411 goto END;
412 }
413
414 // delete recordVersion data
415 errCode = DelCacheDbDataByVersion(recordVer);
416 if (errCode != E_OK) {
417 LOGE("Delete the migrated data in cacheDb! errCode = [%d]", errCode);
418 goto END;
419 }
420
421 errCode = Commit();
422 if (errCode != E_OK) {
423 LOGE("Commit data error and rollback, errCode = [%d]", errCode);
424 goto END;
425 }
426 return E_OK;
427 END:
428 Rollback();
429 return errCode;
430 }
431
DelCacheDbDataByVersion(uint64_t version) const432 int SQLiteSingleVerStorageExecutor::DelCacheDbDataByVersion(uint64_t version) const
433 {
434 std::string sql;
435 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
436 sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_MAINHANDLE;
437 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
438 sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_CACHEHANDLE;
439 } else {
440 return -E_INVALID_ARGS;
441 }
442
443 sqlite3_stmt *statement = nullptr;
444 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
445 if (errCode != E_OK) {
446 LOGE("GetStatement fail when delete cache data by version! errCode = [%d]", errCode);
447 return errCode;
448 }
449
450 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, static_cast<int64_t>(version));
451 if (errCode != E_OK) {
452 LOGE("[SingleVerExe] Bind destDbNickName error:[%d]", errCode);
453 goto END;
454 }
455
456 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
457 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
458 errCode = E_OK;
459 }
460
461 END:
462 SQLiteUtils::ResetStatement(statement, true, errCode);
463 return CheckCorruptedStatus(errCode);
464 }
465
VacuumLocalData() const466 int SQLiteSingleVerStorageExecutor::VacuumLocalData() const
467 {
468 std::string sql;
469 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
470 sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_MAINHANDLE;
471 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
472 sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_CACHEHANDLE;
473 } else {
474 return -E_INVALID_ARGS;
475 }
476
477 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
478 if (errCode != E_OK) {
479 LOGE("SQLite sync mode failed: %d", errCode);
480 }
481
482 return CheckCorruptedStatus(errCode);
483 }
484
485 // The local table data is only for local reading and writing, which can be sensed by itself.
486 // The current migration process does not provide callback subscription function.
MigrateLocalData()487 int SQLiteSingleVerStorageExecutor::MigrateLocalData()
488 {
489 // Nick name "main" represent current database(dbhande) in sqlite grammar
490 std::string migrateLocaldataSql;
491 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
492 migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_MAINHANDLE;
493 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
494 migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_CACHEHANDLE;
495 } else {
496 return -E_INVALID_ARGS;
497 }
498
499 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, migrateLocaldataSql);
500 if (errCode != E_OK) {
501 LOGW("Failed to migrate the local data:%d", errCode);
502 return CheckCorruptedStatus(errCode);
503 }
504
505 return VacuumLocalData();
506 }
507
BindSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const508 int SQLiteSingleVerStorageExecutor::BindSyncDataInCacheMode(sqlite3_stmt *statement,
509 const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const
510 {
511 int errCode = BindPrimaryKeySyncDataInCacheMode(statement, hashKey, recordVersion);
512 if (errCode != E_OK) {
513 LOGE("Bind saved sync data primary key failed:%d", errCode);
514 return errCode;
515 }
516
517 // if delete flag is set, just use the hash key instead of the key
518 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
519 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_SYNC_KEY_INDEX, -1));
520 } else {
521 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_KEY_INDEX, dataItem.key, false);
522 }
523
524 if (errCode != E_OK) {
525 LOGE("Bind saved sync data key failed:%d", errCode);
526 return errCode;
527 }
528
529 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_VAL_INDEX, dataItem.value, true);
530 if (errCode != E_OK) {
531 LOGE("Bind saved sync data value failed:%d", errCode);
532 return errCode;
533 }
534
535 LOGD("Write timestamp:%llu timestamp%llu, %llu, version %llu", dataItem.writeTimeStamp, dataItem.timeStamp,
536 dataItem.flag, recordVersion);
537 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_FLAG_INDEX,
538 static_cast<int64_t>(dataItem.flag));
539 if (errCode != E_OK) {
540 LOGE("Bind saved sync data flag failed:%d", errCode);
541 return errCode;
542 }
543 errCode = BindTimeStampSyncDataInCacheMode(statement, dataItem);
544 if (errCode != E_OK) {
545 LOGE("Bind saved sync data time stamp failed:%d", errCode);
546 return errCode;
547 }
548 return BindDevSyncDataInCacheMode(statement, dataItem.origDev, dataItem.dev);
549 }
550
BindPrimaryKeySyncDataInCacheMode(sqlite3_stmt * statement,const Key & hashKey,uint64_t recordVersion) const551 int SQLiteSingleVerStorageExecutor::BindPrimaryKeySyncDataInCacheMode(
552 sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const
553 {
554 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_HASH_KEY_INDEX, hashKey, false);
555 if (errCode != E_OK) {
556 LOGE("Bind saved sync data hash key failed:%d", errCode);
557 return errCode;
558 }
559 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_VERSION_INDEX, recordVersion);
560 if (errCode != E_OK) {
561 LOGE("Bind saved sync data version failed:%d", errCode);
562 }
563 return errCode;
564 }
565
BindTimeStampSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem) const566 int SQLiteSingleVerStorageExecutor::BindTimeStampSyncDataInCacheMode(
567 sqlite3_stmt *statement, const DataItem &dataItem) const
568 {
569 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_STAMP_INDEX, dataItem.timeStamp);
570 if (errCode != E_OK) {
571 LOGE("Bind saved sync data stamp failed:%d", errCode);
572 return errCode;
573 }
574
575 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_W_TIME_INDEX, dataItem.writeTimeStamp);
576 if (errCode != E_OK) {
577 LOGE("Bind saved sync data write stamp failed:%d", errCode);
578 }
579 return errCode;
580 }
581
BindDevSyncDataInCacheMode(sqlite3_stmt * statement,const std::string & origDev,const std::string & deviceName) const582 int SQLiteSingleVerStorageExecutor::BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
583 const std::string &origDev, const std::string &deviceName) const
584 {
585 std::string devName = DBCommon::TransferHashString(deviceName);
586 std::vector<uint8_t> devVect(devName.begin(), devName.end());
587 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_DEV_INDEX, devVect, true);
588 if (errCode != E_OK) {
589 LOGE("Bind dev for sync data failed:%d", errCode);
590 return errCode;
591 }
592
593 std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
594 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_ORI_DEV_INDEX, origDevVect, true);
595 if (errCode != E_OK) {
596 LOGE("Bind orig dev for sync data failed:%d", errCode);
597 }
598 return errCode;
599 }
600
GetExpandedCheckSql(QueryObject query,DataItem & dataItem)601 int SQLiteSingleVerStorageExecutor::GetExpandedCheckSql(QueryObject query, DataItem &dataItem)
602 {
603 int errCode = E_OK;
604 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
605
606 std::string sql;
607 std::string expandedSql;
608 errCode = helper.GetSyncDataCheckSql(sql);
609 if (errCode != E_OK) {
610 LOGE("Get sync data check sql failed");
611 return errCode;
612 }
613 sqlite3_stmt *stmt = nullptr;
614 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
615 if (errCode != E_OK) {
616 LOGE("Get statement fail. %d", errCode);
617 return -E_INVALID_QUERY_FORMAT;
618 }
619
620 errCode = helper.BindSyncDataCheckStmt(stmt, dataItem.key);
621 if (errCode != E_OK) {
622 goto END;
623 }
624
625 errCode = SQLiteUtils::ExpandedSql(stmt, expandedSql);
626 if (errCode != E_OK) {
627 LOGE("Get expand sql fail. %d", errCode);
628 }
629 DBCommon::StringToVector(expandedSql, dataItem.value);
630 END:
631 SQLiteUtils::ResetStatement(stmt, true, errCode);
632 return errCode;
633 }
634
SaveSyncDataItemInCacheMode(DataItem & dataItem,const DeviceInfo & deviceInfo,TimeStamp & maxStamp,uint64_t recordVersion,const QueryObject & query)635 int SQLiteSingleVerStorageExecutor::SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo,
636 TimeStamp &maxStamp, uint64_t recordVersion, const QueryObject &query)
637 {
638 Key hashKey;
639 int errCode = E_OK;
640 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
641 hashKey = dataItem.key;
642 } else {
643 errCode = DBCommon::CalcValueHash(dataItem.key, hashKey);
644 if (errCode != E_OK) {
645 return errCode;
646 }
647 }
648
649 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
650 errCode = GetExpandedCheckSql(query, dataItem); // record check sql in value for miss query data
651 if (errCode != E_OK) {
652 LOGE("Get sync data check sql failed. %d", errCode);
653 return errCode;
654 }
655 }
656
657 std::string origDev = dataItem.origDev;
658 if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
659 origDev.clear();
660 }
661 dataItem.dev = deviceInfo.deviceName;
662 dataItem.origDev = origDev;
663 errCode = SaveSyncDataToCacheDatabase(dataItem, hashKey, recordVersion);
664 if (errCode == E_OK) {
665 maxStamp = std::max(dataItem.timeStamp, maxStamp);
666 } else {
667 LOGE("Save sync data to db failed:%d", errCode);
668 }
669 return ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
670 }
671
SaveSyncDataToCacheDatabase(const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const672 int SQLiteSingleVerStorageExecutor::SaveSyncDataToCacheDatabase(const DataItem &dataItem,
673 const Key &hashKey, uint64_t recordVersion) const
674 {
675 auto statement = saveSyncStatements_.GetDataSaveStatement(false);
676 if (statement == nullptr) {
677 return -E_INVALID_ARGS;
678 }
679 int errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
680 if (errCode != E_OK) {
681 return errCode;
682 }
683
684 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
685 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
686 errCode = E_OK;
687 }
688 return errCode;
689 }
690
PutLocalDataToCacheDB(const LocalDataItem & dataItem) const691 int SQLiteSingleVerStorageExecutor::PutLocalDataToCacheDB(const LocalDataItem &dataItem) const
692 {
693 sqlite3_stmt *statement = nullptr;
694 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_CACHE_LOCAL_SQL, statement);
695 if (errCode != E_OK) {
696 goto ERROR;
697 }
698
699 errCode = BindLocalDataInCacheMode(statement, dataItem);
700 if (errCode != E_OK) {
701 goto ERROR;
702 }
703
704 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
705 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
706 errCode = E_OK;
707 }
708
709 ERROR:
710 SQLiteUtils::ResetStatement(statement, true, errCode);
711 return CheckCorruptedStatus(errCode);
712 }
713
BindLocalDataInCacheMode(sqlite3_stmt * statement,const LocalDataItem & dataItem) const714 int SQLiteSingleVerStorageExecutor::BindLocalDataInCacheMode(sqlite3_stmt *statement,
715 const LocalDataItem &dataItem) const
716 {
717 int errCode = SQLiteUtils::BindBlobToStatement(statement,
718 BIND_CACHE_LOCAL_HASH_KEY_INDEX, dataItem.hashKey, false);
719 if (errCode != E_OK) {
720 LOGE("[SingleVerExe][BindLocalData]Bind hash key error:%d", errCode);
721 return errCode;
722 }
723
724 // if delete flag is set, just use the hash key instead of the key
725 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
726 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_LOCAL_KEY_INDEX, -1));
727 } else {
728 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_KEY_INDEX, dataItem.key, false);
729 }
730
731 if (errCode != E_OK) {
732 LOGE("Bind saved sync data key failed:%d", errCode);
733 return errCode;
734 }
735
736 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_VAL_INDEX, dataItem.value, true);
737 if (errCode != E_OK) {
738 LOGE("[SingleVerExe][BindLocalData]Bind value error:%d", errCode);
739 return errCode;
740 }
741
742 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_TIMESTAMP_INDEX, dataItem.timeStamp);
743 if (errCode != E_OK) {
744 LOGE("[SingleVerExe][BindLocalData]Bind timestamp error:%d", errCode);
745 return errCode;
746 }
747
748 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_FLAG_INDEX,
749 static_cast<int64_t>(dataItem.flag));
750 if (errCode != E_OK) {
751 LOGE("[SingleVerExe][BindLocalData]Bind local data flag failed:%d", errCode);
752 return errCode;
753 }
754
755 return E_OK;
756 }
757
PutIntoConflictAndCommitForMigrateCache(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)758 int SQLiteSingleVerStorageExecutor::PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem,
759 const DeviceInfo &deviceInfo, NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite)
760 {
761 int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
762 if (errCode != E_OK) {
763 errCode = (errCode == -E_NOT_FOUND ? E_OK : errCode);
764 if (errCode == -E_IGNORE_DATA) {
765 notify.dataStatus.isDefeated = true;
766 errCode = E_OK;
767 }
768 return errCode;
769 }
770
771 // If delete data, the key is empty.
772 if (isSyncMigrating_ && dataItem.key.empty()) {
773 dataItem.key = notify.getData.key;
774 }
775
776 PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, notify.committedData);
777 if (notify.dataStatus.isDefeated) {
778 LOGD("Data status is defeated:%d", errCode);
779 return ResetForMigrateCacheData();
780 }
781
782 PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, notify.hashKey, notify.committedData);
783 return ResetForMigrateCacheData();
784 }
785
GetMinTimestampInCacheDB(TimeStamp & minStamp) const786 int SQLiteSingleVerStorageExecutor::GetMinTimestampInCacheDB(TimeStamp &minStamp) const
787 {
788 if (dbHandle_ == nullptr) {
789 return E_OK;
790 }
791 std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
792 SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL :
793 SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL_FROM_MAINHANDLE);
794 sqlite3_stmt *statement = nullptr;
795 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
796 if (errCode != E_OK) {
797 goto ERROR;
798 }
799
800 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
801 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
802 minStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
803 LOGD("Min time stamp in cacheDB is %llu", minStamp);
804 errCode = E_OK;
805 } else {
806 LOGE("GetMinTimestampInCacheDB failed, errCode = %d.", errCode);
807 }
808
809 ERROR:
810 SQLiteUtils::ResetStatement(statement, true, errCode);
811 return errCode;
812 }
813
InitMigrateTimeStampOffset()814 int SQLiteSingleVerStorageExecutor::InitMigrateTimeStampOffset()
815 {
816 // Not first migrate, migrateTimeOffset_ has been set.
817 if (migrateTimeOffset_ != 0) {
818 return E_OK;
819 }
820
821 // Get min timestamp of local data in sync_data, cacheDB.
822 TimeStamp minTimeInCache = 0;
823 int errCode = GetMinTimestampInCacheDB(minTimeInCache);
824 if (errCode != E_OK) {
825 return errCode;
826 }
827
828 // There is no native data in cacheDB, cannot get accurate migrateTimeOffset_ now.
829 if (minTimeInCache == 0) {
830 migrateTimeOffset_ = -1;
831 LOGI("Time offset during migrating is -1.");
832 return E_OK;
833 }
834
835 // Get max timestamp in mainDB.
836 TimeStamp maxTimeInMain = 0;
837 InitCurrentMaxStamp(maxTimeInMain);
838
839 // Get timestamp offset between mainDB and cacheDB.
840 // The purpose of -1 is to ensure that the first data record in the original cacheDB is 1 greater than
841 // the last data record in the original mainDB after the migration.
842 migrateTimeOffset_ = minTimeInCache - maxTimeInMain - 1;
843 LOGI("Min timestamp in cacheDB is %llu, max timestamp in mainDB is %llu. Time offset during migrating is %lld.",
844 minTimeInCache, maxTimeInMain, migrateTimeOffset_);
845 return E_OK;
846 }
847
ProcessTimeStampForSyncDataInCacheDB(std::vector<DataItem> & dataItems)848 int SQLiteSingleVerStorageExecutor::ProcessTimeStampForSyncDataInCacheDB(std::vector<DataItem> &dataItems)
849 {
850 if (dataItems.empty()) {
851 LOGE("[SQLiteSingleVerStorageExecutor::ProcessTimeStampForCacheDB] Invalid parameter : dataItems.");
852 return -E_INVALID_ARGS;
853 }
854
855 // Get the offset between the min timestamp in dataitems and max timestamp in mainDB.
856 int errCode = InitMigrateTimeStampOffset();
857 if (errCode != E_OK) {
858 return errCode;
859 }
860
861 // Set real timestamp for DataItem in dataItems and get the max timestamp in these dataitems.
862 TimeStamp maxTimeInDataItems = 0;
863 for (auto &item : dataItems) {
864 item.timeStamp -= migrateTimeOffset_;
865 maxTimeInDataItems = std::max(maxTimeInDataItems, item.timeStamp);
866 }
867
868 // Update max timestamp in mainDB.
869 maxTimeStampInMainDB_ = maxTimeInDataItems;
870 return E_OK;
871 }
872
GetEntriesForNotifyRemoveDevData(const DataItem & item,std::vector<Entry> & entries) const873 int SQLiteSingleVerStorageExecutor::GetEntriesForNotifyRemoveDevData(const DataItem &item,
874 std::vector<Entry> &entries) const
875 {
876 // When removing device data, key is 'remove', value is device name.
877 if (item.key != REMOVE_DEVICE_DATA_KEY) {
878 LOGE("Invalid key. Can not notify remove device data.");
879 return -E_INVALID_ARGS;
880 }
881 if ((item.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == 0) {
882 LOGI("No need to notify remove device data.");
883 return E_OK;
884 }
885 entries.clear();
886 std::string dev;
887 DBCommon::VectorToString(item.value, dev);
888 return GetAllSyncedEntries(dev, entries);
889 }
890
InitMigrateData()891 int SQLiteSingleVerStorageExecutor::InitMigrateData()
892 {
893 // Sync_data already in migrating. Need not to init data.
894 if (isSyncMigrating_) {
895 return E_OK;
896 }
897 ClearMigrateData();
898 std::string querySQL;
899 std::string insertSQL;
900 std::string updateSQL;
901 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
902 querySQL = SELECT_SYNC_HASH_SQL;
903 insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_MAINHANDLE;
904 updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_MAINHANDLE;
905 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
906 querySQL = SELECT_MAIN_SYNC_HASH_SQL_FROM_CACHEHANDLE;
907 insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_CACHEHANDLE;
908 updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_CACHEHANDLE;
909 } else {
910 LOGE("[InitMigrateData] executor in an error state[%d]!", executorState_);
911 return -E_INVALID_DB;
912 }
913 int errCode = PrepareForSavingData(querySQL, insertSQL, updateSQL, migrateSyncStatements_);
914 if (errCode != E_OK) {
915 LOGE("Prepare migrateSyncStatements_ fail, errCode = %d", errCode);
916 return errCode;
917 }
918 isSyncMigrating_ = true;
919 return errCode;
920 }
921
ClearMigrateData()922 void SQLiteSingleVerStorageExecutor::ClearMigrateData()
923 {
924 // Reset data.
925 migrateTimeOffset_ = 0;
926 maxTimeStampInMainDB_ = 0;
927
928 // Reset statement.
929 int errCode = migrateSyncStatements_.ResetStatement();
930 if (errCode != E_OK) {
931 LOGE("Reset migrateSync Statements failed, errCode = %d", errCode);
932 }
933
934 isSyncMigrating_ = false;
935 }
936
GetMaxTimeStampDuringMigrating(TimeStamp & maxTimeStamp) const937 int SQLiteSingleVerStorageExecutor::GetMaxTimeStampDuringMigrating(TimeStamp &maxTimeStamp) const
938 {
939 if (maxTimeStampInMainDB_ == 0) {
940 return -E_NOT_INIT;
941 }
942 maxTimeStamp = maxTimeStampInMainDB_;
943 return E_OK;
944 }
945
DeleteMetaData(const std::vector<Key> & keys)946 int SQLiteSingleVerStorageExecutor::DeleteMetaData(const std::vector<Key> &keys)
947 {
948 sqlite3_stmt *statement = nullptr;
949 const std::string sql = attachMetaMode_ ? REMOVE_ATTACH_META_VALUE_SQL : REMOVE_META_VALUE_SQL;
950 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
951 if (errCode != E_OK) {
952 return errCode;
953 }
954
955 for (const auto &key : keys) {
956 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
957 if (errCode != E_OK) {
958 break;
959 }
960
961 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
962 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
963 break;
964 }
965 errCode = E_OK;
966 SQLiteUtils::ResetStatement(statement, false, errCode);
967 }
968
969 SQLiteUtils::ResetStatement(statement, true, errCode);
970 return CheckCorruptedStatus(errCode);
971 }
972
DeleteMetaDataByPrefixKey(const Key & keyPrefix)973 int SQLiteSingleVerStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix)
974 {
975 sqlite3_stmt *statement = nullptr;
976 const std::string sql = attachMetaMode_ ?
977 REMOVE_ATTACH_META_VALUE_BY_KEY_PREFIX_SQL : REMOVE_META_VALUE_BY_KEY_PREFIX_SQL;
978
979 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
980 if (errCode != E_OK) {
981 return errCode;
982 }
983
984 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
985 if (errCode == E_OK) {
986 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
987 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
988 errCode = E_OK;
989 }
990 }
991
992 SQLiteUtils::ResetStatement(statement, true, errCode);
993 return CheckCorruptedStatus(errCode);
994 }
995 } // namespace DistributedDB
996