• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_executor.h"
17 
18 #include <algorithm>
19 
20 #include "log_print.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "parcel.h"
25 #include "runtime_context.h"
26 #include "sqlite_single_ver_storage_executor_sql.h"
27 
28 namespace DistributedDB {
PrepareForSavingCacheData(SingleVerDataType type)29 int SQLiteSingleVerStorageExecutor::PrepareForSavingCacheData(SingleVerDataType type)
30 {
31     int errCode = -E_NOT_SUPPORT;
32     if (type == SingleVerDataType::LOCAL_TYPE) {
33         std::string insertLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
34             INSERT_LOCAL_SQL_FROM_CACHEHANDLE : INSERT_CACHE_LOCAL_SQL);
35         std::string updateLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
36             UPDATE_LOCAL_SQL_FROM_CACHEHANDLE : UPDATE_CACHE_LOCAL_SQL);
37         errCode = PrepareForSavingData(SELECT_CACHE_LOCAL_HASH_SQL, insertLocalSql, updateLocalSql,
38             saveLocalStatements_);
39     } else if (type == SingleVerDataType::SYNC_TYPE) {
40         std::string insertSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
41             INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL);
42         std::string updateSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
43             UPDATE_CACHE_SYNC_SQL_FROM_MAINHANDLE : UPDATE_CACHE_SYNC_SQL);
44         std::string selectSyncHashSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
45             SELECT_CACHE_SYNC_HASH_SQL_FROM_MAINHANDLE : SELECT_CACHE_SYNC_HASH_SQL);
46         errCode = PrepareForSavingData(selectSyncHashSql, insertSyncSql, updateSyncSql, saveSyncStatements_);
47     }
48     if (errCode != E_OK) {
49         LOGE("Prepare to save sync cache data failed:%d", errCode);
50     }
51     return CheckCorruptedStatus(errCode);
52 }
53 
ResetForSavingCacheData(SingleVerDataType type)54 int SQLiteSingleVerStorageExecutor::ResetForSavingCacheData(SingleVerDataType type)
55 {
56     int errCode = E_OK;
57     if (type == SingleVerDataType::LOCAL_TYPE) {
58         SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
59         SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
60         SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
61     } else if (type == SingleVerDataType::SYNC_TYPE) {
62         SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
63         SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
64         SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
65     }
66 
67     return CheckCorruptedStatus(errCode);
68 }
69 
ResetForMigrateCacheData()70 int SQLiteSingleVerStorageExecutor::ResetForMigrateCacheData()
71 {
72     int errCode = E_OK;
73     SQLiteUtils::ResetStatement(migrateSyncStatements_.insertStatement, false, errCode);
74     SQLiteUtils::ResetStatement(migrateSyncStatements_.updateStatement, false, errCode);
75     SQLiteUtils::ResetStatement(migrateSyncStatements_.queryStatement, false, errCode);
76 
77     return CheckCorruptedStatus(errCode);
78 }
79 
RemoveDeviceDataInCacheMode(const std::string & deviceName,bool isNeedNotify,uint64_t recordVersion) const80 int SQLiteSingleVerStorageExecutor::RemoveDeviceDataInCacheMode(const std::string &deviceName,
81     bool isNeedNotify, uint64_t recordVersion) const
82 {
83     // Transfer the device name.
84     std::string devName = DBCommon::TransferHashString(deviceName);
85     std::vector<uint8_t> devVect(devName.begin(), devName.end());
86 
87     Key hashKey;
88     int errCode = DBCommon::CalcValueHash(REMOVE_DEVICE_DATA_KEY, hashKey);
89     if (errCode != E_OK) {
90         return errCode;
91     }
92 
93     DataItem dataItem;
94     dataItem.key = REMOVE_DEVICE_DATA_KEY;
95     dataItem.value = devVect;
96     if (isNeedNotify) {
97         dataItem.flag = DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG;
98     } else {
99         dataItem.flag = DataItem::REMOVE_DEVICE_DATA_FLAG;
100     }
101 
102     sqlite3_stmt *statement = nullptr;
103     std::string sql = (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
104         INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL;
105     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
106     if (errCode != E_OK) {
107         goto ERROR;
108     }
109 
110     errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
111     if (errCode != E_OK) {
112         goto ERROR;
113     }
114 
115     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
116     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
117         LOGE("Failed to execute rm the device synced data:%d", errCode);
118     } else {
119         errCode = E_OK;
120     }
121 
122 ERROR:
123     SQLiteUtils::ResetStatement(statement, true, errCode);
124     return CheckCorruptedStatus(errCode);
125 }
126 
GetMinVersionCacheData(std::vector<DataItem> & dataItems,uint64_t & minVerIncurCacheDb) const127 int SQLiteSingleVerStorageExecutor::GetMinVersionCacheData(
128     std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const
129 {
130     std::string sql;
131     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
132         sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_MAINHANDLE;
133     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
134         sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_CACHEHANDLE;
135     } else {
136         return -E_INVALID_ARGS;
137     }
138 
139     sqlite3_stmt *statement = nullptr;
140     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
141     if (errCode != E_OK) {
142         LOGE("GetStatement fail when get min version cache data! errCode = [%d]", errCode);
143         goto END;
144     }
145 
146     errCode = GetAllDataItems(statement, dataItems, minVerIncurCacheDb, true);
147     if (errCode != E_OK) {
148         LOGE("Failed to get all the data items by the min version:[%d]", errCode);
149     }
150 
151 END:
152     SQLiteUtils::ResetStatement(statement, true, errCode);
153     return CheckCorruptedStatus(errCode);
154 }
155 
MigrateRmDevData(const DataItem & dataItem) const156 int SQLiteSingleVerStorageExecutor::MigrateRmDevData(const DataItem &dataItem) const
157 {
158     if (dataItem.key != REMOVE_DEVICE_DATA_KEY) {
159         LOGE("This item not means remove devices data, can not continue exe!");
160         return -E_INVALID_ARGS;
161     }
162 
163     std::string sql;
164     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
165         sql = REMOVE_DEV_DATA_SQL;
166     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
167         sql = REMOVE_DEV_DATA_SQL_FROM_CACHEHANDLE;
168     } else {
169         return -E_INVALID_ARGS;
170     }
171 
172     sqlite3_stmt *statement = nullptr;
173     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
174     if (errCode != E_OK) {
175         LOGE("GetStatement fail when remove device data migrating-data to main! errCode = [%d]", errCode);
176         return CheckCorruptedStatus(errCode);
177     }
178 
179     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, dataItem.value, true);
180     if (errCode != E_OK) {
181         LOGE("[singerVerExecutor][MiRmData] Bind dev for sync data failed:%d", errCode);
182         goto END;
183     }
184 
185     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
186     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
187         errCode = E_OK;
188     }
189 END:
190     SQLiteUtils::ResetStatement(statement, true, errCode);
191     return CheckCorruptedStatus(errCode);
192 }
193 
AttachMainDbAndCacheDb(CipherType type,const CipherPassword & passwd,const std::string & attachDbAbsPath,EngineState engineState)194 int SQLiteSingleVerStorageExecutor::AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
195     const std::string &attachDbAbsPath, EngineState engineState)
196 {
197     std::string attachAsName;
198     if (engineState == EngineState::MAINDB) {
199         attachAsName = "cache";
200     } else if (engineState == EngineState::CACHEDB)  {
201         attachAsName = "maindb";
202     } else if (engineState == EngineState::ATTACHING) {
203         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
204         return E_OK;
205     } else {
206         return -E_INVALID_ARGS;
207     }
208 
209     int errCode = SQLiteUtils::AttachNewDatabase(dbHandle_, type, passwd, attachDbAbsPath, attachAsName);
210     if (errCode != E_OK) {
211         LOGE("handle attach to [%s] fail! errCode = [%d]", attachAsName.c_str(), errCode);
212         return CheckCorruptedStatus(errCode);
213     }
214 
215     if (engineState == EngineState::MAINDB) {
216         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
217     } else if (engineState == EngineState::CACHEDB)  {
218         executorState_ = ExecutorState::CACHE_ATTACH_MAIN;
219     } else {
220         return -E_INVALID_ARGS;
221     }
222     LOGD("[singleVerExecutor][attachDb] current engineState[%d], executorState[%d]", engineState, executorState_);
223 
224     return errCode;
225 }
226 
GetMaxVersionIncacheDb(uint64_t & maxVersion) const227 int SQLiteSingleVerStorageExecutor::GetMaxVersionIncacheDb(uint64_t &maxVersion) const
228 {
229     sqlite3_stmt *statement = nullptr;
230     std::string sql;
231     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
232         sql = GET_MAX_VER_CACHEDATA_FROM_MAINHANDLE;
233     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
234         sql = GET_MAX_VER_CACHEDATA_FROM_CACHEHANDLE;
235     } else {
236         return -E_INVALID_ARGS;
237     }
238 
239     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
240     if (errCode != E_OK) {
241         LOGE("GetStatement fail when get max version in cache db! errCode = [%d]", errCode);
242         goto END;
243     }
244 
245     do {
246         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
247         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
248             maxVersion = static_cast<uint64_t>(sqlite3_column_int64(statement, 0));
249         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
250             errCode = E_OK;
251             break;
252         } else {
253             LOGE("SQLite step failed:%d", errCode);
254             break;
255         }
256     } while (true);
257 
258 END:
259     SQLiteUtils::ResetStatement(statement, true, errCode);
260     return CheckCorruptedStatus(errCode);
261 }
262 
MigrateDataItem(DataItem & dataItem,NotifyMigrateSyncData & syncData)263 int SQLiteSingleVerStorageExecutor::MigrateDataItem(DataItem &dataItem, NotifyMigrateSyncData &syncData)
264 {
265     // Put or delete. Prepare notify data here.
266     NotifyConflictAndObserverData notify;
267     notify.committedData = syncData.committedData;
268     int errCode = PutIntoConflictAndCommitForMigrateCache(dataItem, {dataItem.dev.empty(), dataItem.dev}, notify,
269         syncData.isPermitForceWrite);
270     if (errCode != E_OK) {
271         ResetForMigrateCacheData();
272         LOGE("PutIntoConflictAndCommitForMigrateCache failed, errCode = %d", errCode);
273         return errCode;
274     }
275     // after solving conflict, the item should not be saved into mainDB
276     if (notify.dataStatus.isDefeated) {
277         LOGD("Data status is defeated:%d", errCode);
278         return errCode;
279     }
280     bool isUpdate = notify.dataStatus.preStatus != DataStatus::NOEXISTED;
281     sqlite3_stmt *statement = migrateSyncStatements_.GetDataSaveStatement(isUpdate);
282     if (statement == nullptr) {
283         LOGE("GetStatement fail when put migrating-data to main! ");
284         return -E_INVALID_ARGS;
285     }
286 
287     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
288         errCode = EraseSyncData(dataItem.key);
289         goto END;
290     }
291 
292     errCode = BindSavedSyncData(statement, dataItem, dataItem.hashKey, { dataItem.origDev, dataItem.dev }, isUpdate);
293     if (errCode != E_OK) {
294         goto END;
295     }
296 
297     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
298     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
299         errCode = E_OK;
300     } else {
301         LOGD("StepWithRetry fail when put migrating-data to main!");
302     }
303 END:
304     ResetForMigrateCacheData();
305     return errCode;
306 }
307 
CheckDataWithQuery(std::vector<DataItem> & dataItems)308 int SQLiteSingleVerStorageExecutor::CheckDataWithQuery(std::vector<DataItem> &dataItems)
309 {
310     int errCode = E_OK;
311     sqlite3_stmt *stmt = nullptr;
312     for (auto &item : dataItems) {
313         if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) {
314             continue;
315         }
316         std::string sql;
317         DBCommon::VectorToString(item.value, sql);
318         if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
319             static const std::string SYNC_DATA_TABLE = "sync_data";
320             static const std::string SYNC_DATA_TABLE_MAIN = "maindb.sync_data";
321             std::string::size_type startPos = sql.find(SYNC_DATA_TABLE);
322             if (startPos != std::string::npos) {
323                 sql.replace(startPos, SYNC_DATA_TABLE.length(), SYNC_DATA_TABLE_MAIN);
324             }
325         }
326         errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
327         if (errCode != E_OK) {
328             LOGE("Get Check miss query data statement failed. %d", errCode);
329             return errCode;
330         }
331 
332         errCode = CheckMissQueryDataItem(stmt, item.dev, item);
333         if (errCode != E_OK) {
334             LOGE("Check miss query data item failed. %d", errCode);
335             break;
336         }
337         SQLiteUtils::ResetStatement(stmt, true, errCode);
338     }
339     SQLiteUtils::ResetStatement(stmt, true, errCode);
340     return CheckCorruptedStatus(errCode);
341 }
342 
MigrateDataItems(std::vector<DataItem> & dataItems,NotifyMigrateSyncData & syncData)343 int SQLiteSingleVerStorageExecutor::MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData)
344 {
345     syncData.isRemote = ((dataItems[0].flag & DataItem::LOCAL_FLAG) == 0);
346     syncData.isRemoveDeviceData = (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_FLAG) != 0 ||
347         (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) != 0;
348 
349     int errCode = CheckDataWithQuery(dataItems);
350     if (errCode != E_OK) {
351         LOGE("Check migrate data with query failed! errCode = [%d]", errCode);
352         goto END;
353     }
354 
355     for (auto &item : dataItems) {
356         // Remove device data owns one version itself.
357         // Get entry here. Prepare notify data in storageEngine.
358         if (syncData.isRemoveDeviceData) {
359             errCode = GetEntriesForNotifyRemoveDevData(item, syncData.entries);
360             if (errCode != E_OK) {
361                 LOGE("Failed to get remove devices data");
362                 return errCode;
363             }
364             errCode = MigrateRmDevData(item);
365             LOGI("[PutMigratingDataToMain]Execute remove devices data! errCode = [%d]", errCode);
366             if (errCode != E_OK) {
367                 break;
368             }
369             continue;
370         }
371 
372         if (item.neglect) { // Do not save this record if it is neglected
373             continue;
374         }
375 
376         errCode = MigrateDataItem(item, syncData);
377         if (errCode != E_OK) {
378             LOGE("Migrate data item to main db failed! errCode = [%d]", errCode);
379             break;
380         }
381     }
382 END:
383     ResetForMigrateCacheData();
384     return CheckCorruptedStatus(errCode);
385 }
386 
MigrateSyncDataByVersion(uint64_t recordVer,NotifyMigrateSyncData & syncData,std::vector<DataItem> & dataItems)387 int SQLiteSingleVerStorageExecutor::MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
388     std::vector<DataItem> &dataItems)
389 {
390     int errCode = StartTransaction(TransactType::IMMEDIATE);
391     if (errCode != E_OK) {
392         return errCode;
393     }
394 
395     // Init migrate data.
396     errCode = InitMigrateData();
397     if (errCode != E_OK) {
398         LOGE("Init migrate data failed, errCode = [%d]", errCode);
399         goto END;
400     }
401 
402     // fix dataItem timestamp for migrate
403     errCode = ProcessTimeStampForSyncDataInCacheDB(dataItems);
404     if (errCode != E_OK) {
405         LOGE("Chang the time stamp for migrate failed! errCode = [%d]", errCode);
406         goto END;
407     }
408 
409     errCode = MigrateDataItems(dataItems, syncData);
410     if (errCode != E_OK) {
411         goto END;
412     }
413 
414     // delete recordVersion data
415     errCode = DelCacheDbDataByVersion(recordVer);
416     if (errCode != E_OK) {
417         LOGE("Delete the migrated data in cacheDb! errCode = [%d]", errCode);
418         goto END;
419     }
420 
421     errCode = Commit();
422     if (errCode != E_OK) {
423         LOGE("Commit data error and rollback, errCode = [%d]", errCode);
424         goto END;
425     }
426     return E_OK;
427 END:
428     Rollback();
429     return errCode;
430 }
431 
DelCacheDbDataByVersion(uint64_t version) const432 int SQLiteSingleVerStorageExecutor::DelCacheDbDataByVersion(uint64_t version) const
433 {
434     std::string sql;
435     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
436         sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_MAINHANDLE;
437     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
438         sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_CACHEHANDLE;
439     } else {
440         return -E_INVALID_ARGS;
441     }
442 
443     sqlite3_stmt *statement = nullptr;
444     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
445     if (errCode != E_OK) {
446         LOGE("GetStatement fail when delete cache data by version! errCode = [%d]", errCode);
447         return errCode;
448     }
449 
450     errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, static_cast<int64_t>(version));
451     if (errCode != E_OK) {
452         LOGE("[SingleVerExe] Bind destDbNickName error:[%d]", errCode);
453         goto END;
454     }
455 
456     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
457     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
458         errCode = E_OK;
459     }
460 
461 END:
462     SQLiteUtils::ResetStatement(statement, true, errCode);
463     return CheckCorruptedStatus(errCode);
464 }
465 
VacuumLocalData() const466 int SQLiteSingleVerStorageExecutor::VacuumLocalData() const
467 {
468     std::string sql;
469     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
470         sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_MAINHANDLE;
471     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
472         sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_CACHEHANDLE;
473     } else {
474         return -E_INVALID_ARGS;
475     }
476 
477     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
478     if (errCode != E_OK) {
479         LOGE("SQLite sync mode failed: %d", errCode);
480     }
481 
482     return CheckCorruptedStatus(errCode);
483 }
484 
485 // The local table data is only for local reading and writing, which can be sensed by itself.
486 // The current migration process does not provide callback subscription function.
MigrateLocalData()487 int SQLiteSingleVerStorageExecutor::MigrateLocalData()
488 {
489     // Nick name "main" represent current database(dbhande) in sqlite grammar
490     std::string migrateLocaldataSql;
491     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
492         migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_MAINHANDLE;
493     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
494         migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_CACHEHANDLE;
495     } else {
496         return -E_INVALID_ARGS;
497     }
498 
499     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, migrateLocaldataSql);
500     if (errCode != E_OK) {
501         LOGW("Failed to migrate the local data:%d", errCode);
502         return CheckCorruptedStatus(errCode);
503     }
504 
505     return VacuumLocalData();
506 }
507 
BindSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const508 int SQLiteSingleVerStorageExecutor::BindSyncDataInCacheMode(sqlite3_stmt *statement,
509     const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const
510 {
511     int errCode = BindPrimaryKeySyncDataInCacheMode(statement, hashKey, recordVersion);
512     if (errCode != E_OK) {
513         LOGE("Bind saved sync data primary key failed:%d", errCode);
514         return errCode;
515     }
516 
517     // if delete flag is set, just use the hash key instead of the key
518     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
519         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_SYNC_KEY_INDEX, -1));
520     } else {
521         errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_KEY_INDEX, dataItem.key, false);
522     }
523 
524     if (errCode != E_OK) {
525         LOGE("Bind saved sync data key failed:%d", errCode);
526         return errCode;
527     }
528 
529     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_VAL_INDEX, dataItem.value, true);
530     if (errCode != E_OK) {
531         LOGE("Bind saved sync data value failed:%d", errCode);
532         return errCode;
533     }
534 
535     LOGD("Write timestamp:%llu timestamp%llu, %llu, version %llu", dataItem.writeTimeStamp, dataItem.timeStamp,
536         dataItem.flag, recordVersion);
537     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_FLAG_INDEX,
538         static_cast<int64_t>(dataItem.flag));
539     if (errCode != E_OK) {
540         LOGE("Bind saved sync data flag failed:%d", errCode);
541         return errCode;
542     }
543     errCode = BindTimeStampSyncDataInCacheMode(statement, dataItem);
544     if (errCode != E_OK) {
545         LOGE("Bind saved sync data time stamp failed:%d", errCode);
546         return errCode;
547     }
548     return BindDevSyncDataInCacheMode(statement, dataItem.origDev, dataItem.dev);
549 }
550 
BindPrimaryKeySyncDataInCacheMode(sqlite3_stmt * statement,const Key & hashKey,uint64_t recordVersion) const551 int SQLiteSingleVerStorageExecutor::BindPrimaryKeySyncDataInCacheMode(
552     sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const
553 {
554     int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_HASH_KEY_INDEX, hashKey, false);
555     if (errCode != E_OK) {
556         LOGE("Bind saved sync data hash key failed:%d", errCode);
557         return errCode;
558     }
559     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_VERSION_INDEX, recordVersion);
560     if (errCode != E_OK) {
561         LOGE("Bind saved sync data version failed:%d", errCode);
562     }
563     return errCode;
564 }
565 
BindTimeStampSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem) const566 int SQLiteSingleVerStorageExecutor::BindTimeStampSyncDataInCacheMode(
567     sqlite3_stmt *statement, const DataItem &dataItem) const
568 {
569     int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_STAMP_INDEX, dataItem.timeStamp);
570     if (errCode != E_OK) {
571         LOGE("Bind saved sync data stamp failed:%d", errCode);
572         return errCode;
573     }
574 
575     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_W_TIME_INDEX, dataItem.writeTimeStamp);
576     if (errCode != E_OK) {
577         LOGE("Bind saved sync data write stamp failed:%d", errCode);
578     }
579     return errCode;
580 }
581 
BindDevSyncDataInCacheMode(sqlite3_stmt * statement,const std::string & origDev,const std::string & deviceName) const582 int SQLiteSingleVerStorageExecutor::BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
583     const std::string &origDev, const std::string &deviceName) const
584 {
585     std::string devName = DBCommon::TransferHashString(deviceName);
586     std::vector<uint8_t> devVect(devName.begin(), devName.end());
587     int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_DEV_INDEX, devVect, true);
588     if (errCode != E_OK) {
589         LOGE("Bind dev for sync data failed:%d", errCode);
590         return errCode;
591     }
592 
593     std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
594     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_ORI_DEV_INDEX, origDevVect, true);
595     if (errCode != E_OK) {
596         LOGE("Bind orig dev for sync data failed:%d", errCode);
597     }
598     return errCode;
599 }
600 
GetExpandedCheckSql(QueryObject query,DataItem & dataItem)601 int SQLiteSingleVerStorageExecutor::GetExpandedCheckSql(QueryObject query, DataItem &dataItem)
602 {
603     int errCode = E_OK;
604     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
605 
606     std::string sql;
607     std::string expandedSql;
608     errCode = helper.GetSyncDataCheckSql(sql);
609     if (errCode != E_OK) {
610         LOGE("Get sync data check sql failed");
611         return errCode;
612     }
613     sqlite3_stmt *stmt = nullptr;
614     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
615     if (errCode != E_OK) {
616         LOGE("Get statement fail. %d", errCode);
617         return -E_INVALID_QUERY_FORMAT;
618     }
619 
620     errCode = helper.BindSyncDataCheckStmt(stmt, dataItem.key);
621     if (errCode != E_OK) {
622         goto END;
623     }
624 
625     errCode = SQLiteUtils::ExpandedSql(stmt, expandedSql);
626     if (errCode != E_OK) {
627         LOGE("Get expand sql fail. %d", errCode);
628     }
629     DBCommon::StringToVector(expandedSql, dataItem.value);
630 END:
631     SQLiteUtils::ResetStatement(stmt, true, errCode);
632     return errCode;
633 }
634 
SaveSyncDataItemInCacheMode(DataItem & dataItem,const DeviceInfo & deviceInfo,TimeStamp & maxStamp,uint64_t recordVersion,const QueryObject & query)635 int SQLiteSingleVerStorageExecutor::SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo,
636     TimeStamp &maxStamp, uint64_t recordVersion, const QueryObject &query)
637 {
638     Key hashKey;
639     int errCode = E_OK;
640     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
641         hashKey = dataItem.key;
642     } else {
643         errCode = DBCommon::CalcValueHash(dataItem.key, hashKey);
644         if (errCode != E_OK) {
645             return errCode;
646         }
647     }
648 
649     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
650         errCode = GetExpandedCheckSql(query, dataItem); // record check sql in value for miss query data
651         if (errCode != E_OK) {
652             LOGE("Get sync data check sql failed. %d", errCode);
653             return errCode;
654         }
655     }
656 
657     std::string origDev = dataItem.origDev;
658     if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
659         origDev.clear();
660     }
661     dataItem.dev = deviceInfo.deviceName;
662     dataItem.origDev = origDev;
663     errCode = SaveSyncDataToCacheDatabase(dataItem, hashKey, recordVersion);
664     if (errCode == E_OK) {
665         maxStamp = std::max(dataItem.timeStamp, maxStamp);
666     } else {
667         LOGE("Save sync data to db failed:%d", errCode);
668     }
669     return ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
670 }
671 
SaveSyncDataToCacheDatabase(const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const672 int SQLiteSingleVerStorageExecutor::SaveSyncDataToCacheDatabase(const DataItem &dataItem,
673     const Key &hashKey, uint64_t recordVersion) const
674 {
675     auto statement = saveSyncStatements_.GetDataSaveStatement(false);
676     if (statement == nullptr) {
677         return -E_INVALID_ARGS;
678     }
679     int errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
680     if (errCode != E_OK) {
681         return errCode;
682     }
683 
684     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
685     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
686         errCode = E_OK;
687     }
688     return errCode;
689 }
690 
PutLocalDataToCacheDB(const LocalDataItem & dataItem) const691 int SQLiteSingleVerStorageExecutor::PutLocalDataToCacheDB(const LocalDataItem &dataItem) const
692 {
693     sqlite3_stmt *statement = nullptr;
694     int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_CACHE_LOCAL_SQL, statement);
695     if (errCode != E_OK) {
696         goto ERROR;
697     }
698 
699     errCode = BindLocalDataInCacheMode(statement, dataItem);
700     if (errCode != E_OK) {
701         goto ERROR;
702     }
703 
704     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
705     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
706         errCode = E_OK;
707     }
708 
709 ERROR:
710     SQLiteUtils::ResetStatement(statement, true, errCode);
711     return CheckCorruptedStatus(errCode);
712 }
713 
BindLocalDataInCacheMode(sqlite3_stmt * statement,const LocalDataItem & dataItem) const714 int SQLiteSingleVerStorageExecutor::BindLocalDataInCacheMode(sqlite3_stmt *statement,
715     const LocalDataItem &dataItem) const
716 {
717     int errCode = SQLiteUtils::BindBlobToStatement(statement,
718         BIND_CACHE_LOCAL_HASH_KEY_INDEX, dataItem.hashKey, false);
719     if (errCode != E_OK) {
720         LOGE("[SingleVerExe][BindLocalData]Bind hash key error:%d", errCode);
721         return errCode;
722     }
723 
724     // if delete flag is set, just use the hash key instead of the key
725     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
726         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_LOCAL_KEY_INDEX, -1));
727     } else {
728         errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_KEY_INDEX, dataItem.key, false);
729     }
730 
731     if (errCode != E_OK) {
732         LOGE("Bind saved sync data key failed:%d", errCode);
733         return errCode;
734     }
735 
736     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_VAL_INDEX, dataItem.value, true);
737     if (errCode != E_OK) {
738         LOGE("[SingleVerExe][BindLocalData]Bind value error:%d", errCode);
739         return errCode;
740     }
741 
742     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_TIMESTAMP_INDEX, dataItem.timeStamp);
743     if (errCode != E_OK) {
744         LOGE("[SingleVerExe][BindLocalData]Bind timestamp error:%d", errCode);
745         return errCode;
746     }
747 
748     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_FLAG_INDEX,
749         static_cast<int64_t>(dataItem.flag));
750     if (errCode != E_OK) {
751         LOGE("[SingleVerExe][BindLocalData]Bind local data flag failed:%d", errCode);
752         return errCode;
753     }
754 
755     return E_OK;
756 }
757 
PutIntoConflictAndCommitForMigrateCache(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)758 int SQLiteSingleVerStorageExecutor::PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem,
759     const DeviceInfo &deviceInfo, NotifyConflictAndObserverData &notify, bool isPermitForceWrite)
760 {
761     int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
762     if (errCode != E_OK) {
763         errCode = (errCode == -E_NOT_FOUND ? E_OK : errCode);
764         if (errCode == -E_IGNORE_DATA) {
765             notify.dataStatus.isDefeated = true;
766             errCode = E_OK;
767         }
768         return errCode;
769     }
770 
771     // If delete data, the key is empty.
772     if (isSyncMigrating_ && dataItem.key.empty()) {
773         dataItem.key = notify.getData.key;
774     }
775 
776     PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, notify.committedData);
777     if (notify.dataStatus.isDefeated) {
778         LOGD("Data status is defeated:%d", errCode);
779         return ResetForMigrateCacheData();
780     }
781 
782     PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, notify.hashKey, notify.committedData);
783     return ResetForMigrateCacheData();
784 }
785 
GetMinTimestampInCacheDB(TimeStamp & minStamp) const786 int SQLiteSingleVerStorageExecutor::GetMinTimestampInCacheDB(TimeStamp &minStamp) const
787 {
788     if (dbHandle_ == nullptr) {
789         return E_OK;
790     }
791     std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
792         SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL :
793         SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL_FROM_MAINHANDLE);
794     sqlite3_stmt *statement = nullptr;
795     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
796     if (errCode != E_OK) {
797         goto ERROR;
798     }
799 
800     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
801     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
802         minStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
803         LOGD("Min time stamp in cacheDB is %llu", minStamp);
804         errCode = E_OK;
805     } else {
806         LOGE("GetMinTimestampInCacheDB failed, errCode = %d.", errCode);
807     }
808 
809 ERROR:
810     SQLiteUtils::ResetStatement(statement, true, errCode);
811     return errCode;
812 }
813 
InitMigrateTimeStampOffset()814 int SQLiteSingleVerStorageExecutor::InitMigrateTimeStampOffset()
815 {
816     // Not first migrate, migrateTimeOffset_ has been set.
817     if (migrateTimeOffset_ != 0) {
818         return E_OK;
819     }
820 
821     // Get min timestamp of local data in sync_data, cacheDB.
822     TimeStamp minTimeInCache = 0;
823     int errCode = GetMinTimestampInCacheDB(minTimeInCache);
824     if (errCode != E_OK) {
825         return errCode;
826     }
827 
828     // There is no native data in cacheDB, cannot get accurate migrateTimeOffset_ now.
829     if (minTimeInCache == 0) {
830         migrateTimeOffset_ = -1;
831         LOGI("Time offset during migrating is -1.");
832         return E_OK;
833     }
834 
835     // Get max timestamp in mainDB.
836     TimeStamp maxTimeInMain = 0;
837     InitCurrentMaxStamp(maxTimeInMain);
838 
839     // Get timestamp offset between mainDB and cacheDB.
840     // The purpose of -1 is to ensure that the first data record in the original cacheDB is 1 greater than
841     // the last data record in the original mainDB after the migration.
842     migrateTimeOffset_ = minTimeInCache - maxTimeInMain - 1;
843     LOGI("Min timestamp in cacheDB is %llu, max timestamp in mainDB is %llu. Time offset during migrating is %lld.",
844         minTimeInCache, maxTimeInMain, migrateTimeOffset_);
845     return E_OK;
846 }
847 
ProcessTimeStampForSyncDataInCacheDB(std::vector<DataItem> & dataItems)848 int SQLiteSingleVerStorageExecutor::ProcessTimeStampForSyncDataInCacheDB(std::vector<DataItem> &dataItems)
849 {
850     if (dataItems.empty()) {
851         LOGE("[SQLiteSingleVerStorageExecutor::ProcessTimeStampForCacheDB] Invalid parameter : dataItems.");
852         return -E_INVALID_ARGS;
853     }
854 
855     // Get the offset between the min timestamp in dataitems and max timestamp in mainDB.
856     int errCode = InitMigrateTimeStampOffset();
857     if (errCode != E_OK) {
858         return errCode;
859     }
860 
861     // Set real timestamp for DataItem in dataItems and get the max timestamp in these dataitems.
862     TimeStamp maxTimeInDataItems = 0;
863     for (auto &item : dataItems) {
864         item.timeStamp -= migrateTimeOffset_;
865         maxTimeInDataItems = std::max(maxTimeInDataItems, item.timeStamp);
866     }
867 
868     // Update max timestamp in mainDB.
869     maxTimeStampInMainDB_ = maxTimeInDataItems;
870     return E_OK;
871 }
872 
GetEntriesForNotifyRemoveDevData(const DataItem & item,std::vector<Entry> & entries) const873 int SQLiteSingleVerStorageExecutor::GetEntriesForNotifyRemoveDevData(const DataItem &item,
874     std::vector<Entry> &entries) const
875 {
876     // When removing device data, key is 'remove', value is device name.
877     if (item.key != REMOVE_DEVICE_DATA_KEY) {
878         LOGE("Invalid key. Can not notify remove device data.");
879         return -E_INVALID_ARGS;
880     }
881     if ((item.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == 0) {
882         LOGI("No need to notify remove device data.");
883         return E_OK;
884     }
885     entries.clear();
886     std::string dev;
887     DBCommon::VectorToString(item.value, dev);
888     return GetAllSyncedEntries(dev, entries);
889 }
890 
InitMigrateData()891 int SQLiteSingleVerStorageExecutor::InitMigrateData()
892 {
893     // Sync_data already in migrating. Need not to init data.
894     if (isSyncMigrating_) {
895         return E_OK;
896     }
897     ClearMigrateData();
898     std::string querySQL;
899     std::string insertSQL;
900     std::string updateSQL;
901     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
902         querySQL = SELECT_SYNC_HASH_SQL;
903         insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_MAINHANDLE;
904         updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_MAINHANDLE;
905     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
906         querySQL = SELECT_MAIN_SYNC_HASH_SQL_FROM_CACHEHANDLE;
907         insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_CACHEHANDLE;
908         updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_CACHEHANDLE;
909     } else {
910         LOGE("[InitMigrateData] executor in an error state[%d]!", executorState_);
911         return -E_INVALID_DB;
912     }
913     int errCode = PrepareForSavingData(querySQL, insertSQL, updateSQL, migrateSyncStatements_);
914     if (errCode != E_OK) {
915         LOGE("Prepare migrateSyncStatements_ fail, errCode = %d", errCode);
916         return errCode;
917     }
918     isSyncMigrating_ = true;
919     return errCode;
920 }
921 
ClearMigrateData()922 void SQLiteSingleVerStorageExecutor::ClearMigrateData()
923 {
924     // Reset data.
925     migrateTimeOffset_ = 0;
926     maxTimeStampInMainDB_ = 0;
927 
928     // Reset statement.
929     int errCode = migrateSyncStatements_.ResetStatement();
930     if (errCode != E_OK) {
931         LOGE("Reset migrateSync Statements failed, errCode = %d", errCode);
932     }
933 
934     isSyncMigrating_ = false;
935 }
936 
GetMaxTimeStampDuringMigrating(TimeStamp & maxTimeStamp) const937 int SQLiteSingleVerStorageExecutor::GetMaxTimeStampDuringMigrating(TimeStamp &maxTimeStamp) const
938 {
939     if (maxTimeStampInMainDB_ == 0) {
940         return -E_NOT_INIT;
941     }
942     maxTimeStamp = maxTimeStampInMainDB_;
943     return E_OK;
944 }
945 
DeleteMetaData(const std::vector<Key> & keys)946 int SQLiteSingleVerStorageExecutor::DeleteMetaData(const std::vector<Key> &keys)
947 {
948     sqlite3_stmt *statement = nullptr;
949     const std::string sql = attachMetaMode_ ? REMOVE_ATTACH_META_VALUE_SQL : REMOVE_META_VALUE_SQL;
950     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
951     if (errCode != E_OK) {
952         return errCode;
953     }
954 
955     for (const auto &key : keys) {
956         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
957         if (errCode != E_OK) {
958             break;
959         }
960 
961         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
962         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
963             break;
964         }
965         errCode = E_OK;
966         SQLiteUtils::ResetStatement(statement, false, errCode);
967     }
968 
969     SQLiteUtils::ResetStatement(statement, true, errCode);
970     return CheckCorruptedStatus(errCode);
971 }
972 
DeleteMetaDataByPrefixKey(const Key & keyPrefix)973 int SQLiteSingleVerStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix)
974 {
975     sqlite3_stmt *statement = nullptr;
976     const std::string sql = attachMetaMode_ ?
977         REMOVE_ATTACH_META_VALUE_BY_KEY_PREFIX_SQL : REMOVE_META_VALUE_BY_KEY_PREFIX_SQL;
978 
979     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
980     if (errCode != E_OK) {
981         return errCode;
982     }
983 
984     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
985     if (errCode == E_OK) {
986         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
987         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
988             errCode = E_OK;
989         }
990     }
991 
992     SQLiteUtils::ResetStatement(statement, true, errCode);
993     return CheckCorruptedStatus(errCode);
994 }
995 } // namespace DistributedDB
996