• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "sqlite_single_ver_storage_executor.h"
17 
18 #include <algorithm>
19 
20 #include "log_print.h"
21 #include "db_constant.h"
22 #include "db_common.h"
23 #include "db_errno.h"
24 #include "parcel.h"
25 #include "runtime_context.h"
26 #include "sqlite_single_ver_storage_executor_sql.h"
27 
28 namespace DistributedDB {
PrepareForSavingCacheData(SingleVerDataType type)29 int SQLiteSingleVerStorageExecutor::PrepareForSavingCacheData(SingleVerDataType type)
30 {
31     int errCode = -E_NOT_SUPPORT;
32     if (type == SingleVerDataType::LOCAL_TYPE) {
33         std::string insertLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
34             INSERT_LOCAL_SQL_FROM_CACHEHANDLE : INSERT_CACHE_LOCAL_SQL);
35         std::string updateLocalSql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
36             UPDATE_LOCAL_SQL_FROM_CACHEHANDLE : UPDATE_CACHE_LOCAL_SQL);
37         errCode = PrepareForSavingData(SELECT_CACHE_LOCAL_HASH_SQL, insertLocalSql, updateLocalSql,
38             saveLocalStatements_);
39     } else if (type == SingleVerDataType::SYNC_TYPE) {
40         std::string insertSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
41             INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL);
42         std::string updateSyncSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
43             UPDATE_CACHE_SYNC_SQL_FROM_MAINHANDLE : UPDATE_CACHE_SYNC_SQL);
44         std::string selectSyncHashSql = ((executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
45             SELECT_CACHE_SYNC_HASH_SQL_FROM_MAINHANDLE : SELECT_CACHE_SYNC_HASH_SQL);
46         errCode = PrepareForSavingData(selectSyncHashSql, insertSyncSql, updateSyncSql, saveSyncStatements_);
47     }
48     if (errCode != E_OK) {
49         LOGE("Prepare to save sync cache data failed:%d", errCode);
50     }
51     return CheckCorruptedStatus(errCode);
52 }
53 
ResetForSavingCacheData(SingleVerDataType type)54 int SQLiteSingleVerStorageExecutor::ResetForSavingCacheData(SingleVerDataType type)
55 {
56     int errCode = E_OK;
57     if (type == SingleVerDataType::LOCAL_TYPE) {
58         SQLiteUtils::ResetStatement(saveLocalStatements_.insertStatement, false, errCode);
59         SQLiteUtils::ResetStatement(saveLocalStatements_.updateStatement, false, errCode);
60         SQLiteUtils::ResetStatement(saveLocalStatements_.queryStatement, false, errCode);
61     } else if (type == SingleVerDataType::SYNC_TYPE) {
62         SQLiteUtils::ResetStatement(saveSyncStatements_.insertStatement, false, errCode);
63         SQLiteUtils::ResetStatement(saveSyncStatements_.updateStatement, false, errCode);
64         SQLiteUtils::ResetStatement(saveSyncStatements_.queryStatement, false, errCode);
65     }
66 
67     return CheckCorruptedStatus(errCode);
68 }
69 
ResetForMigrateCacheData()70 int SQLiteSingleVerStorageExecutor::ResetForMigrateCacheData()
71 {
72     int errCode = E_OK;
73     SQLiteUtils::ResetStatement(migrateSyncStatements_.insertStatement, false, errCode);
74     SQLiteUtils::ResetStatement(migrateSyncStatements_.updateStatement, false, errCode);
75     SQLiteUtils::ResetStatement(migrateSyncStatements_.queryStatement, false, errCode);
76 
77     return CheckCorruptedStatus(errCode);
78 }
79 
RemoveDeviceDataInCacheMode(const std::string & hashDev,bool isNeedNotify,uint64_t recordVersion) const80 int SQLiteSingleVerStorageExecutor::RemoveDeviceDataInCacheMode(const std::string &hashDev,
81     bool isNeedNotify, uint64_t recordVersion) const
82 {
83     // device name always hash string.
84     std::vector<uint8_t> devVect(hashDev.begin(), hashDev.end());
85 
86     Key hashKey;
87     int errCode = DBCommon::CalcValueHash(REMOVE_DEVICE_DATA_KEY, hashKey);
88     if (errCode != E_OK) {
89         return errCode;
90     }
91 
92     DataItem dataItem;
93     dataItem.key = REMOVE_DEVICE_DATA_KEY;
94     dataItem.value = devVect;
95     if (isNeedNotify) {
96         dataItem.flag = DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG;
97     } else {
98         dataItem.flag = DataItem::REMOVE_DEVICE_DATA_FLAG;
99     }
100 
101     sqlite3_stmt *statement = nullptr;
102     std::string sql = (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) ?
103         INSERT_CACHE_SYNC_SQL_FROM_MAINHANDLE : INSERT_CACHE_SYNC_SQL;
104     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
105     if (errCode != E_OK) {
106         goto ERROR;
107     }
108 
109     errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
110     if (errCode != E_OK) {
111         goto ERROR;
112     }
113 
114     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
115     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
116         LOGE("Failed to execute rm the device synced data:%d", errCode);
117     } else {
118         errCode = E_OK;
119     }
120 
121 ERROR:
122     SQLiteUtils::ResetStatement(statement, true, errCode);
123     return CheckCorruptedStatus(errCode);
124 }
125 
GetMinVersionCacheData(std::vector<DataItem> & dataItems,uint64_t & minVerIncurCacheDb) const126 int SQLiteSingleVerStorageExecutor::GetMinVersionCacheData(
127     std::vector<DataItem> &dataItems, uint64_t &minVerIncurCacheDb) const
128 {
129     std::string sql;
130     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
131         sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_MAINHANDLE;
132     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
133         sql = MIGRATE_SELECT_MIN_VER_CACHEDATA_FROM_CACHEHANDLE;
134     } else {
135         return -E_INVALID_ARGS;
136     }
137 
138     sqlite3_stmt *statement = nullptr;
139     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
140     if (errCode != E_OK) {
141         LOGE("GetStatement fail when get min version cache data! errCode = [%d]", errCode);
142         goto END;
143     }
144 
145     errCode = GetAllDataItems(statement, dataItems, minVerIncurCacheDb, true);
146     if (errCode != E_OK) {
147         LOGE("Failed to get all the data items by the min version:[%d]", errCode);
148     }
149 
150 END:
151     SQLiteUtils::ResetStatement(statement, true, errCode);
152     return CheckCorruptedStatus(errCode);
153 }
154 
MigrateRmDevData(const DataItem & dataItem) const155 int SQLiteSingleVerStorageExecutor::MigrateRmDevData(const DataItem &dataItem) const
156 {
157     if (dataItem.key != REMOVE_DEVICE_DATA_KEY) {
158         LOGE("This item not means remove devices data, can not continue exe!");
159         return -E_INVALID_ARGS;
160     }
161 
162     std::string sql;
163     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
164         sql = REMOVE_DEV_DATA_SQL;
165     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
166         sql = REMOVE_DEV_DATA_SQL_FROM_CACHEHANDLE;
167     } else {
168         return -E_INVALID_ARGS;
169     }
170 
171     sqlite3_stmt *statement = nullptr;
172     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
173     if (errCode != E_OK) {
174         LOGE("GetStatement fail when remove device data migrating-data to main! errCode = [%d]", errCode);
175         return CheckCorruptedStatus(errCode);
176     }
177 
178     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, dataItem.value, true);
179     if (errCode != E_OK) {
180         LOGE("[singerVerExecutor][MiRmData] Bind dev for sync data failed:%d", errCode);
181         goto END;
182     }
183 
184     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
185     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
186         errCode = E_OK;
187     }
188 END:
189     SQLiteUtils::ResetStatement(statement, true, errCode);
190     return CheckCorruptedStatus(errCode);
191 }
192 
AttachMainDbAndCacheDb(CipherType type,const CipherPassword & passwd,const std::string & attachDbAbsPath,EngineState engineState)193 int SQLiteSingleVerStorageExecutor::AttachMainDbAndCacheDb(CipherType type, const CipherPassword &passwd,
194     const std::string &attachDbAbsPath, EngineState engineState)
195 {
196     std::string attachAsName;
197     if (engineState == EngineState::MAINDB) {
198         attachAsName = "cache";
199     } else if (engineState == EngineState::CACHEDB)  {
200         attachAsName = "maindb";
201     } else if (engineState == EngineState::ATTACHING) {
202         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
203         return E_OK;
204     } else {
205         return -E_INVALID_ARGS;
206     }
207 
208     int errCode = SQLiteUtils::AttachNewDatabase(dbHandle_, type, passwd, attachDbAbsPath, attachAsName);
209     if (errCode != E_OK) {
210         LOGE("handle attach to [%s] fail! errCode = [%d]", attachAsName.c_str(), errCode);
211         return CheckCorruptedStatus(errCode);
212     }
213 
214     if (engineState == EngineState::MAINDB) {
215         executorState_ = ExecutorState::MAIN_ATTACH_CACHE;
216     } else if (engineState == EngineState::CACHEDB)  {
217         executorState_ = ExecutorState::CACHE_ATTACH_MAIN;
218     } else {
219         return -E_INVALID_ARGS;
220     }
221     LOGD("[singleVerExecutor][attachDb] current engineState[%u], executorState[%u]", static_cast<unsigned>(engineState),
222         static_cast<unsigned>(executorState_));
223     return errCode;
224 }
225 
GetMaxVersionInCacheDb(uint64_t & maxVersion) const226 int SQLiteSingleVerStorageExecutor::GetMaxVersionInCacheDb(uint64_t &maxVersion) const
227 {
228     sqlite3_stmt *statement = nullptr;
229     std::string sql;
230     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
231         sql = GET_MAX_VER_CACHEDATA_FROM_MAINHANDLE;
232     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
233         sql = GET_MAX_VER_CACHEDATA_FROM_CACHEHANDLE;
234     } else {
235         return -E_INVALID_ARGS;
236     }
237 
238     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
239     if (errCode != E_OK) {
240         LOGE("GetStatement fail when get max version in cache db");
241         return CheckCorruptedStatus(errCode);
242     }
243 
244     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
245     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
246         maxVersion = static_cast<uint64_t>(sqlite3_column_int64(statement, 0));
247         errCode = E_OK;
248     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
249         maxVersion = 0;
250         errCode = E_OK;
251     }
252     SQLiteUtils::ResetStatement(statement, true, errCode);
253     return CheckCorruptedStatus(errCode);
254 }
255 
MigrateDataItem(DataItem & dataItem,const NotifyMigrateSyncData & syncData)256 int SQLiteSingleVerStorageExecutor::MigrateDataItem(DataItem &dataItem, const NotifyMigrateSyncData &syncData)
257 {
258     // Put or delete. Prepare notify data here.
259     NotifyConflictAndObserverData notify;
260     notify.committedData = syncData.committedData;
261     int errCode = PutIntoConflictAndCommitForMigrateCache(dataItem, {dataItem.dev.empty(), dataItem.dev}, notify,
262         syncData.isPermitForceWrite);
263     if (errCode != E_OK) {
264         ResetForMigrateCacheData();
265         LOGE("PutIntoConflictAndCommitForMigrateCache failed, errCode = %d", errCode);
266         return errCode;
267     }
268     // after solving conflict, the item should not be saved into mainDB
269     if (notify.dataStatus.isDefeated) {
270         LOGD("Data status is defeated");
271         return errCode;
272     }
273     bool isUpdate = notify.dataStatus.preStatus != DataStatus::NOEXISTED;
274     sqlite3_stmt *statement = migrateSyncStatements_.GetDataSaveStatement(isUpdate);
275     if (statement == nullptr) {
276         LOGE("GetStatement fail when put migrating-data to main! ");
277         return -E_INVALID_ARGS;
278     }
279 
280     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
281         errCode = EraseSyncData(dataItem.key);
282         goto END;
283     }
284 
285     errCode = BindSavedSyncData(statement, dataItem, dataItem.hashKey, { dataItem.origDev, dataItem.dev }, isUpdate);
286     if (errCode != E_OK) {
287         goto END;
288     }
289 
290     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
291     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
292         errCode = E_OK;
293     } else {
294         LOGD("StepWithRetry fail when put migrating-data to main!");
295     }
296 END:
297     ResetForMigrateCacheData();
298     return errCode;
299 }
300 
CheckDataWithQuery(std::vector<DataItem> & dataItems)301 int SQLiteSingleVerStorageExecutor::CheckDataWithQuery(std::vector<DataItem> &dataItems)
302 {
303     int errCode = E_OK;
304     sqlite3_stmt *stmt = nullptr;
305     for (auto &item : dataItems) {
306         if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) == 0) {
307             continue;
308         }
309         std::string sql;
310         DBCommon::VectorToString(item.value, sql);
311         if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
312             static const std::string SYNC_DATA_TABLE = "sync_data";
313             static const std::string SYNC_DATA_TABLE_MAIN = "maindb.sync_data";
314             std::string::size_type startPos = sql.find(SYNC_DATA_TABLE);
315             if (startPos != std::string::npos) {
316                 sql.replace(startPos, SYNC_DATA_TABLE.length(), SYNC_DATA_TABLE_MAIN);
317             }
318         }
319         errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
320         if (errCode != E_OK) {
321             LOGE("Get Check miss query data statement failed. %d", errCode);
322             return errCode;
323         }
324 
325         errCode = CheckMissQueryDataItem(stmt, item.dev, item);
326         if (errCode != E_OK) {
327             LOGE("Check miss query data item failed. %d", errCode);
328             break;
329         }
330         SQLiteUtils::ResetStatement(stmt, true, errCode);
331     }
332     SQLiteUtils::ResetStatement(stmt, true, errCode);
333     return CheckCorruptedStatus(errCode);
334 }
335 
MigrateDataItems(std::vector<DataItem> & dataItems,NotifyMigrateSyncData & syncData)336 int SQLiteSingleVerStorageExecutor::MigrateDataItems(std::vector<DataItem> &dataItems, NotifyMigrateSyncData &syncData)
337 {
338     syncData.isRemote = ((dataItems[0].flag & DataItem::LOCAL_FLAG) == 0);
339     syncData.isRemoveDeviceData = (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_FLAG) != 0 ||
340         (dataItems[0].flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) != 0;
341 
342     int errCode = CheckDataWithQuery(dataItems);
343     if (errCode != E_OK) {
344         LOGE("Check migrate data with query failed! errCode = [%d]", errCode);
345         goto END;
346     }
347 
348     for (auto &item : dataItems) {
349         // Remove device data owns one version itself.
350         // Get entry here. Prepare notify data in storageEngine.
351         if (syncData.isRemoveDeviceData) {
352             errCode = GetEntriesForNotifyRemoveDevData(item, syncData.entries);
353             if (errCode != E_OK) {
354                 LOGE("Failed to get remove devices data");
355                 return errCode;
356             }
357             errCode = MigrateRmDevData(item);
358             LOGI("[PutMigratingDataToMain]Execute remove devices data! errCode = [%d]", errCode);
359             if (errCode != E_OK) {
360                 break;
361             }
362             continue;
363         }
364 
365         if (item.neglect) { // Do not save this record if it is neglected
366             continue;
367         }
368 
369         errCode = MigrateDataItem(item, syncData);
370         if (errCode != E_OK) {
371             LOGE("Migrate data item to main db failed! errCode = [%d]", errCode);
372             break;
373         }
374     }
375 END:
376     ResetForMigrateCacheData();
377     return CheckCorruptedStatus(errCode);
378 }
379 
MigrateSyncDataByVersion(uint64_t recordVer,NotifyMigrateSyncData & syncData,std::vector<DataItem> & dataItems)380 int SQLiteSingleVerStorageExecutor::MigrateSyncDataByVersion(uint64_t recordVer, NotifyMigrateSyncData &syncData,
381     std::vector<DataItem> &dataItems)
382 {
383     int errCode = StartTransaction(TransactType::IMMEDIATE);
384     if (errCode != E_OK) {
385         return errCode;
386     }
387 
388     // Init migrate data.
389     errCode = InitMigrateData();
390     if (errCode != E_OK) {
391         LOGE("Init migrate data failed, errCode = [%d]", errCode);
392         goto END;
393     }
394 
395     // fix dataItem timestamp for migrate
396     errCode = ProcessTimestampForSyncDataInCacheDB(dataItems);
397     if (errCode != E_OK) {
398         LOGE("Change the time stamp for migrate failed! errCode = [%d]", errCode);
399         goto END;
400     }
401 
402     errCode = MigrateDataItems(dataItems, syncData);
403     if (errCode != E_OK) {
404         goto END;
405     }
406 
407     // delete recordVersion data
408     errCode = DelCacheDbDataByVersion(recordVer);
409     if (errCode != E_OK) {
410         LOGE("Delete the migrated data in cacheDb! errCode = [%d]", errCode);
411         goto END;
412     }
413 
414     errCode = Commit();
415     if (errCode != E_OK) {
416         LOGE("Commit data error and rollback, errCode = [%d]", errCode);
417         goto END;
418     }
419     return E_OK;
420 END:
421     Rollback();
422     return errCode;
423 }
424 
DelCacheDbDataByVersion(uint64_t version) const425 int SQLiteSingleVerStorageExecutor::DelCacheDbDataByVersion(uint64_t version) const
426 {
427     std::string sql;
428     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
429         sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_MAINHANDLE;
430     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
431         sql = MIGRATE_DEL_DATA_BY_VERSION_FROM_CACHEHANDLE;
432     } else {
433         return -E_INVALID_ARGS;
434     }
435 
436     sqlite3_stmt *statement = nullptr;
437     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
438     if (errCode != E_OK) {
439         LOGE("GetStatement fail when delete cache data by version! errCode = [%d]", errCode);
440         return errCode;
441     }
442 
443     errCode = SQLiteUtils::BindInt64ToStatement(statement, 1, static_cast<int64_t>(version));
444     if (errCode != E_OK) {
445         LOGE("[SingleVerExe] Bind destDbNickName error:[%d]", errCode);
446         goto END;
447     }
448 
449     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
450     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
451         errCode = E_OK;
452     }
453 
454 END:
455     SQLiteUtils::ResetStatement(statement, true, errCode);
456     return CheckCorruptedStatus(errCode);
457 }
458 
VacuumLocalData() const459 int SQLiteSingleVerStorageExecutor::VacuumLocalData() const
460 {
461     std::string sql;
462     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
463         sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_MAINHANDLE;
464     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
465         sql = MIGRATE_VACUUM_LOCAL_SQL_FROM_CACHEHANDLE;
466     } else {
467         return -E_INVALID_ARGS;
468     }
469 
470     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
471     if (errCode != E_OK) {
472         LOGE("[SingleVerExe] vaccum local data failed: %d", errCode);
473     }
474 
475     return CheckCorruptedStatus(errCode);
476 }
477 
478 // The local table data is only for local reading and writing, which can be sensed by itself.
479 // The current migration process does not provide callback subscription function.
MigrateLocalData()480 int SQLiteSingleVerStorageExecutor::MigrateLocalData()
481 {
482     // Nick name "main" represent current database(dbhande) in sqlite grammar
483     std::string migrateLocaldataSql;
484     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
485         migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_MAINHANDLE;
486     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN)  {
487         migrateLocaldataSql = MIGRATE_LOCAL_SQL_FROM_CACHEHANDLE;
488     } else {
489         return -E_INVALID_ARGS;
490     }
491 
492     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, migrateLocaldataSql);
493     if (errCode != E_OK) {
494         LOGW("Failed to migrate the local data:%d", errCode);
495         return CheckCorruptedStatus(errCode);
496     }
497 
498     return VacuumLocalData();
499 }
500 
BindSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const501 int SQLiteSingleVerStorageExecutor::BindSyncDataInCacheMode(sqlite3_stmt *statement,
502     const DataItem &dataItem, const Key &hashKey, uint64_t recordVersion) const
503 {
504     int errCode = BindPrimaryKeySyncDataInCacheMode(statement, hashKey, recordVersion);
505     if (errCode != E_OK) {
506         LOGE("Bind saved sync data primary key failed:%d", errCode);
507         return errCode;
508     }
509 
510     // if delete flag is set, just use the hash key instead of the key
511     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
512         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_SYNC_KEY_INDEX, -1));
513     } else {
514         errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_KEY_INDEX, dataItem.key, false);
515     }
516 
517     if (errCode != E_OK) {
518         LOGE("Bind saved sync data key failed:%d", errCode);
519         return errCode;
520     }
521 
522     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_VAL_INDEX, dataItem.value, true);
523     if (errCode != E_OK) {
524         LOGE("Bind saved sync data value failed:%d", errCode);
525         return errCode;
526     }
527 
528     LOGD("Write timestamp:%" PRIu64 " timestamp:%" PRIu64 ", flag:%" PRIu64 ", version:%" PRIu64,
529         dataItem.writeTimestamp, dataItem.timestamp, dataItem.flag, recordVersion);
530     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_FLAG_INDEX,
531         static_cast<int64_t>(dataItem.flag));
532     if (errCode != E_OK) {
533         LOGE("Bind saved sync data flag failed:%d", errCode);
534         return errCode;
535     }
536     errCode = BindTimestampSyncDataInCacheMode(statement, dataItem);
537     if (errCode != E_OK) {
538         LOGE("Bind saved sync data time stamp failed:%d", errCode);
539         return errCode;
540     }
541     return BindDevSyncDataInCacheMode(statement, dataItem.origDev, dataItem.dev);
542 }
543 
BindPrimaryKeySyncDataInCacheMode(sqlite3_stmt * statement,const Key & hashKey,uint64_t recordVersion) const544 int SQLiteSingleVerStorageExecutor::BindPrimaryKeySyncDataInCacheMode(
545     sqlite3_stmt *statement, const Key &hashKey, uint64_t recordVersion) const
546 {
547     int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_HASH_KEY_INDEX, hashKey, false);
548     if (errCode != E_OK) {
549         LOGE("Bind saved sync data hash key failed:%d", errCode);
550         return errCode;
551     }
552     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_VERSION_INDEX, recordVersion);
553     if (errCode != E_OK) {
554         LOGE("Bind saved sync data version failed:%d", errCode);
555     }
556     return errCode;
557 }
558 
BindTimestampSyncDataInCacheMode(sqlite3_stmt * statement,const DataItem & dataItem) const559 int SQLiteSingleVerStorageExecutor::BindTimestampSyncDataInCacheMode(
560     sqlite3_stmt *statement, const DataItem &dataItem) const
561 {
562     int errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_STAMP_INDEX, dataItem.timestamp);
563     if (errCode != E_OK) {
564         LOGE("Bind saved sync data stamp failed:%d", errCode);
565         return errCode;
566     }
567 
568     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_SYNC_W_TIME_INDEX, dataItem.writeTimestamp);
569     if (errCode != E_OK) {
570         LOGE("Bind saved sync data write stamp failed:%d", errCode);
571     }
572     return errCode;
573 }
574 
BindDevSyncDataInCacheMode(sqlite3_stmt * statement,const std::string & origDev,const std::string & deviceName) const575 int SQLiteSingleVerStorageExecutor::BindDevSyncDataInCacheMode(sqlite3_stmt *statement,
576     const std::string &origDev, const std::string &deviceName) const
577 {
578     std::string devName = DBCommon::TransferHashString(deviceName);
579     std::vector<uint8_t> devVect(devName.begin(), devName.end());
580     int errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_DEV_INDEX, devVect, true);
581     if (errCode != E_OK) {
582         LOGE("Bind dev for sync data failed:%d", errCode);
583         return errCode;
584     }
585 
586     std::vector<uint8_t> origDevVect(origDev.begin(), origDev.end());
587     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_SYNC_ORI_DEV_INDEX, origDevVect, true);
588     if (errCode != E_OK) {
589         LOGE("Bind orig dev for sync data failed:%d", errCode);
590     }
591     return errCode;
592 }
593 
GetExpandedCheckSql(QueryObject query,DataItem & dataItem)594 int SQLiteSingleVerStorageExecutor::GetExpandedCheckSql(QueryObject query, DataItem &dataItem)
595 {
596     int errCode = E_OK;
597     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
598 
599     std::string sql;
600     std::string expandedSql;
601     errCode = helper.GetSyncDataCheckSql(sql);
602     if (errCode != E_OK) {
603         LOGE("Get sync data check sql failed");
604         return errCode;
605     }
606     sqlite3_stmt *stmt = nullptr;
607     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
608     if (errCode != E_OK) {
609         LOGE("Get statement fail. %d", errCode);
610         return -E_INVALID_QUERY_FORMAT;
611     }
612 
613     errCode = helper.BindSyncDataCheckStmt(stmt, dataItem.key);
614     if (errCode != E_OK) {
615         goto END;
616     }
617 
618     errCode = SQLiteUtils::ExpandedSql(stmt, expandedSql);
619     if (errCode != E_OK) {
620         LOGE("Get expand sql fail. %d", errCode);
621     }
622     DBCommon::StringToVector(expandedSql, dataItem.value);
623 END:
624     SQLiteUtils::ResetStatement(stmt, true, errCode);
625     return errCode;
626 }
627 
SaveSyncDataItemInCacheMode(DataItem & dataItem,const DeviceInfo & deviceInfo,Timestamp & maxStamp,uint64_t recordVersion,const QueryObject & query)628 int SQLiteSingleVerStorageExecutor::SaveSyncDataItemInCacheMode(DataItem &dataItem, const DeviceInfo &deviceInfo,
629     Timestamp &maxStamp, uint64_t recordVersion, const QueryObject &query)
630 {
631     Key hashKey;
632     int errCode = E_OK;
633     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
634         hashKey = dataItem.key;
635     } else {
636         errCode = DBCommon::CalcValueHash(dataItem.key, hashKey);
637         if (errCode != E_OK) {
638             return errCode;
639         }
640     }
641 
642     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
643         errCode = GetExpandedCheckSql(query, dataItem); // record check sql in value for miss query data
644         if (errCode != E_OK) {
645             LOGE("Get sync data check sql failed. %d", errCode);
646             return errCode;
647         }
648     }
649 
650     std::string origDev = dataItem.origDev;
651     if (((dataItem.flag & DataItem::LOCAL_FLAG) != 0) && dataItem.origDev.empty()) {
652         origDev.clear();
653     }
654     dataItem.dev = deviceInfo.deviceName;
655     dataItem.origDev = origDev;
656     errCode = SaveSyncDataToCacheDatabase(dataItem, hashKey, recordVersion);
657     if (errCode == E_OK) {
658         maxStamp = std::max(dataItem.timestamp, maxStamp);
659     } else {
660         LOGE("Save sync data to db failed:%d", errCode);
661     }
662     return ResetForSavingCacheData(SingleVerDataType::SYNC_TYPE);
663 }
664 
SaveSyncDataToCacheDatabase(const DataItem & dataItem,const Key & hashKey,uint64_t recordVersion) const665 int SQLiteSingleVerStorageExecutor::SaveSyncDataToCacheDatabase(const DataItem &dataItem,
666     const Key &hashKey, uint64_t recordVersion) const
667 {
668     auto statement = saveSyncStatements_.GetDataSaveStatement(false);
669     if (statement == nullptr) {
670         return -E_INVALID_ARGS;
671     }
672     int errCode = BindSyncDataInCacheMode(statement, dataItem, hashKey, recordVersion);
673     if (errCode != E_OK) {
674         return errCode;
675     }
676 
677     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
678     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
679         errCode = E_OK;
680     }
681     return errCode;
682 }
683 
PutLocalDataToCacheDB(const LocalDataItem & dataItem) const684 int SQLiteSingleVerStorageExecutor::PutLocalDataToCacheDB(const LocalDataItem &dataItem) const
685 {
686     sqlite3_stmt *statement = nullptr;
687     int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_CACHE_LOCAL_SQL, statement);
688     if (errCode != E_OK) {
689         goto ERROR;
690     }
691 
692     errCode = BindLocalDataInCacheMode(statement, dataItem);
693     if (errCode != E_OK) {
694         goto ERROR;
695     }
696 
697     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
698     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
699         errCode = E_OK;
700     }
701 
702 ERROR:
703     SQLiteUtils::ResetStatement(statement, true, errCode);
704     return CheckCorruptedStatus(errCode);
705 }
706 
BindLocalDataInCacheMode(sqlite3_stmt * statement,const LocalDataItem & dataItem) const707 int SQLiteSingleVerStorageExecutor::BindLocalDataInCacheMode(sqlite3_stmt *statement,
708     const LocalDataItem &dataItem) const
709 {
710     int errCode = SQLiteUtils::BindBlobToStatement(statement,
711         BIND_CACHE_LOCAL_HASH_KEY_INDEX, dataItem.hashKey, false);
712     if (errCode != E_OK) {
713         LOGE("[SingleVerExe][BindLocalData]Bind hash key error:%d", errCode);
714         return errCode;
715     }
716 
717     // if delete flag is set, just use the hash key instead of the key
718     if ((dataItem.flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
719         errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_zeroblob(statement, BIND_CACHE_LOCAL_KEY_INDEX, -1));
720     } else {
721         errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_KEY_INDEX, dataItem.key, false);
722     }
723 
724     if (errCode != E_OK) {
725         LOGE("Bind saved sync data key failed:%d", errCode);
726         return errCode;
727     }
728 
729     errCode = SQLiteUtils::BindBlobToStatement(statement, BIND_CACHE_LOCAL_VAL_INDEX, dataItem.value, true);
730     if (errCode != E_OK) {
731         LOGE("[SingleVerExe][BindLocalData]Bind value error:%d", errCode);
732         return errCode;
733     }
734 
735     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_TIMESTAMP_INDEX, dataItem.timestamp);
736     if (errCode != E_OK) {
737         LOGE("[SingleVerExe][BindLocalData]Bind timestamp error:%d", errCode);
738         return errCode;
739     }
740 
741     errCode = SQLiteUtils::BindInt64ToStatement(statement, BIND_CACHE_LOCAL_FLAG_INDEX,
742         static_cast<int64_t>(dataItem.flag));
743     if (errCode != E_OK) {
744         LOGE("[SingleVerExe][BindLocalData]Bind local data flag failed:%d", errCode);
745         return errCode;
746     }
747 
748     return E_OK;
749 }
750 
PutIntoConflictAndCommitForMigrateCache(DataItem & dataItem,const DeviceInfo & deviceInfo,NotifyConflictAndObserverData & notify,bool isPermitForceWrite)751 int SQLiteSingleVerStorageExecutor::PutIntoConflictAndCommitForMigrateCache(DataItem &dataItem,
752     const DeviceInfo &deviceInfo, NotifyConflictAndObserverData &notify, bool isPermitForceWrite)
753 {
754     int errCode = PrepareForNotifyConflictAndObserver(dataItem, deviceInfo, notify, isPermitForceWrite);
755     if (errCode != E_OK) {
756         errCode = (errCode == -E_NOT_FOUND ? E_OK : errCode);
757         if (errCode == -E_IGNORE_DATA) {
758             notify.dataStatus.isDefeated = true;
759             errCode = E_OK;
760         }
761         return errCode;
762     }
763 
764     // If delete data, the key is empty.
765     if (isSyncMigrating_ && dataItem.key.empty()) {
766         dataItem.key = notify.getData.key;
767     }
768 
769     PutConflictData(dataItem, notify.getData, deviceInfo, notify.dataStatus, notify.committedData);
770     if (notify.dataStatus.isDefeated) {
771         LOGD("Data status is defeated:%d", errCode);
772         return ResetForMigrateCacheData();
773     }
774 
775     PutIntoCommittedData(dataItem, notify.getData, notify.dataStatus, notify.hashKey, notify.committedData);
776     return ResetForMigrateCacheData();
777 }
778 
GetMinTimestampInCacheDB(Timestamp & minStamp) const779 int SQLiteSingleVerStorageExecutor::GetMinTimestampInCacheDB(Timestamp &minStamp) const
780 {
781     if (dbHandle_ == nullptr) {
782         return E_OK;
783     }
784     std::string sql = ((executorState_ == ExecutorState::CACHE_ATTACH_MAIN) ?
785         SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL :
786         SELECT_NATIVE_MIN_TIMESTAMP_IN_CACHE_SYNC_DATA_SQL_FROM_MAINHANDLE);
787     sqlite3_stmt *statement = nullptr;
788     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
789     if (errCode != E_OK) {
790         goto ERROR;
791     }
792 
793     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
794     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
795         minStamp = static_cast<uint64_t>(sqlite3_column_int64(statement, 0)); // get the first column
796         LOGD("Min time stamp in cacheDB is %" PRIu64, minStamp);
797         errCode = E_OK;
798     } else {
799         LOGE("GetMinTimestampInCacheDB failed, errCode = %d.", errCode);
800     }
801 
802 ERROR:
803     SQLiteUtils::ResetStatement(statement, true, errCode);
804     return errCode;
805 }
806 
InitMigrateTimestampOffset()807 int SQLiteSingleVerStorageExecutor::InitMigrateTimestampOffset()
808 {
809     // Not first migrate, migrateTimeOffset_ has been set.
810     if (migrateTimeOffset_ != 0) {
811         return E_OK;
812     }
813 
814     // Get min timestamp of local data in sync_data, cacheDB.
815     Timestamp minTimeInCache = 0;
816     int errCode = GetMinTimestampInCacheDB(minTimeInCache);
817     if (errCode != E_OK) {
818         return errCode;
819     }
820 
821     // There is no native data in cacheDB, cannot get accurate migrateTimeOffset_ now.
822     if (minTimeInCache == 0) {
823         migrateTimeOffset_ = -1;
824         LOGI("Time offset during migrating is -1.");
825         return E_OK;
826     }
827 
828     // Get max timestamp in mainDB.
829     Timestamp maxTimeInMain = 0;
830     InitCurrentMaxStamp(maxTimeInMain);
831 
832     // Get timestamp offset between mainDB and cacheDB.
833     // The purpose of -1 is to ensure that the first data record in the original cacheDB is 1 greater than
834     // the last data record in the original mainDB after the migration.
835     migrateTimeOffset_ = minTimeInCache - maxTimeInMain - 1;
836     LOGI("Min timestamp in cacheDB is %" PRIu64 ", max timestamp in mainDB is %" PRIu64 ". Time offset during migrating"
837         " is %" PRId64 ".", minTimeInCache, maxTimeInMain, migrateTimeOffset_);
838     return E_OK;
839 }
840 
ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> & dataItems)841 int SQLiteSingleVerStorageExecutor::ProcessTimestampForSyncDataInCacheDB(std::vector<DataItem> &dataItems)
842 {
843     if (dataItems.empty()) {
844         LOGE("[SQLiteSingleVerStorageExecutor::ProcessTimestampForCacheDB] Invalid parameter : dataItems.");
845         return -E_INVALID_ARGS;
846     }
847 
848     // Get the offset between the min timestamp in dataitems and max timestamp in mainDB.
849     int errCode = InitMigrateTimestampOffset();
850     if (errCode != E_OK) {
851         return errCode;
852     }
853 
854     // Set real timestamp for DataItem in dataItems and get the max timestamp in these dataitems.
855     Timestamp maxTimeInDataItems = 0;
856     for (auto &item : dataItems) {
857         item.timestamp -= migrateTimeOffset_;
858         maxTimeInDataItems = std::max(maxTimeInDataItems, item.timestamp);
859     }
860 
861     // Update max timestamp in mainDB.
862     maxTimestampInMainDB_ = maxTimeInDataItems;
863     return E_OK;
864 }
865 
GetEntriesForNotifyRemoveDevData(const DataItem & item,std::vector<Entry> & entries) const866 int SQLiteSingleVerStorageExecutor::GetEntriesForNotifyRemoveDevData(const DataItem &item,
867     std::vector<Entry> &entries) const
868 {
869     // When removing device data, key is 'remove', value is device name.
870     if (item.key != REMOVE_DEVICE_DATA_KEY) {
871         LOGE("Invalid key. Can not notify remove device data.");
872         return -E_INVALID_ARGS;
873     }
874     if ((item.flag & DataItem::REMOVE_DEVICE_DATA_NOTIFY_FLAG) == 0) {
875         LOGI("No need to notify remove device data.");
876         return E_OK;
877     }
878     entries.clear();
879     std::string dev;
880     DBCommon::VectorToString(item.value, dev);
881     return GetAllSyncedEntries(dev, entries);
882 }
883 
InitMigrateData()884 int SQLiteSingleVerStorageExecutor::InitMigrateData()
885 {
886     // Sync_data already in migrating. Need not to init data.
887     if (isSyncMigrating_) {
888         return E_OK;
889     }
890     ClearMigrateData();
891     std::string querySQL;
892     std::string insertSQL;
893     std::string updateSQL;
894     if (executorState_ == ExecutorState::MAIN_ATTACH_CACHE) {
895         querySQL = SELECT_SYNC_HASH_SQL;
896         insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_MAINHANDLE;
897         updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_MAINHANDLE;
898     } else if (executorState_ == ExecutorState::CACHE_ATTACH_MAIN) {
899         querySQL = SELECT_MAIN_SYNC_HASH_SQL_FROM_CACHEHANDLE;
900         insertSQL = MIGRATE_INSERT_DATA_TO_MAINDB_FROM_CACHEHANDLE;
901         updateSQL = MIGRATE_UPDATE_DATA_TO_MAINDB_FROM_CACHEHANDLE;
902     } else {
903         LOGE("[InitMigrateData] executor in an error state[%u]!", static_cast<unsigned>(executorState_));
904         return -E_INVALID_DB;
905     }
906     int errCode = PrepareForSavingData(querySQL, insertSQL, updateSQL, migrateSyncStatements_);
907     if (errCode != E_OK) {
908         LOGE("Prepare migrateSyncStatements_ fail, errCode = %d", errCode);
909         return errCode;
910     }
911     isSyncMigrating_ = true;
912     return errCode;
913 }
914 
ClearMigrateData()915 void SQLiteSingleVerStorageExecutor::ClearMigrateData()
916 {
917     // Reset data.
918     migrateTimeOffset_ = 0;
919     maxTimestampInMainDB_ = 0;
920 
921     // Reset statement.
922     int errCode = migrateSyncStatements_.ResetStatement();
923     if (errCode != E_OK) {
924         LOGE("Reset migrateSync Statements failed, errCode = %d", errCode);
925     }
926 
927     isSyncMigrating_ = false;
928 }
929 
GetMaxTimestampDuringMigrating(Timestamp & maxTimestamp) const930 int SQLiteSingleVerStorageExecutor::GetMaxTimestampDuringMigrating(Timestamp &maxTimestamp) const
931 {
932     if (maxTimestampInMainDB_ == 0) {
933         return -E_NOT_INIT;
934     }
935     maxTimestamp = maxTimestampInMainDB_;
936     return E_OK;
937 }
938 
DeleteMetaData(const std::vector<Key> & keys)939 int SQLiteSingleVerStorageExecutor::DeleteMetaData(const std::vector<Key> &keys)
940 {
941     sqlite3_stmt *statement = nullptr;
942     const std::string sql = attachMetaMode_ ? REMOVE_ATTACH_META_VALUE_SQL : REMOVE_META_VALUE_SQL;
943     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
944     if (errCode != E_OK) {
945         return errCode;
946     }
947 
948     for (const auto &key : keys) {
949         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
950         if (errCode != E_OK) {
951             break;
952         }
953 
954         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
955         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
956             break;
957         }
958         errCode = E_OK;
959         SQLiteUtils::ResetStatement(statement, false, errCode);
960     }
961 
962     SQLiteUtils::ResetStatement(statement, true, errCode);
963     return CheckCorruptedStatus(errCode);
964 }
965 
DeleteMetaDataByPrefixKey(const Key & keyPrefix)966 int SQLiteSingleVerStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix)
967 {
968     sqlite3_stmt *statement = nullptr;
969     const std::string sql = attachMetaMode_ ?
970         REMOVE_ATTACH_META_VALUE_BY_KEY_PREFIX_SQL : REMOVE_META_VALUE_BY_KEY_PREFIX_SQL;
971 
972     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, statement);
973     if (errCode != E_OK) {
974         return errCode;
975     }
976 
977     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
978     if (errCode == E_OK) {
979         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
980         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
981             errCode = E_OK;
982         }
983     }
984 
985     SQLiteUtils::ResetStatement(statement, true, errCode);
986     return CheckCorruptedStatus(errCode);
987 }
988 } // namespace DistributedDB
989