1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "sqlite_single_ver_storage_executor.h"
17
18 #include <algorithm>
19
20 #include "log_print.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "parcel.h"
25 #include "runtime_context.h"
26 #include "sqlite_single_ver_storage_executor_sql.h"
27
28 namespace DistributedDB {
PrepareForSavingCacheData(SingleVerDataType type)29 int SQLiteSingleVerStorageExecutor::PrepareForSavingCacheData(SingleVerDataType type)
30 {
31 int errCode = -E_NOT_SUPPORT;
32 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
33 std::string insertLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
34 INSERT_LOCAL_SQL_FROM_CACHEHANDLE : INSERT_CACHE_LOCAL_SQL);
35 std::string updateLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
36 UPDATE_LOCAL_SQL_FROM_CACHEHANDLE : UPDATE_CACHE_LOCAL_SQL);
37 errCode = PrepareForSavingData(SELECT_CACHE_LOCAL_HASH_SQL, insertLocalSql, updateLocalSql,
38 saveLocalStatements_);
39 } else if (type == SingleVerDataType::SYNC_TYPE) {
40 std::string insertSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
41 INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL);
42 std::string updateSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
43 UPDATE_CACHE_SYNC_SQL_FROM_MAINHANDLE : UPDATE_CACHE_SYNC_SQL);
44 std::string selectSyncHashSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
45 SELECT_CACHE_SYNC_HASH_SQL_FROM_MAINHANDLE : SELECT_CACHE_SYNC_HASH_SQL);
46 errCode = PrepareForSavingData(selectSyncHashSql, insertSyncSql, updateSyncSql, saveSyncStatements_);
47 }
48 if (errCode != E_OK) {
49 LOGE("Prepare to save sync cache data failed:%d", errCode);
50 }
51 return CheckCorruptedStatus(errCode);
52 }
53
ResetForSavingCacheData(SingleVerDataType type)54 int SQLiteSingleVerStorageExecutor::ResetForSavingCacheData(SingleVerDataType type)
55 {
56 int errCode = E_OK;
57 if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
58 SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
59 SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
60 SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
61 } else if (type == SingleVerDataType::SYNC_TYPE) {
62 SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
63 SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
64 SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
65 }
66
67 return CheckCorruptedStatus(errCode);
68 }
69
ResetForMigrateCacheData()70 int SQLiteSingleVerStorageExecutor::ResetForMigrateCacheData()
71 {
72 int errCode = E_OK;
73 SQLiteUtils::ResetStatement(migrateSyncStatements_.insertStatement, false, errCode);
74 SQLiteUtils::ResetStatement(migrateSyncStatements_.updateStatement, false, errCode);
75 SQLiteUtils::ResetStatement(migrateSyncStatements_.queryStatement, false, errCode);
76
77 return CheckCorruptedStatus(errCode);
78 }
79
RemoveDeviceDataInCacheMode(const std::string & hashDev,bool isNeedNotify,uint64_t recordVersion) const80 int SQLiteSingleVerStorageExecutor::RemoveDeviceDataInCacheMode(const std::string &hashDev,
81 bool isNeedNotify, uint64_t recordVersion) const
82 {
83 // device name always hash string.
84 std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
85
86 Key hashKey;
87 int errCode = DBCommon::CalcValueHash(REMOVE_DEVICE_DATA_KEY, hashKey);
88 if (errCode != E_OK) {
89 return errCode;
90 }
91
92 DataItem dataItem;
93 dataItem.key = REMOVE_DEVICE_DATA_KEY;
94 dataItem.value = devVect;
95 if (isNeedNotify) {
96 dataItem.flag = DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG;
97 } else {
98 dataItem.flag = DataItem::REMOVE_DEVICE_DATA_FLAG;
99 }
100
101 sqlite3_stmt *statement = nullptr;
102 std::string sql = (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
103 INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL;
104 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
105 if (errCode != E_OK) {
106 goto ERROR;
107 }
108
109 errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
110 if (errCode != E_OK) {
111 goto ERROR;
112 }
113
114 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
115 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
116 LOGE("Failed to execute rm the device synced data:%d", errCode);
117 } else {
118 errCode = E_OK;
119 }
120
121 ERROR:
122 int ret = E_OK;
123 SQLiteUtils::ResetStatement(statement, true, ret);
124 return CheckCorruptedStatus(errCode);
125 }
126
GetMinVersionCacheData(std::vector<DataItem> & dataItems,uint64_t & minVerIncurCacheDb) const127 int SQLiteSingleVerStorageExecutor::GetMinVersionCacheData(
128 std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const
129 {
130 std::string sql;
131 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
132 sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_MAINHANDLE;
133 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
134 sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_CACHEHANDLE;
135 } else {
136 return -E_INVALID_ARGS;
137 }
138
139 sqlite3_stmt *statement = nullptr;
140 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
141 if (errCode != E_OK) {
142 LOGE("GetStatement fail when get min version cache data! errCode = [%d]", errCode);
143 return CheckCorruptedStatus(errCode);
144 }
145
146 errCode = GetAllDataItems(statement, dataItems, minVerIncurCacheDb, true);
147 if (errCode != E_OK) {
148 LOGE("Failed to get all the data items by the min version:[%d]", errCode);
149 }
150
151 int ret = E_OK;
152 SQLiteUtils::ResetStatement(statement, true, ret);
153 return CheckCorruptedStatus(errCode);
154 }
155
MigrateRmDevData(const DataItem & dataItem) const156 int SQLiteSingleVerStorageExecutor::MigrateRmDevData(const DataItem &dataItem) const
157 {
158 if (dataItem.key != REMOVE_DEVICE_DATA_KEY) {
159 LOGE("This item not means remove devices data, can not continue exe!");
160 return -E_INVALID_ARGS;
161 }
162
163 std::string sql;
164 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
165 sql = dataItem.value.empty() ? REMOVE_ALL_DEV_DATA_SQL : REMOVE_DEV_DATA_SQL;
166 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
167 sql = dataItem.value.empty() ? REMOVE_ALL_DEV_DATA_SQL_FROM_CACHEHANDLE: REMOVE_DEV_DATA_SQL_FROM_CACHEHANDLE;
168 } else {
169 return -E_INVALID_ARGS;
170 }
171
172 sqlite3_stmt *statement = nullptr;
173 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
174 if (errCode != E_OK) {
175 LOGE("GetStatement fail when remove device data migrating-data to main! errCode = [%d]", errCode);
176 return CheckCorruptedStatus(errCode);
177 }
178
179 if (!dataItem.value.empty()) {
180 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, dataItem.value, true);
181 if (errCode != E_OK) {
182 LOGE("[singerVerExecutor][MiRmData] Bind dev for sync data failed:%d", errCode);
183 goto END;
184 }
185 }
186
187 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
188 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
189 errCode = E_OK;
190 }
191 END:
192 int ret = E_OK;
193 SQLiteUtils::ResetStatement(statement, true, ret);
194 return CheckCorruptedStatus(errCode);
195 }
196
AttachMainDbAndCacheDb(CipherType type,const CipherPassword & passwd,const std::string & attachDbAbsPath,EngineState engineState)197 int SQLiteSingleVerStorageExecutor::AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
198 const std::string &attachDbAbsPath, EngineState engineState)
199 {
200 std::string attachAsName;
201 if (engineState == EngineState::MAINDB) {
202 attachAsName = "cache";
203 } else if (engineState == EngineState::CACHEDB) {
204 attachAsName = "maindb";
205 } else if (engineState == EngineState::ATTACHING) {
206 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
207 return E_OK;
208 } else {
209 return -E_INVALID_ARGS;
210 }
211
212 int errCode = SQLiteUtils::AttachNewDatabase(dbHandle_, type, passwd, attachDbAbsPath, attachAsName);
213 if (errCode != E_OK) {
214 LOGE("handle attach to [%s] fail! errCode = [%d]", attachAsName.c_str(), errCode);
215 return CheckCorruptedStatus(errCode);
216 }
217
218 if (engineState == EngineState::MAINDB) {
219 executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
220 } else if (engineState == EngineState::CACHEDB) {
221 executorState_ = ExecutorState::CACHE_ATTACH_MAIN;
222 } else {
223 return -E_INVALID_ARGS;
224 }
225 LOGD("[singleVerExecutor][attachDb] current engineState[%u], executorState[%u]", static_cast<unsigned>(engineState),
226 static_cast<unsigned>(executorState_));
227 return errCode;
228 }
229
GetMaxVersionInCacheDb(uint64_t & maxVersion) const230 int SQLiteSingleVerStorageExecutor::GetMaxVersionInCacheDb(uint64_t &maxVersion) const
231 {
232 sqlite3_stmt *statement = nullptr;
233 std::string sql;
234 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
235 sql = GET_MAX_VER_CACHEDATA_FROM_MAINHANDLE;
236 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
237 sql = GET_MAX_VER_CACHEDATA_FROM_CACHEHANDLE;
238 } else {
239 return -E_INVALID_ARGS;
240 }
241
242 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
243 if (errCode != E_OK) {
244 LOGE("GetStatement fail when get max version in cache db");
245 return CheckCorruptedStatus(errCode);
246 }
247
248 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
249 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
250 maxVersion = static_cast<uint64_t>(sqlite3_column_int64(statement, 0));
251 errCode = E_OK;
252 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
253 maxVersion = 0;
254 errCode = E_OK;
255 }
256 int ret = E_OK;
257 SQLiteUtils::ResetStatement(statement, true, ret);
258 return CheckCorruptedStatus(errCode);
259 }
260
MigrateDataItem(DataItem & dataItem,const NotifyMigrateSyncData & syncData)261 int SQLiteSingleVerStorageExecutor::MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData)
262 {
263 // Put or delete. Prepare notify data here.
264 NotifyConflictAndObserverData notify;
265 notify.committedData = syncData.committedData;
266 int errCode = PutIntoConflictAndCommitForMigrateCache(dataItem, {dataItem.dev.empty(), dataItem.dev}, notify,
267 syncData.isPermitForceWrite);
268 if (errCode != E_OK) {
269 ResetForMigrateCacheData();
270 LOGE("PutIntoConflictAndCommitForMigrateCache failed, errCode = %d", errCode);
271 return errCode;
272 }
273 // after solving conflict, the item should not be saved into mainDB
274 if (notify.dataStatus.isDefeated) {
275 LOGE("Data status is defeated:%d", errCode);
276 return errCode;
277 }
278 bool isUpdate = notify.dataStatus.preStatus != DataStatus::NOEXISTED;
279 sqlite3_stmt *statement = migrateSyncStatements_.GetDataSaveStatement(isUpdate);
280 if (statement == nullptr) {
281 LOGE("GetStatement fail when put migrating-data to main! ");
282 return -E_INVALID_ARGS;
283 }
284
285 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
286 errCode = EraseSyncData(dataItem.key);
287 goto END;
288 }
289
290 errCode = BindSavedSyncData(statement, dataItem, dataItem.hashKey, { dataItem.origDev, dataItem.dev }, isUpdate);
291 if (errCode != E_OK) {
292 goto END;
293 }
294
295 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
296 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
297 errCode = E_OK;
298 } else {
299 LOGD("StepWithRetry fail when put migrating-data to main!");
300 }
301 END:
302 ResetForMigrateCacheData();
303 return errCode;
304 }
305
CheckDataWithQuery(std::vector<DataItem> & dataItems)306 int SQLiteSingleVerStorageExecutor::CheckDataWithQuery(std::vector<DataItem> &dataItems)
307 {
308 int errCode = E_OK;
309 sqlite3_stmt *stmt = nullptr;
310 int ret = E_OK;
311 for (auto &item : dataItems) {
312 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) {
313 continue;
314 }
315 std::string sql;
316 DBCommon::VectorToString(item.value, sql);
317 if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
318 static const std::string SYNC_DATA_TABLE = "sync_data";
319 static const std::string SYNC_DATA_TABLE_MAIN = "maindb.sync_data";
320 std::string::size_type startPos = sql.find(SYNC_DATA_TABLE);
321 if (startPos != std::string::npos) {
322 sql.replace(startPos, SYNC_DATA_TABLE.length(), SYNC_DATA_TABLE_MAIN);
323 }
324 }
325 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
326 if (errCode != E_OK) {
327 LOGE("Get Check miss query data statement failed. %d", errCode);
328 return errCode;
329 }
330
331 errCode = CheckMissQueryDataItem(stmt, item.dev, item);
332 if (errCode != E_OK) {
333 LOGE("Check miss query data item failed. %d", errCode);
334 break;
335 }
336 SQLiteUtils::ResetStatement(stmt, true, ret);
337 }
338 SQLiteUtils::ResetStatement(stmt, true, ret);
339 return CheckCorruptedStatus(errCode);
340 }
341
MigrateDataItems(std::vector<DataItem> & dataItems,NotifyMigrateSyncData & syncData)342 int SQLiteSingleVerStorageExecutor::MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData)
343 {
344 syncData.isRemote = ((dataItems[0].flag & DataItem::LOCAL_FLAG) == 0);
345 syncData.isRemoveDeviceData = (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_FLAG) != 0 ||
346 (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) != 0;
347
348 int errCode = CheckDataWithQuery(dataItems);
349 if (errCode != E_OK) {
350 LOGE("Check migrate data with query failed! errCode = [%d]", errCode);
351 goto END;
352 }
353
354 for (auto &item : dataItems) {
355 // Remove device data owns one version itself.
356 // Get entry here. Prepare notify data in storageEngine.
357 if (syncData.isRemoveDeviceData) {
358 errCode = GetEntriesForNotifyRemoveDevData(item, syncData.entries);
359 if (errCode != E_OK) {
360 LOGE("Failed to get remove devices data");
361 return errCode;
362 }
363 errCode = MigrateRmDevData(item);
364 LOGI("[PutMigratingDataToMain]Execute remove devices data! errCode = [%d]", errCode);
365 if (errCode != E_OK) {
366 break;
367 }
368 continue;
369 }
370
371 if (item.neglect) { // Do not save this record if it is neglected
372 continue;
373 }
374
375 errCode = MigrateDataItem(item, syncData);
376 if (errCode != E_OK) {
377 LOGE("Migrate data item to main db failed! errCode = [%d]", errCode);
378 break;
379 }
380 }
381 END:
382 ResetForMigrateCacheData();
383 return CheckCorruptedStatus(errCode);
384 }
385
MigrateSyncDataByVersion(uint64_t recordVer,NotifyMigrateSyncData & syncData,std::vector<DataItem> & dataItems)386 int SQLiteSingleVerStorageExecutor::MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
387 std::vector<DataItem> &dataItems)
388 {
389 int errCode = StartTransaction(TransactType::IMMEDIATE);
390 if (errCode != E_OK) {
391 return errCode;
392 }
393
394 // Init migrate data.
395 errCode = InitMigrateData();
396 if (errCode != E_OK) {
397 LOGE("Init migrate data failed, errCode = [%d]", errCode);
398 goto END;
399 }
400
401 // fix dataItem timestamp for migrate
402 errCode = ProcessTimestampForSyncDataInCacheDB(dataItems);
403 if (errCode != E_OK) {
404 LOGE("Change the time stamp for migrate failed! errCode = [%d]", errCode);
405 goto END;
406 }
407
408 errCode = MigrateDataItems(dataItems, syncData);
409 if (errCode != E_OK) {
410 goto END;
411 }
412
413 // delete recordVersion data
414 errCode = DelCacheDbDataByVersion(recordVer);
415 if (errCode != E_OK) {
416 LOGE("Delete the migrated data in cacheDb! errCode = [%d]", errCode);
417 goto END;
418 }
419
420 errCode = Commit();
421 if (errCode != E_OK) {
422 LOGE("Commit data error and rollback, errCode = [%d]", errCode);
423 goto END;
424 }
425 return E_OK;
426 END:
427 Rollback();
428 return errCode;
429 }
430
DelCacheDbDataByVersion(uint64_t version) const431 int SQLiteSingleVerStorageExecutor::DelCacheDbDataByVersion(uint64_t version) const
432 {
433 std::string sql;
434 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
435 sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_MAINHANDLE;
436 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
437 sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_CACHEHANDLE;
438 } else {
439 return -E_INVALID_ARGS;
440 }
441
442 sqlite3_stmt *statement = nullptr;
443 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
444 if (errCode != E_OK) {
445 LOGE("GetStatement fail when delete cache data by version! errCode = [%d]", errCode);
446 return errCode;
447 }
448
449 errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, static_cast<int64_t>(version));
450 if (errCode != E_OK) {
451 LOGE("[SingleVerExe] Bind destDbNickName error:[%d]", errCode);
452 goto END;
453 }
454
455 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
456 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
457 errCode = E_OK;
458 }
459
460 END:
461 int ret = E_OK;
462 SQLiteUtils::ResetStatement(statement, true, ret);
463 return CheckCorruptedStatus(errCode);
464 }
465
VacuumLocalData() const466 int SQLiteSingleVerStorageExecutor::VacuumLocalData() const
467 {
468 std::string sql;
469 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
470 sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_MAINHANDLE;
471 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
472 sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_CACHEHANDLE;
473 } else {
474 return -E_INVALID_ARGS;
475 }
476
477 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
478 if (errCode != E_OK) {
479 LOGE("[SingleVerExe] vaccum local data failed: %d", errCode);
480 }
481
482 return CheckCorruptedStatus(errCode);
483 }
484
485 // The local table data is only for local reading and writing, which can be sensed by itself.
486 // The current migration process does not provide callback subscription function.
MigrateLocalData()487 int SQLiteSingleVerStorageExecutor::MigrateLocalData()
488 {
489 // Nick name "main" represent current database(dbhande) in sqlite grammar
490 std::string migrateLocaldataSql;
491 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
492 migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_MAINHANDLE;
493 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
494 migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_CACHEHANDLE;
495 } else {
496 return -E_INVALID_ARGS;
497 }
498
499 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, migrateLocaldataSql);
500 if (errCode != E_OK) {
501 LOGW("Failed to migrate the local data:%d", errCode);
502 return CheckCorruptedStatus(errCode);
503 }
504
505 return VacuumLocalData();
506 }
507
BindSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const508 int SQLiteSingleVerStorageExecutor::BindSyncDataInCacheMode(sqlite3_stmt *statement,
509 const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const
510 {
511 int errCode = BindPrimaryKeySyncDataInCacheMode(statement, hashKey, recordVersion);
512 if (errCode != E_OK) {
513 LOGE("Bind saved sync data primary key failed:%d", errCode);
514 return errCode;
515 }
516
517 // if delete flag is set, just use the hash key instead of the key
518 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
519 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_SYNC_KEY_INDEX, -1));
520 } else {
521 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_KEY_INDEX, dataItem.key, false);
522 }
523
524 if (errCode != E_OK) {
525 LOGE("Bind saved sync data key failed:%d", errCode);
526 return errCode;
527 }
528
529 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_VAL_INDEX, dataItem.value, true);
530 if (errCode != E_OK) {
531 LOGE("Bind saved sync data value failed:%d", errCode);
532 return errCode;
533 }
534
535 LOGD("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 ", version:%" PRIu64,
536 dataItem.writeTimestamp, dataItem.timestamp, dataItem.flag, recordVersion);
537 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_FLAG_INDEX,
538 static_cast<int64_t>(dataItem.flag));
539 if (errCode != E_OK) {
540 LOGE("Bind saved sync data flag failed:%d", errCode);
541 return errCode;
542 }
543 errCode = BindTimestampSyncDataInCacheMode(statement, dataItem);
544 if (errCode != E_OK) {
545 LOGE("Bind saved sync data time stamp failed:%d", errCode);
546 return errCode;
547 }
548 return BindDevSyncDataInCacheMode(statement, dataItem.origDev, dataItem.dev);
549 }
550
BindPrimaryKeySyncDataInCacheMode(sqlite3_stmt * statement,const Key & hashKey,uint64_t recordVersion) const551 int SQLiteSingleVerStorageExecutor::BindPrimaryKeySyncDataInCacheMode(
552 sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const
553 {
554 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_HASH_KEY_INDEX, hashKey, false);
555 if (errCode != E_OK) {
556 LOGE("Bind saved sync data hash key failed:%d", errCode);
557 return errCode;
558 }
559 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_VERSION_INDEX, recordVersion);
560 if (errCode != E_OK) {
561 LOGE("Bind saved sync data version failed:%d", errCode);
562 }
563 return errCode;
564 }
565
BindTimestampSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem) const566 int SQLiteSingleVerStorageExecutor::BindTimestampSyncDataInCacheMode(
567 sqlite3_stmt *statement, const DataItem &dataItem) const
568 {
569 int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_STAMP_INDEX, dataItem.timestamp);
570 if (errCode != E_OK) {
571 LOGE("Bind saved sync data stamp failed:%d", errCode);
572 return errCode;
573 }
574
575 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_W_TIME_INDEX, dataItem.writeTimestamp);
576 if (errCode != E_OK) {
577 LOGE("Bind saved sync data write stamp failed:%d", errCode);
578 }
579 return errCode;
580 }
581
BindDevSyncDataInCacheMode(sqlite3_stmt * statement,const std::string & origDev,const std::string & deviceName) const582 int SQLiteSingleVerStorageExecutor::BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
583 const std::string &origDev, const std::string &deviceName) const
584 {
585 std::string devName = DBCommon::TransferHashString(deviceName);
586 std::vector<uint8_t> devVect(devName.begin(), devName.end());
587 int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_DEV_INDEX, devVect, true);
588 if (errCode != E_OK) {
589 LOGE("Bind dev for sync data failed:%d", errCode);
590 return errCode;
591 }
592
593 std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
594 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_ORI_DEV_INDEX, origDevVect, true);
595 if (errCode != E_OK) {
596 LOGE("Bind orig dev for sync data failed:%d", errCode);
597 }
598 return errCode;
599 }
600
GetExpandedCheckSql(QueryObject query,DataItem & dataItem)601 int SQLiteSingleVerStorageExecutor::GetExpandedCheckSql(QueryObject query, DataItem &dataItem)
602 {
603 int errCode = E_OK;
604 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
605
606 std::string sql;
607 std::string expandedSql;
608 errCode = helper.GetSyncDataCheckSql(sql);
609 if (errCode != E_OK) {
610 LOGE("Get sync data check sql failed");
611 return errCode;
612 }
613 sqlite3_stmt *stmt = nullptr;
614 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
615 if (errCode != E_OK) {
616 LOGE("Get statement fail. %d", errCode);
617 return -E_INVALID_QUERY_FORMAT;
618 }
619
620 errCode = helper.BindSyncDataCheckStmt(stmt, dataItem.key);
621 if (errCode != E_OK) {
622 goto END;
623 }
624
625 errCode = SQLiteUtils::ExpandedSql(stmt, expandedSql);
626 if (errCode != E_OK) {
627 LOGE("Get expand sql fail. %d", errCode);
628 }
629 DBCommon::StringToVector(expandedSql, dataItem.value);
630 END:
631 int ret = E_OK;
632 SQLiteUtils::ResetStatement(stmt, true, ret);
633 return errCode != E_OK ? errCode : ret;
634 }
635
SaveSyncDataItemInCacheMode(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,uint64_t recordVersion,const QueryObject & query)636 int SQLiteSingleVerStorageExecutor::SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo,
637 Timestamp &maxStamp, uint64_t recordVersion, const QueryObject &query)
638 {
639 Key hashKey;
640 int errCode = E_OK;
641 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
642 hashKey = dataItem.key;
643 } else {
644 errCode = DBCommon::CalcValueHash(dataItem.key, hashKey);
645 if (errCode != E_OK) {
646 return errCode;
647 }
648 }
649
650 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
651 errCode = GetExpandedCheckSql(query, dataItem); // record check sql in value for miss query data
652 if (errCode != E_OK) {
653 LOGE("Get sync data check sql failed. %d", errCode);
654 return errCode;
655 }
656 }
657
658 std::string origDev = dataItem.origDev;
659 if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
660 origDev.clear();
661 }
662 dataItem.dev = deviceInfo.deviceName;
663 dataItem.origDev = origDev;
664 errCode = SaveSyncDataToCacheDatabase(dataItem, hashKey, recordVersion);
665 if (errCode == E_OK) {
666 maxStamp = std::max(dataItem.timestamp, maxStamp);
667 } else {
668 LOGE("Save sync data to db failed:%d", errCode);
669 }
670 return ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
671 }
672
SaveSyncDataToCacheDatabase(const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const673 int SQLiteSingleVerStorageExecutor::SaveSyncDataToCacheDatabase(const DataItem &dataItem,
674 const Key &hashKey, uint64_t recordVersion) const
675 {
676 auto statement = saveSyncStatements_.GetDataSaveStatement(false);
677 if (statement == nullptr) {
678 return -E_INVALID_ARGS;
679 }
680 int errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
681 if (errCode != E_OK) {
682 return errCode;
683 }
684
685 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
686 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
687 errCode = E_OK;
688 }
689 return errCode;
690 }
691
PutLocalDataToCacheDB(const LocalDataItem & dataItem) const692 int SQLiteSingleVerStorageExecutor::PutLocalDataToCacheDB(const LocalDataItem &dataItem) const
693 {
694 sqlite3_stmt *statement = nullptr;
695 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_CACHE_LOCAL_SQL, statement);
696 if (errCode != E_OK) {
697 goto ERROR;
698 }
699
700 errCode = BindLocalDataInCacheMode(statement, dataItem);
701 if (errCode != E_OK) {
702 goto ERROR;
703 }
704
705 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
706 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
707 errCode = E_OK;
708 }
709
710 ERROR:
711 int ret = E_OK;
712 SQLiteUtils::ResetStatement(statement, true, ret);
713 return CheckCorruptedStatus(errCode);
714 }
715
BindLocalDataInCacheMode(sqlite3_stmt * statement,const LocalDataItem & dataItem) const716 int SQLiteSingleVerStorageExecutor::BindLocalDataInCacheMode(sqlite3_stmt *statement,
717 const LocalDataItem &dataItem) const
718 {
719 int errCode = SQLiteUtils::BindBlobToStatement(statement,
720 BIND_CACHE_LOCAL_HASH_KEY_INDEX, dataItem.hashKey, false);
721 if (errCode != E_OK) {
722 LOGE("[SingleVerExe][BindLocalData]Bind hash key error:%d", errCode);
723 return errCode;
724 }
725
726 // if delete flag is set, just use the hash key instead of the key
727 if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
728 errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_LOCAL_KEY_INDEX, -1));
729 } else {
730 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_KEY_INDEX, dataItem.key, false);
731 }
732
733 if (errCode != E_OK) {
734 LOGE("Bind saved sync data key failed:%d", errCode);
735 return errCode;
736 }
737
738 errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_VAL_INDEX, dataItem.value, true);
739 if (errCode != E_OK) {
740 LOGE("[SingleVerExe][BindLocalData]Bind value error:%d", errCode);
741 return errCode;
742 }
743
744 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_TIMESTAMP_INDEX, dataItem.timestamp);
745 if (errCode != E_OK) {
746 LOGE("[SingleVerExe][BindLocalData]Bind timestamp error:%d", errCode);
747 return errCode;
748 }
749
750 errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_FLAG_INDEX,
751 static_cast<int64_t>(dataItem.flag));
752 if (errCode != E_OK) {
753 LOGE("[SingleVerExe][BindLocalData]Bind local data flag failed:%d", errCode);
754 return errCode;
755 }
756
757 return E_OK;
758 }
759
PutIntoConflictAndCommitForMigrateCache(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)760 int SQLiteSingleVerStorageExecutor::PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem,
761 const DeviceInfo &deviceInfo, NotifyConflictAndObserverData ¬ify, bool isPermitForceWrite)
762 {
763 int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
764 if (errCode != E_OK) {
765 errCode = (errCode == -E_NOT_FOUND ? E_OK : errCode);
766 if (errCode == -E_IGNORE_DATA) {
767 notify.dataStatus.isDefeated = true;
768 errCode = E_OK;
769 }
770 return errCode;
771 }
772
773 // If delete data, the key is empty.
774 if (isSyncMigrating_ && dataItem.key.empty()) {
775 dataItem.key = notify.getData.key;
776 }
777
778 PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, notify.committedData);
779 if (notify.dataStatus.isDefeated) {
780 LOGE("Data status is defeated:%d", errCode);
781 return ResetForMigrateCacheData();
782 }
783
784 PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, notify.committedData);
785 return ResetForMigrateCacheData();
786 }
787
GetMinTimestampInCacheDB(Timestamp & minStamp) const788 int SQLiteSingleVerStorageExecutor::GetMinTimestampInCacheDB(Timestamp &minStamp) const
789 {
790 if (dbHandle_ == nullptr) {
791 return E_OK;
792 }
793 std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
794 SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL :
795 SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL_FROM_MAINHANDLE);
796 sqlite3_stmt *statement = nullptr;
797 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
798 if (errCode != E_OK) {
799 goto ERROR;
800 }
801
802 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
803 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
804 minStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
805 LOGD("Min time stamp in cacheDB is %" PRIu64, minStamp);
806 errCode = E_OK;
807 } else {
808 LOGE("GetMinTimestampInCacheDB failed, errCode = %d.", errCode);
809 }
810
811 ERROR:
812 int ret = E_OK;
813 SQLiteUtils::ResetStatement(statement, true, ret);
814 return errCode != E_OK ? errCode : ret;
815 }
816
InitMigrateTimestampOffset()817 int SQLiteSingleVerStorageExecutor::InitMigrateTimestampOffset()
818 {
819 // Not first migrate, migrateTimeOffset_ has been set.
820 if (migrateTimeOffset_ != 0) {
821 return E_OK;
822 }
823
824 // Get min timestamp of local data in sync_data, cacheDB.
825 Timestamp minTimeInCache = 0;
826 int errCode = GetMinTimestampInCacheDB(minTimeInCache);
827 if (errCode != E_OK) {
828 return errCode;
829 }
830
831 // There is no native data in cacheDB, cannot get accurate migrateTimeOffset_ now.
832 if (minTimeInCache == 0) {
833 migrateTimeOffset_ = -1;
834 LOGI("Time offset during migrating is -1.");
835 return E_OK;
836 }
837
838 // Get max timestamp in mainDB.
839 Timestamp maxTimeInMain = 0;
840 InitCurrentMaxStamp(maxTimeInMain);
841
842 // Get timestamp offset between mainDB and cacheDB.
843 // The purpose of -1 is to ensure that the first data record in the original cacheDB is 1 greater than
844 // the last data record in the original mainDB after the migration.
845 migrateTimeOffset_ = minTimeInCache - maxTimeInMain - 1;
846 LOGI("Min timestamp in cacheDB is %" PRIu64 ", max timestamp in mainDB is %" PRIu64 ". Time offset during migrating"
847 " is %" PRId64 ".", minTimeInCache, maxTimeInMain, migrateTimeOffset_);
848 return E_OK;
849 }
850
ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> & dataItems)851 int SQLiteSingleVerStorageExecutor::ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems)
852 {
853 if (dataItems.empty()) {
854 LOGE("[SQLiteSingleVerStorageExecutor::ProcessTimestampForCacheDB] Invalid parameter : dataItems.");
855 return -E_INVALID_ARGS;
856 }
857
858 // Get the offset between the min timestamp in dataitems and max timestamp in mainDB.
859 int errCode = InitMigrateTimestampOffset();
860 if (errCode != E_OK) {
861 return errCode;
862 }
863
864 // Set real timestamp for DataItem in dataItems and get the max timestamp in these dataitems.
865 Timestamp maxTimeInDataItems = 0;
866 for (auto &item : dataItems) {
867 item.timestamp -= migrateTimeOffset_;
868 maxTimeInDataItems = std::max(maxTimeInDataItems, item.timestamp);
869 }
870
871 // Update max timestamp in mainDB.
872 maxTimestampInMainDB_ = maxTimeInDataItems;
873 return E_OK;
874 }
875
GetEntriesForNotifyRemoveDevData(const DataItem & item,std::vector<Entry> & entries) const876 int SQLiteSingleVerStorageExecutor::GetEntriesForNotifyRemoveDevData(const DataItem &item,
877 std::vector<Entry> &entries) const
878 {
879 // When removing device data, key is 'remove', value is device name.
880 if (item.key != REMOVE_DEVICE_DATA_KEY) {
881 LOGE("Invalid key. Can not notify remove device data.");
882 return -E_INVALID_ARGS;
883 }
884 if ((item.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == 0) {
885 LOGI("No need to notify remove device data.");
886 return E_OK;
887 }
888 entries.clear();
889 std::string dev;
890 DBCommon::VectorToString(item.value, dev);
891 return GetAllSyncedEntries(dev, entries);
892 }
893
InitMigrateData()894 int SQLiteSingleVerStorageExecutor::InitMigrateData()
895 {
896 // Sync_data already in migrating. Need not to init data.
897 if (isSyncMigrating_) {
898 return E_OK;
899 }
900 ClearMigrateData();
901 std::string querySQL;
902 std::string insertSQL;
903 std::string updateSQL;
904 if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
905 querySQL = SELECT_SYNC_HASH_SQL;
906 insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_MAINHANDLE;
907 updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_MAINHANDLE;
908 } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
909 querySQL = SELECT_MAIN_SYNC_HASH_SQL_FROM_CACHEHANDLE;
910 insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_CACHEHANDLE;
911 updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_CACHEHANDLE;
912 } else {
913 LOGE("[InitMigrateData] executor in an error state[%u]!", static_cast<unsigned>(executorState_));
914 return -E_INVALID_DB;
915 }
916 int errCode = PrepareForSavingData(querySQL, insertSQL, updateSQL, migrateSyncStatements_);
917 if (errCode != E_OK) {
918 LOGE("Prepare migrateSyncStatements_ fail, errCode = %d", errCode);
919 return errCode;
920 }
921 isSyncMigrating_ = true;
922 return errCode;
923 }
924
ClearMigrateData()925 void SQLiteSingleVerStorageExecutor::ClearMigrateData()
926 {
927 // Reset data.
928 migrateTimeOffset_ = 0;
929 maxTimestampInMainDB_ = 0;
930
931 // Reset statement.
932 int errCode = migrateSyncStatements_.ResetStatement();
933 if (errCode != E_OK) {
934 LOGE("Reset migrateSync Statements failed, errCode = %d", errCode);
935 }
936
937 isSyncMigrating_ = false;
938 }
939
GetMaxTimestampDuringMigrating(Timestamp & maxTimestamp) const940 int SQLiteSingleVerStorageExecutor::GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const
941 {
942 if (maxTimestampInMainDB_ == 0) {
943 return -E_NOT_INIT;
944 }
945 maxTimestamp = maxTimestampInMainDB_;
946 return E_OK;
947 }
948
DeleteMetaData(const std::vector<Key> & keys)949 int SQLiteSingleVerStorageExecutor::DeleteMetaData(const std::vector<Key> &keys)
950 {
951 sqlite3_stmt *statement = nullptr;
952 const std::string sql = attachMetaMode_ ? REMOVE_ATTACH_META_VALUE_SQL : REMOVE_META_VALUE_SQL;
953 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
954 if (errCode != E_OK) {
955 return errCode;
956 }
957
958 int ret = E_OK;
959 for (const auto &key : keys) {
960 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
961 if (errCode != E_OK) {
962 break;
963 }
964
965 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
966 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
967 break;
968 }
969 errCode = E_OK;
970 SQLiteUtils::ResetStatement(statement, false, ret);
971 }
972
973 SQLiteUtils::ResetStatement(statement, true, ret);
974 return CheckCorruptedStatus(errCode);
975 }
976
DeleteMetaDataByPrefixKey(const Key & keyPrefix)977 int SQLiteSingleVerStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix)
978 {
979 sqlite3_stmt *statement = nullptr;
980 const std::string sql = attachMetaMode_ ?
981 REMOVE_ATTACH_META_VALUE_BY_KEY_PREFIX_SQL : REMOVE_META_VALUE_BY_KEY_PREFIX_SQL;
982
983 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
984 if (errCode != E_OK) {
985 return errCode;
986 }
987
988 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
989 if (errCode == E_OK) {
990 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
991 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
992 errCode = E_OK;
993 }
994 }
995
996 int ret = E_OK;
997 SQLiteUtils::ResetStatement(statement, true, ret);
998 return CheckCorruptedStatus(errCode);
999 }
1000 } // namespace DistributedDB
1001