• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_executor.h"
17 
18 #include <algorithm>
19 
20 #include "log_print.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "parcel.h"
25 #include "runtime_context.h"
26 #include "sqlite_single_ver_storage_executor_sql.h"
27 
28 namespace DistributedDB {
PrepareForSavingCacheData(SingleVerDataType type)29 int SQLiteSingleVerStorageExecutor::PrepareForSavingCacheData(SingleVerDataType type)
30 {
31     int errCode = -E_NOT_SUPPORT;
32     if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
33         std::string insertLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
34             INSERT_LOCAL_SQL_FROM_CACHEHANDLE : INSERT_CACHE_LOCAL_SQL);
35         std::string updateLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
36             UPDATE_LOCAL_SQL_FROM_CACHEHANDLE : UPDATE_CACHE_LOCAL_SQL);
37         errCode = PrepareForSavingData(SELECT_CACHE_LOCAL_HASH_SQL, insertLocalSql, updateLocalSql,
38             saveLocalStatements_);
39     } else if (type == SingleVerDataType::SYNC_TYPE) {
40         std::string insertSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
41             INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL);
42         std::string updateSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
43             UPDATE_CACHE_SYNC_SQL_FROM_MAINHANDLE : UPDATE_CACHE_SYNC_SQL);
44         std::string selectSyncHashSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
45             SELECT_CACHE_SYNC_HASH_SQL_FROM_MAINHANDLE : SELECT_CACHE_SYNC_HASH_SQL);
46         errCode = PrepareForSavingData(selectSyncHashSql, insertSyncSql, updateSyncSql, saveSyncStatements_);
47     }
48     if (errCode != E_OK) {
49         LOGE("Prepare to save sync cache data failed:%d", errCode);
50     }
51     return CheckCorruptedStatus(errCode);
52 }
53 
ResetForSavingCacheData(SingleVerDataType type)54 int SQLiteSingleVerStorageExecutor::ResetForSavingCacheData(SingleVerDataType type)
55 {
56     int errCode = E_OK;
57     if (type == SingleVerDataType::LOCAL_TYPE_SQLITE) {
58         SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
59         SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
60         SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
61     } else if (type == SingleVerDataType::SYNC_TYPE) {
62         SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
63         SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
64         SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
65     }
66 
67     return CheckCorruptedStatus(errCode);
68 }
69 
ResetForMigrateCacheData()70 int SQLiteSingleVerStorageExecutor::ResetForMigrateCacheData()
71 {
72     int errCode = E_OK;
73     SQLiteUtils::ResetStatement(migrateSyncStatements_.insertStatement, false, errCode);
74     SQLiteUtils::ResetStatement(migrateSyncStatements_.updateStatement, false, errCode);
75     SQLiteUtils::ResetStatement(migrateSyncStatements_.queryStatement, false, errCode);
76 
77     return CheckCorruptedStatus(errCode);
78 }
79 
RemoveDeviceDataInCacheMode(const std::string & hashDev,bool isNeedNotify,uint64_t recordVersion) const80 int SQLiteSingleVerStorageExecutor::RemoveDeviceDataInCacheMode(const std::string &hashDev,
81     bool isNeedNotify, uint64_t recordVersion) const
82 {
83     // device name always hash string.
84     std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
85 
86     Key hashKey;
87     int errCode = DBCommon::CalcValueHash(REMOVE_DEVICE_DATA_KEY, hashKey);
88     if (errCode != E_OK) {
89         return errCode;
90     }
91 
92     DataItem dataItem;
93     dataItem.key = REMOVE_DEVICE_DATA_KEY;
94     dataItem.value = devVect;
95     if (isNeedNotify) {
96         dataItem.flag = DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG;
97     } else {
98         dataItem.flag = DataItem::REMOVE_DEVICE_DATA_FLAG;
99     }
100 
101     sqlite3_stmt *statement = nullptr;
102     std::string sql = (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
103         INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL;
104     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
105     if (errCode != E_OK) {
106         goto ERROR;
107     }
108 
109     errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
110     if (errCode != E_OK) {
111         goto ERROR;
112     }
113 
114     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
115     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
116         LOGE("Failed to execute rm the device synced data:%d", errCode);
117     } else {
118         errCode = E_OK;
119     }
120 
121 ERROR:
122     int ret = E_OK;
123     SQLiteUtils::ResetStatement(statement, true, ret);
124     return CheckCorruptedStatus(errCode);
125 }
126 
GetMinVersionCacheData(std::vector<DataItem> & dataItems,uint64_t & minVerIncurCacheDb) const127 int SQLiteSingleVerStorageExecutor::GetMinVersionCacheData(
128     std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const
129 {
130     std::string sql;
131     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
132         sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_MAINHANDLE;
133     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
134         sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_CACHEHANDLE;
135     } else {
136         return -E_INVALID_ARGS;
137     }
138 
139     sqlite3_stmt *statement = nullptr;
140     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
141     if (errCode != E_OK) {
142         LOGE("GetStatement fail when get min version cache data! errCode = [%d]", errCode);
143         return CheckCorruptedStatus(errCode);
144     }
145 
146     errCode = GetAllDataItems(statement, dataItems, minVerIncurCacheDb, true);
147     if (errCode != E_OK) {
148         LOGE("Failed to get all the data items by the min version:[%d]", errCode);
149     }
150 
151     int ret = E_OK;
152     SQLiteUtils::ResetStatement(statement, true, ret);
153     return CheckCorruptedStatus(errCode);
154 }
155 
MigrateRmDevData(const DataItem & dataItem) const156 int SQLiteSingleVerStorageExecutor::MigrateRmDevData(const DataItem &dataItem) const
157 {
158     if (dataItem.key != REMOVE_DEVICE_DATA_KEY) {
159         LOGE("This item not means remove devices data, can not continue exe!");
160         return -E_INVALID_ARGS;
161     }
162 
163     std::string sql;
164     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
165         sql = dataItem.value.empty() ? REMOVE_ALL_DEV_DATA_SQL : REMOVE_DEV_DATA_SQL;
166     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
167         sql = dataItem.value.empty() ? REMOVE_ALL_DEV_DATA_SQL_FROM_CACHEHANDLE: REMOVE_DEV_DATA_SQL_FROM_CACHEHANDLE;
168     } else {
169         return -E_INVALID_ARGS;
170     }
171 
172     sqlite3_stmt *statement = nullptr;
173     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
174     if (errCode != E_OK) {
175         LOGE("GetStatement fail when remove device data migrating-data to main! errCode = [%d]", errCode);
176         return CheckCorruptedStatus(errCode);
177     }
178 
179     if (!dataItem.value.empty()) {
180         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, dataItem.value, true);
181         if (errCode != E_OK) {
182             LOGE("[singerVerExecutor][MiRmData] Bind dev for sync data failed:%d", errCode);
183             goto END;
184         }
185     }
186 
187     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
188     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
189         errCode = E_OK;
190     }
191 END:
192     int ret = E_OK;
193     SQLiteUtils::ResetStatement(statement, true, ret);
194     return CheckCorruptedStatus(errCode);
195 }
196 
AttachMainDbAndCacheDb(CipherType type,const CipherPassword & passwd,const std::string & attachDbAbsPath,EngineState engineState)197 int SQLiteSingleVerStorageExecutor::AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
198     const std::string &attachDbAbsPath, EngineState engineState)
199 {
200     std::string attachAsName;
201     if (engineState == EngineState::MAINDB) {
202         attachAsName = "cache";
203     } else if (engineState == EngineState::CACHEDB)  {
204         attachAsName = "maindb";
205     } else if (engineState == EngineState::ATTACHING) {
206         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
207         return E_OK;
208     } else {
209         return -E_INVALID_ARGS;
210     }
211 
212     int errCode = SQLiteUtils::AttachNewDatabase(dbHandle_, type, passwd, attachDbAbsPath, attachAsName);
213     if (errCode != E_OK) {
214         LOGE("handle attach to [%s] fail! errCode = [%d]", attachAsName.c_str(), errCode);
215         return CheckCorruptedStatus(errCode);
216     }
217 
218     if (engineState == EngineState::MAINDB) {
219         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
220     } else if (engineState == EngineState::CACHEDB)  {
221         executorState_ = ExecutorState::CACHE_ATTACH_MAIN;
222     } else {
223         return -E_INVALID_ARGS;
224     }
225     LOGD("[singleVerExecutor][attachDb] current engineState[%u], executorState[%u]", static_cast<unsigned>(engineState),
226         static_cast<unsigned>(executorState_));
227     return errCode;
228 }
229 
GetMaxVersionInCacheDb(uint64_t & maxVersion) const230 int SQLiteSingleVerStorageExecutor::GetMaxVersionInCacheDb(uint64_t &maxVersion) const
231 {
232     sqlite3_stmt *statement = nullptr;
233     std::string sql;
234     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
235         sql = GET_MAX_VER_CACHEDATA_FROM_MAINHANDLE;
236     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
237         sql = GET_MAX_VER_CACHEDATA_FROM_CACHEHANDLE;
238     } else {
239         return -E_INVALID_ARGS;
240     }
241 
242     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
243     if (errCode != E_OK) {
244         LOGE("GetStatement fail when get max version in cache db");
245         return CheckCorruptedStatus(errCode);
246     }
247 
248     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
249     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
250         maxVersion = static_cast<uint64_t>(sqlite3_column_int64(statement, 0));
251         errCode = E_OK;
252     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
253         maxVersion = 0;
254         errCode = E_OK;
255     }
256     int ret = E_OK;
257     SQLiteUtils::ResetStatement(statement, true, ret);
258     return CheckCorruptedStatus(errCode);
259 }
260 
MigrateDataItem(DataItem & dataItem,const NotifyMigrateSyncData & syncData)261 int SQLiteSingleVerStorageExecutor::MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData)
262 {
263     // Put or delete. Prepare notify data here.
264     NotifyConflictAndObserverData notify;
265     notify.committedData = syncData.committedData;
266     int errCode = PutIntoConflictAndCommitForMigrateCache(dataItem, {dataItem.dev.empty(), dataItem.dev}, notify,
267         syncData.isPermitForceWrite);
268     if (errCode != E_OK) {
269         ResetForMigrateCacheData();
270         LOGE("PutIntoConflictAndCommitForMigrateCache failed, errCode = %d", errCode);
271         return errCode;
272     }
273     // after solving conflict, the item should not be saved into mainDB
274     if (notify.dataStatus.isDefeated) {
275         LOGE("Data status is defeated:%d", errCode);
276         return errCode;
277     }
278     bool isUpdate = notify.dataStatus.preStatus != DataStatus::NOEXISTED;
279     sqlite3_stmt *statement = migrateSyncStatements_.GetDataSaveStatement(isUpdate);
280     if (statement == nullptr) {
281         LOGE("GetStatement fail when put migrating-data to main! ");
282         return -E_INVALID_ARGS;
283     }
284 
285     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
286         errCode = EraseSyncData(dataItem.key);
287         goto END;
288     }
289 
290     errCode = BindSavedSyncData(statement, dataItem, dataItem.hashKey, { dataItem.origDev, dataItem.dev }, isUpdate);
291     if (errCode != E_OK) {
292         goto END;
293     }
294 
295     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
296     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
297         errCode = E_OK;
298     } else {
299         LOGD("StepWithRetry fail when put migrating-data to main!");
300     }
301 END:
302     ResetForMigrateCacheData();
303     return errCode;
304 }
305 
CheckDataWithQuery(std::vector<DataItem> & dataItems)306 int SQLiteSingleVerStorageExecutor::CheckDataWithQuery(std::vector<DataItem> &dataItems)
307 {
308     int errCode = E_OK;
309     sqlite3_stmt *stmt = nullptr;
310     int ret = E_OK;
311     for (auto &item : dataItems) {
312         if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) {
313             continue;
314         }
315         std::string sql;
316         DBCommon::VectorToString(item.value, sql);
317         if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
318             static const std::string SYNC_DATA_TABLE = "sync_data";
319             static const std::string SYNC_DATA_TABLE_MAIN = "maindb.sync_data";
320             std::string::size_type startPos = sql.find(SYNC_DATA_TABLE);
321             if (startPos != std::string::npos) {
322                 sql.replace(startPos, SYNC_DATA_TABLE.length(), SYNC_DATA_TABLE_MAIN);
323             }
324         }
325         errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
326         if (errCode != E_OK) {
327             LOGE("Get Check miss query data statement failed. %d", errCode);
328             return errCode;
329         }
330 
331         errCode = CheckMissQueryDataItem(stmt, item.dev, item);
332         if (errCode != E_OK) {
333             LOGE("Check miss query data item failed. %d", errCode);
334             break;
335         }
336         SQLiteUtils::ResetStatement(stmt, true, ret);
337     }
338     SQLiteUtils::ResetStatement(stmt, true, ret);
339     return CheckCorruptedStatus(errCode);
340 }
341 
MigrateDataItems(std::vector<DataItem> & dataItems,NotifyMigrateSyncData & syncData)342 int SQLiteSingleVerStorageExecutor::MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData)
343 {
344     syncData.isRemote = ((dataItems[0].flag & DataItem::LOCAL_FLAG) == 0);
345     syncData.isRemoveDeviceData = (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_FLAG) != 0 ||
346         (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) != 0;
347 
348     int errCode = CheckDataWithQuery(dataItems);
349     if (errCode != E_OK) {
350         LOGE("Check migrate data with query failed! errCode = [%d]", errCode);
351         goto END;
352     }
353 
354     for (auto &item : dataItems) {
355         // Remove device data owns one version itself.
356         // Get entry here. Prepare notify data in storageEngine.
357         if (syncData.isRemoveDeviceData) {
358             errCode = GetEntriesForNotifyRemoveDevData(item, syncData.entries);
359             if (errCode != E_OK) {
360                 LOGE("Failed to get remove devices data");
361                 return errCode;
362             }
363             errCode = MigrateRmDevData(item);
364             LOGI("[PutMigratingDataToMain]Execute remove devices data! errCode = [%d]", errCode);
365             if (errCode != E_OK) {
366                 break;
367             }
368             continue;
369         }
370 
371         if (item.neglect) { // Do not save this record if it is neglected
372             continue;
373         }
374 
375         errCode = MigrateDataItem(item, syncData);
376         if (errCode != E_OK) {
377             LOGE("Migrate data item to main db failed! errCode = [%d]", errCode);
378             break;
379         }
380     }
381 END:
382     ResetForMigrateCacheData();
383     return CheckCorruptedStatus(errCode);
384 }
385 
MigrateSyncDataByVersion(uint64_t recordVer,NotifyMigrateSyncData & syncData,std::vector<DataItem> & dataItems)386 int SQLiteSingleVerStorageExecutor::MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
387     std::vector<DataItem> &dataItems)
388 {
389     int errCode = StartTransaction(TransactType::IMMEDIATE);
390     if (errCode != E_OK) {
391         return errCode;
392     }
393 
394     // Init migrate data.
395     errCode = InitMigrateData();
396     if (errCode != E_OK) {
397         LOGE("Init migrate data failed, errCode = [%d]", errCode);
398         goto END;
399     }
400 
401     // fix dataItem timestamp for migrate
402     errCode = ProcessTimestampForSyncDataInCacheDB(dataItems);
403     if (errCode != E_OK) {
404         LOGE("Change the time stamp for migrate failed! errCode = [%d]", errCode);
405         goto END;
406     }
407 
408     errCode = MigrateDataItems(dataItems, syncData);
409     if (errCode != E_OK) {
410         goto END;
411     }
412 
413     // delete recordVersion data
414     errCode = DelCacheDbDataByVersion(recordVer);
415     if (errCode != E_OK) {
416         LOGE("Delete the migrated data in cacheDb! errCode = [%d]", errCode);
417         goto END;
418     }
419 
420     errCode = Commit();
421     if (errCode != E_OK) {
422         LOGE("Commit data error and rollback, errCode = [%d]", errCode);
423         goto END;
424     }
425     return E_OK;
426 END:
427     Rollback();
428     return errCode;
429 }
430 
DelCacheDbDataByVersion(uint64_t version) const431 int SQLiteSingleVerStorageExecutor::DelCacheDbDataByVersion(uint64_t version) const
432 {
433     std::string sql;
434     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
435         sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_MAINHANDLE;
436     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
437         sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_CACHEHANDLE;
438     } else {
439         return -E_INVALID_ARGS;
440     }
441 
442     sqlite3_stmt *statement = nullptr;
443     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
444     if (errCode != E_OK) {
445         LOGE("GetStatement fail when delete cache data by version! errCode = [%d]", errCode);
446         return errCode;
447     }
448 
449     errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, static_cast<int64_t>(version));
450     if (errCode != E_OK) {
451         LOGE("[SingleVerExe] Bind destDbNickName error:[%d]", errCode);
452         goto END;
453     }
454 
455     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
456     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
457         errCode = E_OK;
458     }
459 
460 END:
461     int ret = E_OK;
462     SQLiteUtils::ResetStatement(statement, true, ret);
463     return CheckCorruptedStatus(errCode);
464 }
465 
VacuumLocalData() const466 int SQLiteSingleVerStorageExecutor::VacuumLocalData() const
467 {
468     std::string sql;
469     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
470         sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_MAINHANDLE;
471     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
472         sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_CACHEHANDLE;
473     } else {
474         return -E_INVALID_ARGS;
475     }
476 
477     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
478     if (errCode != E_OK) {
479         LOGE("[SingleVerExe] vaccum local data failed: %d", errCode);
480     }
481 
482     return CheckCorruptedStatus(errCode);
483 }
484 
485 // The local table data is only for local reading and writing, which can be sensed by itself.
486 // The current migration process does not provide callback subscription function.
MigrateLocalData()487 int SQLiteSingleVerStorageExecutor::MigrateLocalData()
488 {
489     // Nick name "main" represent current database(dbhande) in sqlite grammar
490     std::string migrateLocaldataSql;
491     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
492         migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_MAINHANDLE;
493     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
494         migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_CACHEHANDLE;
495     } else {
496         return -E_INVALID_ARGS;
497     }
498 
499     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, migrateLocaldataSql);
500     if (errCode != E_OK) {
501         LOGW("Failed to migrate the local data:%d", errCode);
502         return CheckCorruptedStatus(errCode);
503     }
504 
505     return VacuumLocalData();
506 }
507 
BindSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const508 int SQLiteSingleVerStorageExecutor::BindSyncDataInCacheMode(sqlite3_stmt *statement,
509     const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const
510 {
511     int errCode = BindPrimaryKeySyncDataInCacheMode(statement, hashKey, recordVersion);
512     if (errCode != E_OK) {
513         LOGE("Bind saved sync data primary key failed:%d", errCode);
514         return errCode;
515     }
516 
517     // if delete flag is set, just use the hash key instead of the key
518     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
519         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_SYNC_KEY_INDEX, -1));
520     } else {
521         errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_KEY_INDEX, dataItem.key, false);
522     }
523 
524     if (errCode != E_OK) {
525         LOGE("Bind saved sync data key failed:%d", errCode);
526         return errCode;
527     }
528 
529     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_VAL_INDEX, dataItem.value, true);
530     if (errCode != E_OK) {
531         LOGE("Bind saved sync data value failed:%d", errCode);
532         return errCode;
533     }
534 
535     LOGD("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 ", version:%" PRIu64,
536         dataItem.writeTimestamp, dataItem.timestamp, dataItem.flag, recordVersion);
537     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_FLAG_INDEX,
538         static_cast<int64_t>(dataItem.flag));
539     if (errCode != E_OK) {
540         LOGE("Bind saved sync data flag failed:%d", errCode);
541         return errCode;
542     }
543     errCode = BindTimestampSyncDataInCacheMode(statement, dataItem);
544     if (errCode != E_OK) {
545         LOGE("Bind saved sync data time stamp failed:%d", errCode);
546         return errCode;
547     }
548     return BindDevSyncDataInCacheMode(statement, dataItem.origDev, dataItem.dev);
549 }
550 
BindPrimaryKeySyncDataInCacheMode(sqlite3_stmt * statement,const Key & hashKey,uint64_t recordVersion) const551 int SQLiteSingleVerStorageExecutor::BindPrimaryKeySyncDataInCacheMode(
552     sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const
553 {
554     int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_HASH_KEY_INDEX, hashKey, false);
555     if (errCode != E_OK) {
556         LOGE("Bind saved sync data hash key failed:%d", errCode);
557         return errCode;
558     }
559     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_VERSION_INDEX, recordVersion);
560     if (errCode != E_OK) {
561         LOGE("Bind saved sync data version failed:%d", errCode);
562     }
563     return errCode;
564 }
565 
BindTimestampSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem) const566 int SQLiteSingleVerStorageExecutor::BindTimestampSyncDataInCacheMode(
567     sqlite3_stmt *statement, const DataItem &dataItem) const
568 {
569     int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_STAMP_INDEX, dataItem.timestamp);
570     if (errCode != E_OK) {
571         LOGE("Bind saved sync data stamp failed:%d", errCode);
572         return errCode;
573     }
574 
575     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_W_TIME_INDEX, dataItem.writeTimestamp);
576     if (errCode != E_OK) {
577         LOGE("Bind saved sync data write stamp failed:%d", errCode);
578     }
579     return errCode;
580 }
581 
BindDevSyncDataInCacheMode(sqlite3_stmt * statement,const std::string & origDev,const std::string & deviceName) const582 int SQLiteSingleVerStorageExecutor::BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
583     const std::string &origDev, const std::string &deviceName) const
584 {
585     std::string devName = DBCommon::TransferHashString(deviceName);
586     std::vector<uint8_t> devVect(devName.begin(), devName.end());
587     int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_DEV_INDEX, devVect, true);
588     if (errCode != E_OK) {
589         LOGE("Bind dev for sync data failed:%d", errCode);
590         return errCode;
591     }
592 
593     std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
594     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_ORI_DEV_INDEX, origDevVect, true);
595     if (errCode != E_OK) {
596         LOGE("Bind orig dev for sync data failed:%d", errCode);
597     }
598     return errCode;
599 }
600 
GetExpandedCheckSql(QueryObject query,DataItem & dataItem)601 int SQLiteSingleVerStorageExecutor::GetExpandedCheckSql(QueryObject query, DataItem &dataItem)
602 {
603     int errCode = E_OK;
604     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
605 
606     std::string sql;
607     std::string expandedSql;
608     errCode = helper.GetSyncDataCheckSql(sql);
609     if (errCode != E_OK) {
610         LOGE("Get sync data check sql failed");
611         return errCode;
612     }
613     sqlite3_stmt *stmt = nullptr;
614     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
615     if (errCode != E_OK) {
616         LOGE("Get statement fail. %d", errCode);
617         return -E_INVALID_QUERY_FORMAT;
618     }
619 
620     errCode = helper.BindSyncDataCheckStmt(stmt, dataItem.key);
621     if (errCode != E_OK) {
622         goto END;
623     }
624 
625     errCode = SQLiteUtils::ExpandedSql(stmt, expandedSql);
626     if (errCode != E_OK) {
627         LOGE("Get expand sql fail. %d", errCode);
628     }
629     DBCommon::StringToVector(expandedSql, dataItem.value);
630 END:
631     int ret = E_OK;
632     SQLiteUtils::ResetStatement(stmt, true, ret);
633     return errCode != E_OK ? errCode : ret;
634 }
635 
SaveSyncDataItemInCacheMode(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,uint64_t recordVersion,const QueryObject & query)636 int SQLiteSingleVerStorageExecutor::SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo,
637     Timestamp &maxStamp, uint64_t recordVersion, const QueryObject &query)
638 {
639     Key hashKey;
640     int errCode = E_OK;
641     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
642         hashKey = dataItem.key;
643     } else {
644         errCode = DBCommon::CalcValueHash(dataItem.key, hashKey);
645         if (errCode != E_OK) {
646             return errCode;
647         }
648     }
649 
650     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
651         errCode = GetExpandedCheckSql(query, dataItem); // record check sql in value for miss query data
652         if (errCode != E_OK) {
653             LOGE("Get sync data check sql failed. %d", errCode);
654             return errCode;
655         }
656     }
657 
658     std::string origDev = dataItem.origDev;
659     if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
660         origDev.clear();
661     }
662     dataItem.dev = deviceInfo.deviceName;
663     dataItem.origDev = origDev;
664     errCode = SaveSyncDataToCacheDatabase(dataItem, hashKey, recordVersion);
665     if (errCode == E_OK) {
666         maxStamp = std::max(dataItem.timestamp, maxStamp);
667     } else {
668         LOGE("Save sync data to db failed:%d", errCode);
669     }
670     return ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
671 }
672 
SaveSyncDataToCacheDatabase(const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const673 int SQLiteSingleVerStorageExecutor::SaveSyncDataToCacheDatabase(const DataItem &dataItem,
674     const Key &hashKey, uint64_t recordVersion) const
675 {
676     auto statement = saveSyncStatements_.GetDataSaveStatement(false);
677     if (statement == nullptr) {
678         return -E_INVALID_ARGS;
679     }
680     int errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
681     if (errCode != E_OK) {
682         return errCode;
683     }
684 
685     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
686     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
687         errCode = E_OK;
688     }
689     return errCode;
690 }
691 
PutLocalDataToCacheDB(const LocalDataItem & dataItem) const692 int SQLiteSingleVerStorageExecutor::PutLocalDataToCacheDB(const LocalDataItem &dataItem) const
693 {
694     sqlite3_stmt *statement = nullptr;
695     int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_CACHE_LOCAL_SQL, statement);
696     if (errCode != E_OK) {
697         goto ERROR;
698     }
699 
700     errCode = BindLocalDataInCacheMode(statement, dataItem);
701     if (errCode != E_OK) {
702         goto ERROR;
703     }
704 
705     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
706     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
707         errCode = E_OK;
708     }
709 
710 ERROR:
711     int ret = E_OK;
712     SQLiteUtils::ResetStatement(statement, true, ret);
713     return CheckCorruptedStatus(errCode);
714 }
715 
BindLocalDataInCacheMode(sqlite3_stmt * statement,const LocalDataItem & dataItem) const716 int SQLiteSingleVerStorageExecutor::BindLocalDataInCacheMode(sqlite3_stmt *statement,
717     const LocalDataItem &dataItem) const
718 {
719     int errCode = SQLiteUtils::BindBlobToStatement(statement,
720         BIND_CACHE_LOCAL_HASH_KEY_INDEX, dataItem.hashKey, false);
721     if (errCode != E_OK) {
722         LOGE("[SingleVerExe][BindLocalData]Bind hash key error:%d", errCode);
723         return errCode;
724     }
725 
726     // if delete flag is set, just use the hash key instead of the key
727     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
728         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_LOCAL_KEY_INDEX, -1));
729     } else {
730         errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_KEY_INDEX, dataItem.key, false);
731     }
732 
733     if (errCode != E_OK) {
734         LOGE("Bind saved sync data key failed:%d", errCode);
735         return errCode;
736     }
737 
738     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_VAL_INDEX, dataItem.value, true);
739     if (errCode != E_OK) {
740         LOGE("[SingleVerExe][BindLocalData]Bind value error:%d", errCode);
741         return errCode;
742     }
743 
744     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_TIMESTAMP_INDEX, dataItem.timestamp);
745     if (errCode != E_OK) {
746         LOGE("[SingleVerExe][BindLocalData]Bind timestamp error:%d", errCode);
747         return errCode;
748     }
749 
750     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_FLAG_INDEX,
751         static_cast<int64_t>(dataItem.flag));
752     if (errCode != E_OK) {
753         LOGE("[SingleVerExe][BindLocalData]Bind local data flag failed:%d", errCode);
754         return errCode;
755     }
756 
757     return E_OK;
758 }
759 
PutIntoConflictAndCommitForMigrateCache(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)760 int SQLiteSingleVerStorageExecutor::PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem,
761     const DeviceInfo &deviceInfo, NotifyConflictAndObserverData &notify, bool isPermitForceWrite)
762 {
763     int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
764     if (errCode != E_OK) {
765         errCode = (errCode == -E_NOT_FOUND ? E_OK : errCode);
766         if (errCode == -E_IGNORE_DATA) {
767             notify.dataStatus.isDefeated = true;
768             errCode = E_OK;
769         }
770         return errCode;
771     }
772 
773     // If delete data, the key is empty.
774     if (isSyncMigrating_ && dataItem.key.empty()) {
775         dataItem.key = notify.getData.key;
776     }
777 
778     PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, notify.committedData);
779     if (notify.dataStatus.isDefeated) {
780         LOGE("Data status is defeated:%d", errCode);
781         return ResetForMigrateCacheData();
782     }
783 
784     PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, notify.committedData);
785     return ResetForMigrateCacheData();
786 }
787 
GetMinTimestampInCacheDB(Timestamp & minStamp) const788 int SQLiteSingleVerStorageExecutor::GetMinTimestampInCacheDB(Timestamp &minStamp) const
789 {
790     if (dbHandle_ == nullptr) {
791         return E_OK;
792     }
793     std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
794         SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL :
795         SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL_FROM_MAINHANDLE);
796     sqlite3_stmt *statement = nullptr;
797     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
798     if (errCode != E_OK) {
799         goto ERROR;
800     }
801 
802     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
803     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
804         minStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
805         LOGD("Min time stamp in cacheDB is %" PRIu64, minStamp);
806         errCode = E_OK;
807     } else {
808         LOGE("GetMinTimestampInCacheDB failed, errCode = %d.", errCode);
809     }
810 
811 ERROR:
812     int ret = E_OK;
813     SQLiteUtils::ResetStatement(statement, true, ret);
814     return errCode != E_OK ? errCode : ret;
815 }
816 
InitMigrateTimestampOffset()817 int SQLiteSingleVerStorageExecutor::InitMigrateTimestampOffset()
818 {
819     // Not first migrate, migrateTimeOffset_ has been set.
820     if (migrateTimeOffset_ != 0) {
821         return E_OK;
822     }
823 
824     // Get min timestamp of local data in sync_data, cacheDB.
825     Timestamp minTimeInCache = 0;
826     int errCode = GetMinTimestampInCacheDB(minTimeInCache);
827     if (errCode != E_OK) {
828         return errCode;
829     }
830 
831     // There is no native data in cacheDB, cannot get accurate migrateTimeOffset_ now.
832     if (minTimeInCache == 0) {
833         migrateTimeOffset_ = -1;
834         LOGI("Time offset during migrating is -1.");
835         return E_OK;
836     }
837 
838     // Get max timestamp in mainDB.
839     Timestamp maxTimeInMain = 0;
840     InitCurrentMaxStamp(maxTimeInMain);
841 
842     // Get timestamp offset between mainDB and cacheDB.
843     // The purpose of -1 is to ensure that the first data record in the original cacheDB is 1 greater than
844     // the last data record in the original mainDB after the migration.
845     migrateTimeOffset_ = minTimeInCache - maxTimeInMain - 1;
846     LOGI("Min timestamp in cacheDB is %" PRIu64 ", max timestamp in mainDB is %" PRIu64 ". Time offset during migrating"
847         " is %" PRId64 ".", minTimeInCache, maxTimeInMain, migrateTimeOffset_);
848     return E_OK;
849 }
850 
ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> & dataItems)851 int SQLiteSingleVerStorageExecutor::ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems)
852 {
853     if (dataItems.empty()) {
854         LOGE("[SQLiteSingleVerStorageExecutor::ProcessTimestampForCacheDB] Invalid parameter : dataItems.");
855         return -E_INVALID_ARGS;
856     }
857 
858     // Get the offset between the min timestamp in dataitems and max timestamp in mainDB.
859     int errCode = InitMigrateTimestampOffset();
860     if (errCode != E_OK) {
861         return errCode;
862     }
863 
864     // Set real timestamp for DataItem in dataItems and get the max timestamp in these dataitems.
865     Timestamp maxTimeInDataItems = 0;
866     for (auto &item : dataItems) {
867         item.timestamp -= migrateTimeOffset_;
868         maxTimeInDataItems = std::max(maxTimeInDataItems, item.timestamp);
869     }
870 
871     // Update max timestamp in mainDB.
872     maxTimestampInMainDB_ = maxTimeInDataItems;
873     return E_OK;
874 }
875 
GetEntriesForNotifyRemoveDevData(const DataItem & item,std::vector<Entry> & entries) const876 int SQLiteSingleVerStorageExecutor::GetEntriesForNotifyRemoveDevData(const DataItem &item,
877     std::vector<Entry> &entries) const
878 {
879     // When removing device data, key is 'remove', value is device name.
880     if (item.key != REMOVE_DEVICE_DATA_KEY) {
881         LOGE("Invalid key. Can not notify remove device data.");
882         return -E_INVALID_ARGS;
883     }
884     if ((item.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == 0) {
885         LOGI("No need to notify remove device data.");
886         return E_OK;
887     }
888     entries.clear();
889     std::string dev;
890     DBCommon::VectorToString(item.value, dev);
891     return GetAllSyncedEntries(dev, entries);
892 }
893 
InitMigrateData()894 int SQLiteSingleVerStorageExecutor::InitMigrateData()
895 {
896     // Sync_data already in migrating. Need not to init data.
897     if (isSyncMigrating_) {
898         return E_OK;
899     }
900     ClearMigrateData();
901     std::string querySQL;
902     std::string insertSQL;
903     std::string updateSQL;
904     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
905         querySQL = SELECT_SYNC_HASH_SQL;
906         insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_MAINHANDLE;
907         updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_MAINHANDLE;
908     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
909         querySQL = SELECT_MAIN_SYNC_HASH_SQL_FROM_CACHEHANDLE;
910         insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_CACHEHANDLE;
911         updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_CACHEHANDLE;
912     } else {
913         LOGE("[InitMigrateData] executor in an error state[%u]!", static_cast<unsigned>(executorState_));
914         return -E_INVALID_DB;
915     }
916     int errCode = PrepareForSavingData(querySQL, insertSQL, updateSQL, migrateSyncStatements_);
917     if (errCode != E_OK) {
918         LOGE("Prepare migrateSyncStatements_ fail, errCode = %d", errCode);
919         return errCode;
920     }
921     isSyncMigrating_ = true;
922     return errCode;
923 }
924 
ClearMigrateData()925 void SQLiteSingleVerStorageExecutor::ClearMigrateData()
926 {
927     // Reset data.
928     migrateTimeOffset_ = 0;
929     maxTimestampInMainDB_ = 0;
930 
931     // Reset statement.
932     int errCode = migrateSyncStatements_.ResetStatement();
933     if (errCode != E_OK) {
934         LOGE("Reset migrateSync Statements failed, errCode = %d", errCode);
935     }
936 
937     isSyncMigrating_ = false;
938 }
939 
GetMaxTimestampDuringMigrating(Timestamp & maxTimestamp) const940 int SQLiteSingleVerStorageExecutor::GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const
941 {
942     if (maxTimestampInMainDB_ == 0) {
943         return -E_NOT_INIT;
944     }
945     maxTimestamp = maxTimestampInMainDB_;
946     return E_OK;
947 }
948 
DeleteMetaData(const std::vector<Key> & keys)949 int SQLiteSingleVerStorageExecutor::DeleteMetaData(const std::vector<Key> &keys)
950 {
951     sqlite3_stmt *statement = nullptr;
952     const std::string sql = attachMetaMode_ ? REMOVE_ATTACH_META_VALUE_SQL : REMOVE_META_VALUE_SQL;
953     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
954     if (errCode != E_OK) {
955         return errCode;
956     }
957 
958     int ret = E_OK;
959     for (const auto &key : keys) {
960         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
961         if (errCode != E_OK) {
962             break;
963         }
964 
965         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
966         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
967             break;
968         }
969         errCode = E_OK;
970         SQLiteUtils::ResetStatement(statement, false, ret);
971     }
972 
973     SQLiteUtils::ResetStatement(statement, true, ret);
974     return CheckCorruptedStatus(errCode);
975 }
976 
DeleteMetaDataByPrefixKey(const Key & keyPrefix)977 int SQLiteSingleVerStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix)
978 {
979     sqlite3_stmt *statement = nullptr;
980     const std::string sql = attachMetaMode_ ?
981         REMOVE_ATTACH_META_VALUE_BY_KEY_PREFIX_SQL : REMOVE_META_VALUE_BY_KEY_PREFIX_SQL;
982 
983     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
984     if (errCode != E_OK) {
985         return errCode;
986     }
987 
988     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
989     if (errCode == E_OK) {
990         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
991         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
992             errCode = E_OK;
993         }
994     }
995 
996     int ret = E_OK;
997     SQLiteUtils::ResetStatement(statement, true, ret);
998     return CheckCorruptedStatus(errCode);
999 }
1000 } // namespace DistributedDB
1001