• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include <algorithm>
19 #include <optional>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32 
33 namespace DistributedDB {
34 namespace {
35 static constexpr const char* ROWID = "ROWID";
36 static constexpr const char* TIMESTAMP = "TIMESTAMP";
37 static constexpr const char* FLAG = "FLAG";
38 static constexpr const char* DATAKEY = "DATA_KEY";
39 static constexpr const char* DEVICE_FIELD = "DEVICE";
40 static constexpr const char* CLOUD_GID_FIELD = "CLOUD_GID";
41 static constexpr const char* FLAG_IS_CLOUD = "FLAG & 0x02 = 0"; // see if 1th bit of a flag is cloud
42 static constexpr const char* SET_FLAG_LOCAL = "FLAG | 0x02";    // set 1th bit of flag to one which is local
43 static constexpr const int SET_FLAG_ZERO_MASK = 0x03; // clear 2th bit of flag
44 static constexpr const int SET_FLAG_ONE_MASK = 0x04; // set 2th bit of flag
45 static constexpr const int SET_CLOUD_FLAG = 0x05; // set 1th bit of flag to 0
46 static constexpr const int DATA_KEY_INDEX = 0;
47 static constexpr const int TIMESTAMP_INDEX = 3;
48 static constexpr const int W_TIMESTAMP_INDEX = 4;
49 static constexpr const int FLAG_INDEX = 5;
50 static constexpr const int CLOUD_GID_INDEX = 7;
51 
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)52 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
53 {
54     if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
55         return SQLITE_DENY;
56     }
57     return SQLITE_OK;
58 }
59 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)60 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
61     DistributedTableMode mode)
62     : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode)
63 {
64     bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
65     bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
66     bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
67     bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
68     bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
69     bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
70     bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
71 }
72 
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)73 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
74 {
75     std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
76     if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
77         LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
78         return -E_NOT_SUPPORT;
79     }
80 
81     if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
82         if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
83             LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
84             return -E_NOT_SUPPORT;
85         }
86 
87         if (DBCommon::HasConstraint(trimedSql, "ON CONFLICT", " )", " ")) {
88             LOGE("[CreateDistributedTable] Not support create distributed table with 'ON CONFLICT' constraint.");
89             return -E_NOT_SUPPORT;
90         }
91 
92         if (mode == DistributedTableMode::COLLABORATION) {
93             if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
94                 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
95                 return -E_NOT_SUPPORT;
96             }
97         }
98 
99         if (syncType == CLOUD_COOPERATION) {
100             int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
101             if (errCode != E_OK) {
102                 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
103                 return errCode;
104             }
105         }
106     }
107 
108     if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
109         if (table.GetPrimaryKey().size() > 1) {
110             LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
111             return -E_NOT_SUPPORT;
112         }
113     }
114 
115     return E_OK;
116 }
117 
118 namespace {
GetExistedDataTimeOffset(sqlite3 * db,const std::string & tableName,bool isMem,int64_t & timeOffset)119 int GetExistedDataTimeOffset(sqlite3 *db, const std::string &tableName, bool isMem, int64_t &timeOffset)
120 {
121     std::string sql = "SELECT get_sys_time(0) - max(rowid) - 1 FROM '" + tableName + "';";
122     sqlite3_stmt *stmt = nullptr;
123     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
124     if (errCode != E_OK) {
125         return errCode;
126     }
127     errCode = SQLiteUtils::StepWithRetry(stmt, isMem);
128     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
129         timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
130         errCode = E_OK;
131     }
132     SQLiteUtils::ResetStatement(stmt, true, errCode);
133     return errCode;
134 }
135 }
136 
GeneLogInfoForExistedData(sqlite3 * db,const std::string & tableName,const TableInfo & table,const std::string & calPrimaryKeyHash)137 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &tableName,
138     const TableInfo &table, const std::string &calPrimaryKeyHash)
139 {
140     int64_t timeOffset = 0;
141     int errCode = GetExistedDataTimeOffset(db, tableName, isMemDb_, timeOffset);
142     if (errCode != E_OK) {
143         return errCode;
144     }
145     std::string timeOffsetStr = std::to_string(timeOffset);
146     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
147     std::string sql = "INSERT INTO " + logTable +
148         " SELECT rowid, '', '', " + timeOffsetStr + " + rowid, " + timeOffsetStr + " + rowid, 0x2, " +
149         calPrimaryKeyHash + ", ''" + " FROM '" + tableName + "' AS a WHERE 1=1;";
150     return SQLiteUtils::ExecuteRawSQL(db, sql);
151 }
152 
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table,TableSyncType syncType)153 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
154     const std::string &identity, TableInfo &table, TableSyncType syncType)
155 {
156     if (dbHandle_ == nullptr) {
157         return -E_INVALID_DB;
158     }
159 
160     const std::string tableName = table.GetTableName();
161     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
162     if (errCode != E_OK) {
163         LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
164         return errCode;
165     }
166 
167     if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
168         bool isEmpty = false;
169         errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
170         if (errCode != E_OK || !isEmpty) {
171             LOGE("[CreateDistributedTable] check table empty failed. error=%d, isEmpty=%d", errCode, isEmpty);
172             return -E_NOT_SUPPORT;
173         }
174     }
175 
176     errCode = CheckTableConstraint(table, mode, syncType);
177     if (errCode != E_OK) {
178         LOGE("[CreateDistributedTable] check table constraint failed.");
179         return errCode;
180     }
181 
182     // create log table
183     auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
184     errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
185     if (errCode != E_OK) {
186         LOGE("[CreateDistributedTable] create log table failed");
187         return errCode;
188     }
189 
190     if (!isUpgraded) {
191         std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, identity);
192         errCode = GeneLogInfoForExistedData(dbHandle_, tableName, table, calPrimaryKeyHash);
193         if (errCode != E_OK) {
194             return errCode;
195         }
196     }
197 
198     // add trigger
199     errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
200     if (errCode != E_OK) {
201         LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
202         return errCode;
203     }
204     return SetLogTriggerStatus(true);
205 }
206 
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)207 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
208     DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
209 {
210     if (dbHandle_ == nullptr) {
211         return -E_INVALID_DB;
212     }
213     TableInfo newTableInfo;
214     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
215     if (errCode != E_OK) {
216         LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
217         return errCode;
218     }
219 
220     if (CheckTableConstraint(newTableInfo, mode, syncType)) {
221         LOGE("[UpgradeDistributedTable] Not support create distributed table without rowid.");
222         return -E_NOT_SUPPORT;
223     }
224 
225     // new table should has same or compatible upgrade
226     TableInfo tableInfo = schema.GetTable(tableName);
227     errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
228     if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
229         LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
230         return -E_SCHEMA_MISMATCH;
231     } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
232         LOGD("[UpgradeDistributedTable] schema has not changed.");
233         return E_OK;
234     }
235 
236     schemaChanged = true;
237     errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
238     if (errCode != E_OK) {
239         LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
240     }
241 
242     schema.AddRelationalTable(newTableInfo);
243     return errCode;
244 }
245 
246 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)247 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
248     std::vector<std::string> &deviceTables)
249 {
250     if (device.empty() && tableName.empty()) { // device and table name should not both be empty
251         return -E_INVALID_ARGS;
252     }
253     std::string devicePattern = device.empty() ? "%" : device;
254     std::string tablePattern = tableName.empty() ? "%" : tableName;
255     std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
256 
257     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
258         deviceTableName + "';";
259     sqlite3_stmt *stmt = nullptr;
260     int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
261     if (errCode != E_OK) {
262         SQLiteUtils::ResetStatement(stmt, true, errCode);
263         return errCode;
264     }
265 
266     do {
267         errCode = SQLiteUtils::StepWithRetry(stmt, false);
268         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
269             errCode = E_OK;
270             break;
271         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
272             LOGE("Get table name failed. %d", errCode);
273             break;
274         }
275         std::string realTableName;
276         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
277         if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
278             continue;
279         }
280         if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
281             continue;
282         }
283         deviceTables.emplace_back(realTableName);
284     } while (true);
285 
286     SQLiteUtils::ResetStatement(stmt, true, errCode);
287     return errCode;
288 }
289 
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)290 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
291 {
292     std::vector<FieldInfo> fields;
293     auto itOld = oldTableInfo.GetFields().begin();
294     auto itNew = newTableInfo.GetFields().begin();
295     for (; itNew != newTableInfo.GetFields().end(); itNew++) {
296         if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
297             fields.emplace_back(itNew->second);
298             continue;
299         }
300         itOld++;
301     }
302     return fields;
303 }
304 
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)305 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
306 {
307     if (db == nullptr) {
308         return -E_INVALID_ARGS;
309     }
310 
311     std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
312         return a.GetColumnId()< b.GetColumnId();
313     });
314     int errCode = E_OK;
315     for (const auto &table : tables) {
316         for (const auto &field : fields) {
317             std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
318             alterSql += "'" + field.GetDataType() + "'";
319             alterSql += field.IsNotNull() ? " NOT NULL" : "";
320             alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
321             alterSql += ";";
322             errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
323             if (errCode != E_OK) {
324                 LOGE("Alter table failed. %d", errCode);
325                 break;
326             }
327         }
328     }
329     return errCode;
330 }
331 
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)332 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
333 {
334     IndexInfoMap indexes;
335     auto itOld = oldTableInfo.GetIndexDefine().begin();
336     auto itNew = newTableInfo.GetIndexDefine().begin();
337     auto itOldEnd = oldTableInfo.GetIndexDefine().end();
338     auto itNewEnd = newTableInfo.GetIndexDefine().end();
339 
340     while (itOld != itOldEnd && itNew != itNewEnd) {
341         if (itOld->first == itNew->first) {
342             if (itOld->second != itNew->second) {
343                 indexes.insert({itNew->first, itNew->second});
344             }
345             itOld++;
346             itNew++;
347         } else if (itOld->first < itNew->first) {
348             indexes.insert({itOld->first, {}});
349             itOld++;
350         } else if (itOld->first > itNew->first) {
351             indexes.insert({itNew->first, itNew->second});
352             itNew++;
353         }
354     }
355 
356     while (itOld != itOldEnd) {
357         indexes.insert({itOld->first, {}});
358         itOld++;
359     }
360 
361     while (itNew != itNewEnd) {
362         indexes.insert({itNew->first, itNew->second});
363         itNew++;
364     }
365 
366     return indexes;
367 }
368 
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)369 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
370 {
371     if (db == nullptr) {
372         return -E_INVALID_ARGS;
373     }
374 
375     int errCode = E_OK;
376     for (const auto &table : tables) {
377         for (const auto &index : indexes) {
378             if (index.first.empty()) {
379                 continue;
380             }
381             std::string realIndexName = table + "_" + index.first;
382             std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
383             errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
384             if (errCode != E_OK) {
385                 LOGE("Drop index failed. %d", errCode);
386                 return errCode;
387             }
388 
389             if (index.second.empty()) { // empty means drop index only
390                 continue;
391             }
392 
393             auto it = index.second.begin();
394             std::string indexDefine = *it++;
395             while (it != index.second.end()) {
396                 indexDefine += ", " + *it++;
397             }
398             std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
399                 "(" + indexDefine + ");";
400             errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
401             if (errCode != E_OK) {
402                 LOGE("Create index failed. %d", errCode);
403                 break;
404             }
405         }
406     }
407     return errCode;
408 }
409 }
410 
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)411 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
412     const TableInfo &newTableInfo)
413 {
414     std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
415     IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
416     std::vector<std::string> deviceTables;
417     int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
418     if (errCode != E_OK) {
419         LOGE("Get device table name for alter table failed. %d", errCode);
420         return errCode;
421     }
422 
423     LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
424         upgradeIndexes.size(), deviceTables.size());
425     errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
426     if (errCode != E_OK) {
427         LOGE("upgrade fields failed. %d", errCode);
428         return errCode;
429     }
430 
431     errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
432     if (errCode != E_OK) {
433         LOGE("upgrade indexes failed. %d", errCode);
434     }
435 
436     return E_OK;
437 }
438 
StartTransaction(TransactType type)439 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
440 {
441     if (dbHandle_ == nullptr) {
442         LOGE("Begin transaction failed, dbHandle is null.");
443         return -E_INVALID_DB;
444     }
445     int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
446     if (errCode != E_OK) {
447         LOGE("Begin transaction failed, errCode = %d", errCode);
448     }
449     return errCode;
450 }
451 
Commit()452 int SQLiteSingleVerRelationalStorageExecutor::Commit()
453 {
454     if (dbHandle_ == nullptr) {
455         return -E_INVALID_DB;
456     }
457 
458     return SQLiteUtils::CommitTransaction(dbHandle_);
459 }
460 
Rollback()461 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
462 {
463     if (dbHandle_ == nullptr) {
464         return -E_INVALID_DB;
465     }
466     int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
467     if (errCode != E_OK) {
468         LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
469     }
470     return errCode;
471 }
472 
SetTableInfo(const TableInfo & tableInfo)473 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
474 {
475     table_ = tableInfo;
476 }
477 
GetLogData(sqlite3_stmt * logStatement,LogInfo & logInfo)478 static int GetLogData(sqlite3_stmt *logStatement, LogInfo &logInfo)
479 {
480     logInfo.dataKey = sqlite3_column_int64(logStatement, 0);  // 0 means dataKey index
481 
482     std::vector<uint8_t> dev;
483     int errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 1, dev);  // 1 means dev index
484     if (errCode != E_OK) {
485         return errCode;
486     }
487     logInfo.device = std::string(dev.begin(), dev.end());
488 
489     std::vector<uint8_t> oriDev;
490     errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 2, oriDev);  // 2 means ori_dev index
491     if (errCode != E_OK) {
492         return errCode;
493     }
494     logInfo.originDev = std::string(oriDev.begin(), oriDev.end());
495     logInfo.timestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 3));  // 3 means timestamp index
496     logInfo.wTimestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 4));  // 4 means w_timestamp index
497     logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 5));  // 5 means flag index
498     logInfo.flag &= (~DataItem::LOCAL_FLAG);
499     logInfo.flag &= (~DataItem::UPDATE_FLAG);
500     return SQLiteUtils::GetColumnBlobValue(logStatement, 6, logInfo.hashKey);  // 6 means hashKey index
501 }
502 
503 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize)504 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize)
505 {
506     logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD,
507         static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
508     logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
509         static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
510     totalSize += sizeof(int64_t) + sizeof(int64_t);
511     if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
512         std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
513             sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
514         if (!cloudGid.empty()) {
515             logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
516             totalSize += cloudGid.size();
517         }
518     }
519 }
520 
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)521 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
522 {
523     flags.insert_or_assign(ROWID,
524         static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
525     flags.insert_or_assign(TIMESTAMP,
526         static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
527     flags.insert_or_assign(FLAG,
528         static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
529 }
530 
IdentifyCloudType(CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)531 int IdentifyCloudType(CloudSyncData &cloudSyncData, VBucket &data, VBucket &log, VBucket &flags)
532 {
533     int64_t *rowid = std::get_if<int64_t>(&flags[ROWID]);
534     int64_t *flag = std::get_if<int64_t>(&flags[FLAG]);
535     int64_t *timeStamp = std::get_if<int64_t>(&flags[TIMESTAMP]);
536     if (rowid == nullptr || flag == nullptr || timeStamp == nullptr) {
537         return -E_INVALID_DATA;
538     }
539     if ((static_cast<uint64_t>(*flag) & DataItem::DELETE_FLAG) != 0) {
540         cloudSyncData.delData.record.push_back(data);
541         cloudSyncData.delData.extend.push_back(log);
542     } else if (log.find(CloudDbConstant::GID_FIELD) == log.end()) {
543         if (data.empty()) {
544             LOGE("The cloud data to be inserted is empty.");
545             return -E_INVALID_DATA;
546         }
547         cloudSyncData.insData.record.push_back(data);
548         cloudSyncData.insData.rowid.push_back(*rowid);
549         VBucket asset;
550         CloudStorageUtils::ObtainAssetFromVBucket(data, asset);
551         cloudSyncData.insData.timestamp.push_back(*timeStamp);
552         cloudSyncData.insData.assets.push_back(asset);
553         cloudSyncData.insData.extend.push_back(log);
554     } else {
555         if (data.empty()) {
556             LOGE("The cloud data to be updated is empty.");
557             return -E_INVALID_DATA;
558         }
559         cloudSyncData.updData.record.push_back(data);
560         VBucket asset;
561         CloudStorageUtils::ObtainAssetFromVBucket(data, asset);
562         if (!asset.empty()) {
563             cloudSyncData.updData.rowid.push_back(*rowid);
564             cloudSyncData.updData.timestamp.push_back(*timeStamp);
565             cloudSyncData.updData.assets.push_back(asset);
566         }
567         cloudSyncData.updData.extend.push_back(log);
568     }
569     return E_OK;
570 }
571 }
572 
GetDataItemSerialSize(DataItem & item,size_t appendLen)573 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
574 {
575     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
576     // the size would not be very large.
577     static const size_t maxOrigDevLength = 40;
578     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
579     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
580         Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
581     return dataSize;
582 }
583 
GetKvData(const Key & key,Value & value) const584 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
585 {
586     static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX +
587         "metadata WHERE key=?;";
588     sqlite3_stmt *statement = nullptr;
589     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
590     if (errCode != E_OK) {
591         goto END;
592     }
593 
594     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
595     if (errCode != E_OK) {
596         goto END;
597     }
598 
599     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
600     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
601         errCode = -E_NOT_FOUND;
602         goto END;
603     } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
604         goto END;
605     }
606 
607     errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
608     END:
609     SQLiteUtils::ResetStatement(statement, true, errCode);
610     return errCode;
611 }
612 
PutKvData(const Key & key,const Value & value) const613 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
614 {
615     static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX +
616         "metadata VALUES(?,?);";
617     sqlite3_stmt *statement = nullptr;
618     int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
619     if (errCode != E_OK) {
620         goto ERROR;
621     }
622 
623     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false);  // 1 means key index
624     if (errCode != E_OK) {
625         LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
626         goto ERROR;
627     }
628 
629     errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true);  // 2 means value index
630     if (errCode != E_OK) {
631         LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
632         goto ERROR;
633     }
634     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
635     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
636         errCode = E_OK;
637     }
638 ERROR:
639     SQLiteUtils::ResetStatement(statement, true, errCode);
640     return errCode;
641 }
642 
DeleteMetaData(const std::vector<Key> & keys) const643 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
644 {
645     static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
646         "metadata WHERE key=?;";
647     sqlite3_stmt *statement = nullptr;
648     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
649     if (errCode != E_OK) {
650         return errCode;
651     }
652 
653     for (const auto &key : keys) {
654         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
655         if (errCode != E_OK) {
656             break;
657         }
658 
659         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
660         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
661             break;
662         }
663         errCode = E_OK;
664         SQLiteUtils::ResetStatement(statement, false, errCode);
665     }
666     SQLiteUtils::ResetStatement(statement, true, errCode);
667     return CheckCorruptedStatus(errCode);
668 }
669 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const670 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
671 {
672     static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
673         "metadata WHERE key>=? AND key<=?;";
674     sqlite3_stmt *statement = nullptr;
675     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
676     if (errCode != E_OK) {
677         return errCode;
678     }
679 
680     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
681     if (errCode == E_OK) {
682         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
683         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
684             errCode = E_OK;
685         }
686     }
687     SQLiteUtils::ResetStatement(statement, true, errCode);
688     return CheckCorruptedStatus(errCode);
689 }
690 
GetAllMetaKeys(std::vector<Key> & keys) const691 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
692 {
693     static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + DBConstant::RELATIONAL_PREFIX + "metadata;";
694     sqlite3_stmt *statement = nullptr;
695     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
696     if (errCode != E_OK) {
697         LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
698         return errCode;
699     }
700     errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
701     SQLiteUtils::ResetStatement(statement, true, errCode);
702     return errCode;
703 }
704 
GetLogInfoPre(sqlite3_stmt * queryStmt,const DataItem & dataItem,LogInfo & logInfoGet)705 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoPre(sqlite3_stmt *queryStmt, const DataItem &dataItem,
706     LogInfo &logInfoGet)
707 {
708     if (queryStmt == nullptr) {
709         return -E_INVALID_ARGS;
710     }
711     int errCode = SQLiteUtils::BindBlobToStatement(queryStmt, 1, dataItem.hashKey);  // 1 means hashkey index.
712     if (errCode != E_OK) {
713         return errCode;
714     }
715     if (mode_ != DistributedTableMode::COLLABORATION) {
716         errCode = SQLiteUtils::BindTextToStatement(queryStmt, 2, dataItem.dev);  // 2 means device index.
717         if (errCode != E_OK) {
718             return errCode;
719         }
720     }
721 
722     errCode = SQLiteUtils::StepWithRetry(queryStmt, isMemDb_);
723     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
724         errCode = -E_NOT_FOUND;
725     } else {
726         errCode = GetLogData(queryStmt, logInfoGet);
727     }
728     return errCode;
729 }
730 
SaveSyncLog(sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,int64_t rowid)731 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncLog(sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
732     const DataItem &dataItem, int64_t rowid)
733 {
734     LogInfo logInfoGet;
735     int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
736     LogInfo logInfoBind;
737     logInfoBind.hashKey = dataItem.hashKey;
738     logInfoBind.device = dataItem.dev;
739     logInfoBind.timestamp = dataItem.timestamp;
740     logInfoBind.flag = dataItem.flag;
741 
742     if (errCode == -E_NOT_FOUND) { // insert
743         logInfoBind.wTimestamp = dataItem.writeTimestamp;
744         logInfoBind.originDev = dataItem.dev;
745     } else if (errCode == E_OK) { // update
746         logInfoBind.wTimestamp = logInfoGet.wTimestamp;
747         logInfoBind.originDev = logInfoGet.originDev;
748     } else {
749         return errCode;
750     }
751 
752     // bind
753     SQLiteUtils::BindInt64ToStatement(statement, 1, rowid);  // 1 means dataKey index
754     std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
755     SQLiteUtils::BindBlobToStatement(statement, 2, originDev);  // 2 means ori_dev index
756     SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp);  // 3 means timestamp index
757     SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimestamp);  // 4 means w_timestamp index
758     SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag);  // 5 means flag index
759     SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey);  // 6 means hashKey index
760     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
761     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
762         return E_OK;
763     }
764     return errCode;
765 }
766 
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)767 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
768     RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
769 {
770     if (stmt == nullptr) {
771         int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
772         if (errCode != E_OK) {
773             LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
774             return errCode;
775         }
776     }
777 
778     int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hash_key index
779     if (errCode != E_OK) {
780         SQLiteUtils::ResetStatement(stmt, true, errCode);
781         return errCode;
782     }
783     if (mode_ != DistributedTableMode::COLLABORATION) {
784         errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
785         if (errCode != E_OK) {
786             SQLiteUtils::ResetStatement(stmt, true, errCode);
787             return errCode;
788         }
789     }
790     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
791     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
792         errCode = E_OK;
793     }
794     SQLiteUtils::ResetStatement(stmt, false, errCode);  // Finalize outside.
795     return errCode;
796 }
797 
SaveSyncDataItem(const DataItem & dataItem,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,int64_t & rowid)798 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, SaveSyncDataStmt &saveStmt,
799     RelationalSyncDataInserter &inserter, int64_t &rowid)
800 {
801     if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
802         return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
803     }
804     if ((mode_ == DistributedTableMode::COLLABORATION && inserter.GetLocalTable().GetIdentifyKey().size() == 1u &&
805         inserter.GetLocalTable().GetIdentifyKey().at(0) == "rowid") ||
806         (mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().GetPrimaryKey().size() == 1u &&
807         inserter.GetLocalTable().GetPrimaryKey().at(0) == "rowid") ||
808         inserter.GetLocalTable().GetAutoIncrement()) {  // No primary key of auto increment
809         int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
810         if (errCode != E_OK) {
811             LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
812             return errCode;
813         }
814     }
815 
816     int errCode = inserter.BindInsertStatement(saveStmt.saveDataStmt, dataItem);
817     if (errCode != E_OK) {
818         LOGE("Bind data failed, errCode=%d.", errCode);
819         return errCode;
820     }
821     errCode = SQLiteUtils::StepWithRetry(saveStmt.saveDataStmt, isMemDb_);
822     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
823         rowid = SQLiteUtils::GetLastRowId(dbHandle_);
824         errCode = E_OK;
825     }
826     return errCode;
827 }
828 
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)829 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
830     RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
831 {
832     if (stmt == nullptr) {
833         int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
834         if (errCode != E_OK) {
835             LOGE("[DeleteSyncLog] Get statement fail!");
836             return errCode;
837         }
838     }
839 
840     int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hashkey index
841     if (errCode != E_OK) {
842         SQLiteUtils::ResetStatement(stmt, true, errCode);
843         return errCode;
844     }
845     if (mode_ != DistributedTableMode::COLLABORATION) {
846         errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
847         if (errCode != E_OK) {
848             SQLiteUtils::ResetStatement(stmt, true, errCode);
849             return errCode;
850         }
851     }
852     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
853     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
854         errCode = E_OK;
855     }
856     SQLiteUtils::ResetStatement(stmt, false, errCode);  // Finalize outside.
857     return errCode;
858 }
859 
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)860 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
861     RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
862 {
863     int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
864     if (errCode != E_OK) {
865         return errCode;
866     }
867     return DeleteSyncLog(item, inserter, rmLogStmt);
868 }
869 
GetSyncDataPre(const DataItem & dataItem,sqlite3_stmt * queryStmt,DataItem & itemGet)870 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataPre(const DataItem &dataItem, sqlite3_stmt *queryStmt,
871     DataItem &itemGet)
872 {
873     LogInfo logInfoGet;
874     int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
875     itemGet.timestamp = logInfoGet.timestamp;
876     SQLiteUtils::ResetStatement(queryStmt, false, errCode);
877     return errCode;
878 }
879 
CheckDataConflictDefeated(const DataItem & dataItem,sqlite3_stmt * queryStmt,bool & isDefeated)880 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
881     sqlite3_stmt *queryStmt, bool &isDefeated)
882 {
883     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
884         mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
885         isDefeated = false; // no need to solve conflict except miss query data
886         return E_OK;
887     }
888 
889     DataItem itemGet;
890     int errCode = GetSyncDataPre(dataItem, queryStmt, itemGet);
891     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
892         LOGE("Failed to get raw data. %d", errCode);
893         return errCode;
894     }
895     isDefeated = (dataItem.timestamp <= itemGet.timestamp); // defeated if item timestamp is earlier then raw data
896     return E_OK;
897 }
898 
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)899 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
900     SaveSyncDataStmt &saveStmt, DataItem &item)
901 {
902     bool isDefeated = false;
903     int errCode = CheckDataConflictDefeated(item, saveStmt.queryStmt, isDefeated);
904     if (errCode != E_OK) {
905         LOGE("check data conflict failed. %d", errCode);
906         return errCode;
907     }
908 
909     if (isDefeated) {
910         LOGD("Data was defeated.");
911         return E_OK;
912     }
913     if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
914         return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
915     }
916     int64_t rowid = -1;
917     errCode = SaveSyncDataItem(item, saveStmt, inserter, rowid);
918     if (errCode == E_OK || errCode == -E_NOT_FOUND) {
919         errCode = SaveSyncLog(saveStmt.saveLogStmt, saveStmt.queryStmt, item, rowid);
920     }
921     return errCode;
922 }
923 
SaveSyncDataItems(RelationalSyncDataInserter & inserter)924 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
925 {
926     SaveSyncDataStmt saveStmt;
927     int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
928     if (errCode != E_OK) {
929         LOGE("Prepare insert sync data statement failed.");
930         return errCode;
931     }
932 
933     errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
934         if (item.neglect) { // Do not save this record if it is neglected
935             return E_OK;
936         }
937         int errCode = SaveSyncDataItem(inserter, saveStmt, item);
938         if (errCode != E_OK) {
939             LOGE("save sync data item failed. err=%d", errCode);
940             return errCode;
941         }
942         // Need not reset rmDataStmt and rmLogStmt here.
943         return saveStmt.ResetStatements(false);
944     });
945 
946     int ret = saveStmt.ResetStatements(true);
947     return errCode != E_OK ? errCode : ret;
948 }
949 
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)950 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
951 {
952     if (useTrans) {
953         int errCode = StartTransaction(TransactType::IMMEDIATE);
954         if (errCode != E_OK) {
955             return errCode;
956         }
957     }
958 
959     int errCode = SetLogTriggerStatus(false);
960     if (errCode != E_OK) {
961         goto END;
962     }
963 
964     errCode = SaveSyncDataItems(inserter);
965     if (errCode != E_OK) {
966         LOGE("Save sync data items failed. errCode=%d", errCode);
967         goto END;
968     }
969 
970     errCode = SetLogTriggerStatus(true);
971 END:
972     if (useTrans) {
973         if (errCode == E_OK) {
974             errCode = Commit();
975         } else {
976             (void)Rollback(); // Keep the error code of the first scene
977         }
978     }
979     return errCode;
980 }
981 
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const982 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
983     bool isGettingDeletedData) const
984 {
985     RowDataWithLog data;
986     int errCode = GetLogData(stmt, data.logInfo);
987     if (errCode != E_OK) {
988         LOGE("relational data value transfer to kv fail");
989         return errCode;
990     }
991 
992     if (!isGettingDeletedData) {
993         for (size_t cid = 0; cid < table_.GetFields().size(); ++cid) {
994             DataValue value;
995             errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
996                 value);
997             if (errCode != E_OK) {
998                 return errCode;
999             }
1000             data.rowData.push_back(std::move(value)); // sorted by cid
1001         }
1002     }
1003 
1004     errCode = DataTransformer::SerializeDataItem(data,
1005         isGettingDeletedData ? std::vector<FieldInfo>() : table_.GetFieldInfos(), dataItem);
1006     if (errCode != E_OK) {
1007         LOGE("relational data value transfer to kv fail");
1008     }
1009     return errCode;
1010 }
1011 
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1012 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1013 {
1014     int errCode = GetDataItemForSync(fullStmt, item, false);
1015     if (errCode != E_OK) {
1016         return errCode;
1017     }
1018     item.value = {};
1019     item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1020     return errCode;
1021 }
1022 
1023 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1024 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp &timestamp)
1025 {
1026     if (stmt == nullptr) {
1027         return -E_INVALID_ARGS;
1028     }
1029     int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1030     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1031         timestamp = INT64_MAX;
1032         errCode = E_OK;
1033     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1034         timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3));  // 3 means timestamp index
1035         errCode = E_OK;
1036     }
1037     return errCode;
1038 }
1039 
StepNext(bool isMemDB,sqlite3_stmt * stmt)1040 int StepNext(bool isMemDB, sqlite3_stmt *stmt)
1041 {
1042     if (stmt == nullptr) {
1043         return -E_INVALID_ARGS;
1044     }
1045     int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1046     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1047         errCode = -E_FINISHED;
1048     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1049         errCode = E_OK;
1050     }
1051     return errCode;
1052 }
1053 
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1054 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1055     std::vector<DataItem> &dataItems, DataItem &&item)
1056 {
1057     // If one record is over 4M, ignore it.
1058     if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1059         overLongSize++;
1060     } else {
1061         // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1062         dataTotalSize += GetDataItemSerialSize(item, appendLength);
1063         if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1064             return -E_UNFINISHED;
1065         } else {
1066             dataItems.push_back(item);
1067         }
1068     }
1069     return E_OK;
1070 }
1071 }
1072 
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1073 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1074     sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1075 {
1076     if (!isFirstTime) { // For the first time, never step before, can get nothing
1077         int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1078         if (errCode != E_OK) {
1079             return errCode;
1080         }
1081     }
1082     return StepNext(isMemDb_, queryStmt, queryTime);
1083 }
1084 
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1085 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1086     Timestamp &missQueryTime)
1087 {
1088     int errCode = GetMissQueryData(fullStmt, item);
1089     if (errCode != E_OK) {
1090         return errCode;
1091     }
1092     return StepNext(isMemDb_, fullStmt, missQueryTime);
1093 }
1094 
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1095 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1096     const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1097     const TableInfo &tableInfo)
1098 {
1099     baseTblName_ = tableInfo.GetTableName();
1100     SetTableInfo(tableInfo);
1101     sqlite3_stmt *queryStmt = nullptr;
1102     sqlite3_stmt *fullStmt = nullptr;
1103     bool isGettingDeletedData = false;
1104     int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1105     if (errCode != E_OK) {
1106         return errCode;
1107     }
1108 
1109     Timestamp queryTime = 0;
1110     Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1111 
1112     bool isFirstTime = true;
1113     size_t dataTotalSize = 0;
1114     size_t overLongSize = 0;
1115     do {
1116         DataItem item;
1117         if (queryTime < missQueryTime) {
1118             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1119         } else if (queryTime == missQueryTime) {
1120             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1121             if (errCode != E_OK) {
1122                 break;
1123             }
1124             errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1125         } else {
1126             errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1127         }
1128 
1129         if (errCode == E_OK && !isFirstTime) {
1130             errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1131         }
1132 
1133         if (errCode != E_OK) {
1134             break;
1135         }
1136 
1137         isFirstTime = false;
1138         if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1139             errCode = -E_FINISHED;
1140             break;
1141         }
1142     } while (true);
1143     LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1144         errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1145     SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1146     SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1147     return errCode;
1148 }
1149 
CheckDBModeForRelational()1150 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1151 {
1152     std::string journalMode;
1153     int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1154 
1155     for (auto &c : journalMode) { // convert to lowercase
1156         c = static_cast<char>(std::tolower(c));
1157     }
1158 
1159     if (errCode == E_OK && journalMode != "wal") {
1160         LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1161         return -E_NOT_SUPPORT;
1162     }
1163     return errCode;
1164 }
1165 
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1166 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1167     const std::string &tableName)
1168 {
1169     std::vector<std::string> deviceTables;
1170     int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1171     if (errCode != E_OK) {
1172         LOGE("Get device table name for alter table failed. %d", errCode);
1173         return errCode;
1174     }
1175 
1176     LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1177     for (const auto &table : deviceTables) {
1178         std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1179         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1180         if (errCode != E_OK) {
1181             LOGE("Delete device data failed. %d", errCode);
1182             break;
1183         }
1184     }
1185     return errCode;
1186 }
1187 
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1188 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1189 {
1190     std::string deleteLogSql =
1191         "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName +
1192         "_log WHERE flag&0x02=0 and (cloud_gid = '' or cloud_gid is null)";
1193     return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1194 }
1195 
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1196 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1197     const std::string &tableName)
1198 {
1199     std::string deleteLogSql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE device = ?";
1200     sqlite3_stmt *deleteLogStmt = nullptr;
1201     int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1202     if (errCode != E_OK) {
1203         LOGE("Get delete device data log statement failed. %d", errCode);
1204         return errCode;
1205     }
1206 
1207     errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1208     if (errCode != E_OK) {
1209         LOGE("Bind device to delete data log statement failed. %d", errCode);
1210         SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1211         return errCode;
1212     }
1213 
1214     errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1215     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1216         errCode = E_OK;
1217     } else {
1218         LOGE("Delete data log failed. %d", errCode);
1219     }
1220 
1221     SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1222     return errCode;
1223 }
1224 
DeleteDistributedLogTable(const std::string & tableName)1225 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1226 {
1227     if (tableName.empty()) {
1228         return -E_INVALID_ARGS;
1229     }
1230     std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1231     std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1232     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1233     if (errCode != E_OK) {
1234         LOGE("Delete distributed log table failed. %d", errCode);
1235     }
1236     return errCode;
1237 }
1238 
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1239 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1240     std::vector<std::string> &missingTables)
1241 {
1242     if (tableNames.empty()) {
1243         return E_OK;
1244     }
1245     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1246     sqlite3_stmt *stmt = nullptr;
1247     int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1248     if (errCode != E_OK) {
1249         SQLiteUtils::ResetStatement(stmt, true, errCode);
1250         return errCode;
1251     }
1252     for (const auto &tableName : tableNames) {
1253         errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1254         if (errCode != E_OK) {
1255             LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1256             break;
1257         }
1258 
1259         errCode = SQLiteUtils::StepWithRetry(stmt, false);
1260         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1261             errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1262             if (errCode != E_OK) {
1263                 LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1264                 break;
1265             }
1266             errCode = DeleteDistributedLogTable(tableName);
1267             if (errCode != E_OK) {
1268                 LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1269                 break;
1270             }
1271             missingTables.emplace_back(tableName);
1272         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1273             LOGE("Check distributed table failed. %d", errCode);
1274             break;
1275         }
1276         errCode = E_OK; // Check result ok for distributed table is still exists
1277         SQLiteUtils::ResetStatement(stmt, false, errCode);
1278     }
1279     SQLiteUtils::ResetStatement(stmt, true, errCode);
1280     return CheckCorruptedStatus(errCode);
1281 }
1282 
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1283 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1284     const TableInfo &baseTbl, const StoreInfo &info)
1285 {
1286     if (dbHandle_ == nullptr) {
1287         return -E_INVALID_DB;
1288     }
1289 
1290     if (device.empty() || !baseTbl.IsValid()) {
1291         return -E_INVALID_ARGS;
1292     }
1293 
1294     std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1295     int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1296     if (errCode != E_OK) {
1297         LOGE("Create device table failed. %d", errCode);
1298         return errCode;
1299     }
1300 
1301     errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1302     if (errCode != E_OK) {
1303         LOGE("Copy index to device table failed. %d", errCode);
1304     }
1305     return errCode;
1306 }
1307 
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1308 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1309     const std::string &schemaVersion)
1310 {
1311     if (dbHandle_ == nullptr) {
1312         return -E_INVALID_DB;
1313     }
1314 
1315     TableInfo newTable;
1316     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1317     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1318         LOGE("Check new schema failed. %d", errCode);
1319         return errCode;
1320     } else {
1321         errCode = table.CompareWithTable(newTable, schemaVersion);
1322         if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1323             LOGE("Check schema failed, schema was changed. %d", errCode);
1324             return -E_DISTRIBUTED_SCHEMA_CHANGED;
1325         } else {
1326             errCode = E_OK;
1327         }
1328     }
1329 
1330     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1331     if (errCode != E_OK) {
1332         LOGE("Get query helper for check query failed. %d", errCode);
1333         return errCode;
1334     }
1335 
1336     if (!query.IsQueryForRelationalDB()) {
1337         LOGE("Not support for this query type.");
1338         return -E_NOT_SUPPORT;
1339     }
1340 
1341     SyncTimeRange defaultTimeRange;
1342     sqlite3_stmt *stmt = nullptr;
1343     errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1344         stmt);
1345     if (errCode != E_OK) {
1346         LOGE("Get query statement for check query failed. %d", errCode);
1347     }
1348 
1349     SQLiteUtils::ResetStatement(stmt, true, errCode);
1350     return errCode;
1351 }
1352 
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1353 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1354     Timestamp &maxTimestamp) const
1355 {
1356     maxTimestamp = 0;
1357     for (const auto &tableName : tableNames) {
1358         const std::string sql = "SELECT max(timestamp) from " + DBConstant::RELATIONAL_PREFIX + tableName + "_log;";
1359         sqlite3_stmt *stmt = nullptr;
1360         int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1361         if (errCode != E_OK) {
1362             return errCode;
1363         }
1364         errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1365         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1366             maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1367             errCode = E_OK;
1368         }
1369         SQLiteUtils::ResetStatement(stmt, true, errCode);
1370         if (errCode != E_OK) {
1371             maxTimestamp = 0;
1372             return errCode;
1373         }
1374     }
1375     return E_OK;
1376 }
1377 
SetLogTriggerStatus(bool status)1378 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1379 {
1380     const std::string key = "log_trigger_switch";
1381     std::string val = status ? "true" : "false";
1382     std::string sql = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX + "metadata" +
1383         " VALUES ('" + key + "', '" + val + "')";
1384     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1385     if (errCode != E_OK) {
1386         LOGE("Set log trigger to %s failed. errCode=%d", val.c_str(), errCode);
1387     }
1388     return errCode;
1389 }
1390 
1391 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1392 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1393     std::vector<RelationalRowData *> &data)
1394 {
1395     size_t totalLength = 0;
1396     do {
1397         int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1398         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1399             return E_OK;
1400         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1401             LOGE("Get data by bind sql failed:%d", errCode);
1402             return errCode;
1403         }
1404 
1405         if (colNames.empty()) {
1406             SQLiteUtils::GetSelectCols(stmt, colNames);  // Get column names.
1407         }
1408         auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1409         if (relaRowData == nullptr) {
1410             LOGE("ExecuteQueryBySqlStmt OOM");
1411             return -E_OUT_OF_MEMORY;
1412         }
1413 
1414         auto dataSz = relaRowData->CalcLength();
1415         if (dataSz == 0) {  // invalid data
1416             delete relaRowData;
1417             relaRowData = nullptr;
1418             continue;
1419         }
1420 
1421         totalLength += static_cast<size_t>(dataSz);
1422         if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) {  // the set has been full
1423             delete relaRowData;
1424             relaRowData = nullptr;
1425             LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1426             return -E_REMOTE_OVER_SIZE;
1427         }
1428         data.push_back(relaRowData);
1429     } while (true);
1430     return E_OK;
1431 }
1432 }
1433 
1434 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1435 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1436     const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1437     std::vector<RelationalRowData *> &data)
1438 {
1439     int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1440     if (errCode != E_OK) {
1441         return errCode;
1442     }
1443 
1444     sqlite3_stmt *stmt = nullptr;
1445     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1446     if (errCode != E_OK) {
1447         (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1448         return errCode;
1449     }
1450     ResFinalizer finalizer([this, &stmt, &errCode] {
1451         (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1452         SQLiteUtils::ResetStatement(stmt, true, errCode);
1453     });
1454     for (size_t i = 0; i < bindArgs.size(); ++i) {
1455         errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1456         if (errCode != E_OK) {
1457             return errCode;
1458         }
1459     }
1460     return GetRowDatas(stmt, isMemDb_, colNames, data);
1461 }
1462 
CheckEncryptedOrCorrupted() const1463 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1464 {
1465     if (dbHandle_ == nullptr) {
1466         return -E_INVALID_DB;
1467     }
1468 
1469     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1470     if (errCode != E_OK) {
1471         LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1472     }
1473     return errCode;
1474 }
1475 
GetExistsDeviceList(std::set<std::string> & devices) const1476 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1477 {
1478     return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1479         isMemDb_, devices);
1480 }
1481 
GetUploadCount(const std::string & tableName,const Timestamp & timestamp,const bool isCloudForcePush,int64_t & count)1482 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const std::string &tableName,
1483     const Timestamp &timestamp, const bool isCloudForcePush, int64_t &count)
1484 {
1485     std::string sql = isCloudForcePush ?
1486         ("SELECT count(rowid) from '" + DBCommon::GetLogTableName(tableName)
1487         + "' where timestamp > ? and (flag & 0x04) != 0x04 and (cloud_gid != ''"
1488         " or (cloud_gid == '' and (flag & 0x01) = 0 ));"):
1489         ("SELECT count(rowid) from '" + DBCommon::GetLogTableName(tableName)
1490         + "' where timestamp > ? and (flag & 0x02) = 0x02 and (cloud_gid != ''"
1491         " or (cloud_gid == '' and (flag & 0x01) = 0 ));");
1492 
1493     sqlite3_stmt *stmt = nullptr;
1494     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1495     if (errCode != E_OK) {
1496         return errCode;
1497     }
1498     errCode = SQLiteUtils::BindInt64ToStatement(stmt, 1, timestamp);
1499     if (errCode != E_OK) {
1500         SQLiteUtils::ResetStatement(stmt, true, errCode);
1501         return errCode;
1502     }
1503     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1504     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1505         count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1506         errCode = E_OK;
1507     } else {
1508         LOGE("Failed to get the count to be uploaded. %d", errCode);
1509     }
1510     SQLiteUtils::ResetStatement(stmt, true, errCode);
1511     LOGD("upload count is %d, isCloudForcePush is %d", count, isCloudForcePush);
1512     return errCode;
1513 }
1514 
UpdateCloudLogGid(const CloudSyncData & cloudDataResult)1515 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult)
1516 {
1517     if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1518         cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1519         return -E_INVALID_ARGS;
1520     }
1521     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1522         + "' SET cloud_gid = ? where data_key = ? ";
1523     sqlite3_stmt *stmt = nullptr;
1524     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1525     if (errCode != E_OK) {
1526         return errCode;
1527     }
1528     for (size_t i = 0; i < cloudDataResult.insData.extend.size(); ++i) {
1529         auto gidEntry = cloudDataResult.insData.extend[i].find(CloudDbConstant::GID_FIELD);
1530         int64_t rowid = cloudDataResult.insData.rowid[i];
1531         if (gidEntry == cloudDataResult.insData.extend[i].end()) {
1532             errCode = -E_INVALID_ARGS;
1533             break;
1534         }
1535         std::string val;
1536         if (CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD,
1537             cloudDataResult.insData.extend[i], val) != E_OK) {
1538             errCode = -E_INVALID_DATA;
1539             break;
1540         }
1541         if (val.empty()) {
1542             errCode = -E_CLOUD_ERROR;
1543             break;
1544         }
1545         errCode = SQLiteUtils::BindTextToStatement(stmt, 1, val);
1546         if (errCode != E_OK) {
1547             break;
1548         }
1549         errCode = SQLiteUtils::BindInt64ToStatement(stmt, 2, rowid); // 2 means the second bind args
1550         if (errCode != E_OK) {
1551             break;
1552         }
1553         errCode = SQLiteUtils::StepWithRetry(stmt, false);
1554         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1555             errCode = E_OK;
1556             SQLiteUtils::ResetStatement(stmt, false, errCode);
1557         } else {
1558             LOGE("Update cloud log failed:%d", errCode);
1559             break;
1560         }
1561     }
1562     SQLiteUtils::ResetStatement(stmt, true, errCode);
1563     return errCode;
1564 }
1565 
GetSyncCloudData(CloudSyncData & cloudDataResult,const uint32_t & maxSize,SQLiteSingleVerRelationalContinueToken & token)1566 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(CloudSyncData &cloudDataResult,
1567     const uint32_t &maxSize, SQLiteSingleVerRelationalContinueToken &token)
1568 {
1569     token.GetCloudTableSchema(tableSchema_);
1570     sqlite3_stmt *queryStmt = nullptr;
1571     bool isStepNext = false;
1572     int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult.isCloudForcePushStrategy, queryStmt, isStepNext);
1573     if (errCode != E_OK) {
1574         (void)token.ReleaseCloudStatement();
1575         return errCode;
1576     }
1577     uint32_t totalSize = 0;
1578     uint32_t stepNum = 0;
1579     do {
1580         if (isStepNext) {
1581             errCode = StepNext(isMemDb_, queryStmt);
1582             if (errCode != E_OK) {
1583                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1584                 break;
1585             }
1586         }
1587         isStepNext = true;
1588         errCode = GetCloudDataForSync(queryStmt, cloudDataResult, stepNum++, totalSize, maxSize);
1589     } while (errCode == E_OK);
1590     LOGD("Get cloud sync data, insData:%u, upData:%u, delLog:%u", cloudDataResult.insData.record.size(),
1591         cloudDataResult.updData.record.size(), cloudDataResult.delData.extend.size());
1592     if (errCode != -E_UNFINISHED) {
1593         (void)token.ReleaseCloudStatement();
1594     }
1595     return errCode;
1596 }
1597 
GetCloudDataForSync(sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t stepNum,uint32_t & totalSize,const uint32_t & maxSize)1598 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(sqlite3_stmt *statement,
1599     CloudSyncData &cloudDataResult, uint32_t stepNum, uint32_t &totalSize, const uint32_t &maxSize)
1600 {
1601     VBucket log;
1602     VBucket extraLog;
1603     GetCloudLog(statement, log, totalSize);
1604     GetCloudExtraLog(statement, extraLog);
1605 
1606     VBucket data;
1607     int64_t flag = 0;
1608     int errCode = CloudStorageUtils::GetValueFromVBucket(FLAG, extraLog, flag);
1609     if (errCode != E_OK) {
1610         return errCode;
1611     }
1612     if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1613         for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1614             Type cloudValue;
1615             errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1616                 tableSchema_.fields[cid].type, cid + 8, cloudValue); // 8 is the start index of query cloud data
1617             if (errCode != E_OK) {
1618                 return errCode;
1619             }
1620             SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1621             errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1622             if (errCode != E_OK) {
1623                 return errCode;
1624             }
1625         }
1626     }
1627 
1628     if (IsGetCloudDataContinue(stepNum, totalSize, maxSize)) {
1629         errCode = IdentifyCloudType(cloudDataResult, data, log, extraLog);
1630     } else {
1631         errCode = -E_UNFINISHED;
1632     }
1633     return errCode;
1634 }
1635 
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1636 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1637 {
1638     if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1639         Asset asset;
1640         int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1641         if (errCode != E_OK) {
1642             return errCode;
1643         }
1644         if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1645             return -E_CLOUD_INVALID_ASSET;
1646         }
1647         vBucket.insert_or_assign(field.colName, asset);
1648     } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1649         Assets assets;
1650         int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1651         if (errCode != E_OK) {
1652             return errCode;
1653         }
1654         if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1655             return -E_CLOUD_ERROR;
1656         }
1657         if (!CloudStorageUtils::CheckAssetStatus(assets)) {
1658             return -E_CLOUD_INVALID_ASSET;
1659         }
1660         vBucket.insert_or_assign(field.colName, assets);
1661     } else {
1662         vBucket.insert_or_assign(field.colName, cloudValue);
1663     }
1664     return E_OK;
1665 }
1666 
GetInfoByPrimaryKeyOrGid(const TableSchema & tableSchema,const VBucket & vBucket,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1667 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByPrimaryKeyOrGid(const TableSchema &tableSchema,
1668     const VBucket &vBucket, DataInfoWithLog &dataInfoWithLog, VBucket &assetInfo)
1669 {
1670     std::string querySql;
1671     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
1672     std::vector<Field> assetFields = CloudStorageUtils::GetCloudAsset(tableSchema);
1673     int errCode = GetQueryInfoSql(tableSchema.name, vBucket, pkSet, assetFields, querySql);
1674     if (errCode != E_OK) {
1675         LOGE("Get query log sql fail, %d", errCode);
1676         return errCode;
1677     }
1678 
1679     sqlite3_stmt *selectStmt = nullptr;
1680     errCode = GetQueryLogStatement(tableSchema, vBucket, querySql, pkSet, selectStmt);
1681     if (errCode != E_OK) {
1682         LOGE("Get query log statement fail, %d", errCode);
1683         return errCode;
1684     }
1685 
1686     bool alreadyFound = false;
1687     do {
1688         errCode = SQLiteUtils::StepWithRetry(selectStmt);
1689         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1690             if (alreadyFound) {
1691                 LOGE("found more than one records in log table for one primary key or gid.");
1692                 errCode = -E_CLOUD_ERROR;
1693                 break;
1694             }
1695             alreadyFound = true;
1696             std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1697             errCode = GetInfoByStatement(selectStmt, assetFields, pkMap, dataInfoWithLog, assetInfo);
1698             if (errCode != E_OK) {
1699                 LOGE("Get info by statement fail, %d", errCode);
1700                 break;
1701             }
1702         } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1703             errCode = alreadyFound ? E_OK : -E_NOT_FOUND;
1704             break;
1705         } else {
1706             LOGE("SQLite step failed when query log for cloud sync:%d", errCode);
1707             break;
1708         }
1709     } while (errCode == E_OK);
1710 
1711     int ret = E_OK;
1712     SQLiteUtils::ResetStatement(selectStmt, true, ret);
1713     return errCode != E_OK ? errCode : ret;
1714 }
1715 
GetLogInfoByStatement(sqlite3_stmt * statement,LogInfo & logInfo)1716 void SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByStatement(sqlite3_stmt *statement, LogInfo &logInfo)
1717 {
1718     logInfo.dataKey = sqlite3_column_int64(statement, 0);
1719     std::vector<uint8_t> device;
1720     (void)SQLiteUtils::GetColumnBlobValue(statement, 1, device);    // 1 is device
1721     DBCommon::VectorToString(device, logInfo.device);
1722     std::vector<uint8_t> originDev;
1723     (void)SQLiteUtils::GetColumnBlobValue(statement, 2, originDev); // 2 is originDev
1724     DBCommon::VectorToString(originDev, logInfo.originDev);
1725     logInfo.timestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, 3)); // 3 is timestamp
1726     logInfo.wTimestamp = static_cast<Timestamp>(sqlite3_column_int64(statement, 4)); // 4 is wtimestamp
1727     logInfo.flag = static_cast<uint64_t>(sqlite3_column_int(statement, 5)); // 5 is flag
1728     (void)SQLiteUtils::GetColumnBlobValue(statement, 6, logInfo.hashKey); // 6 is hash_key
1729     (void)SQLiteUtils::GetColumnTextValue(statement, 7, logInfo.cloudGid); // 7 is cloud_gid
1730 }
1731 
GetInfoByStatement(sqlite3_stmt * statement,std::vector<Field> & assetFields,const std::map<std::string,Field> & pkMap,DataInfoWithLog & dataInfoWithLog,VBucket & assetInfo)1732 int SQLiteSingleVerRelationalStorageExecutor::GetInfoByStatement(sqlite3_stmt *statement,
1733     std::vector<Field> &assetFields, const std::map<std::string, Field> &pkMap, DataInfoWithLog &dataInfoWithLog,
1734     VBucket &assetInfo)
1735 {
1736     GetLogInfoByStatement(statement, dataInfoWithLog.logInfo);
1737     int index = 8; // 8 is start index of assetInfo or primary key
1738     int errCode = E_OK;
1739     for (const auto &field: assetFields) {
1740         Type cloudValue;
1741         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, field.type, index++, cloudValue);
1742         if (errCode != E_OK) {
1743             break;
1744         }
1745         errCode = PutVBucketByType(assetInfo, field, cloudValue);
1746         if (errCode != E_OK) {
1747             break;
1748         }
1749     }
1750     if (errCode != E_OK) {
1751         LOGE("set asset field failed, errCode = %d", errCode);
1752         return errCode;
1753     }
1754 
1755     // fill primary key
1756     for (const auto &item : pkMap) {
1757         Type cloudValue;
1758         errCode = SQLiteRelationalUtils::GetCloudValueByType(statement, item.second.type, index++, cloudValue);
1759         if (errCode != E_OK) {
1760             break;
1761         }
1762         errCode = PutVBucketByType(dataInfoWithLog.primaryKeys, item.second, cloudValue);
1763         if (errCode != E_OK) {
1764             break;
1765         }
1766     }
1767     return errCode;
1768 }
1769 
GetInsertSqlForCloudSync(const TableSchema & tableSchema)1770 std::string SQLiteSingleVerRelationalStorageExecutor::GetInsertSqlForCloudSync(const TableSchema &tableSchema)
1771 {
1772     std::string sql = "insert into " + tableSchema.name + "(";
1773     for (const auto &field : tableSchema.fields) {
1774         sql += field.colName + ",";
1775     }
1776     sql.pop_back();
1777     sql += ") values(";
1778     for (size_t i = 0; i < tableSchema.fields.size(); i++) {
1779         sql += "?,";
1780     }
1781     sql.pop_back();
1782     sql += ");";
1783     return sql;
1784 }
1785 
GetPrimaryKeyHashValue(const VBucket & vBucket,const TableSchema & tableSchema,std::vector<uint8_t> & hashValue,bool allowEmpty)1786 int SQLiteSingleVerRelationalStorageExecutor::GetPrimaryKeyHashValue(const VBucket &vBucket,
1787     const TableSchema &tableSchema, std::vector<uint8_t> &hashValue, bool allowEmpty)
1788 {
1789     int errCode = E_OK;
1790     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
1791     if (pkMap.size() == 0) {
1792         int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
1793         std::vector<uint8_t> value;
1794         DBCommon::StringToVector(std::to_string(rowid), value);
1795         errCode = DBCommon::CalcValueHash(value, hashValue);
1796     } else if (pkMap.size() == 1) {
1797         std::vector<Field> pkVec = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
1798         errCode = CloudStorageUtils::CalculateHashKeyForOneField(pkVec.at(0), vBucket, allowEmpty, hashValue);
1799     } else {
1800         std::vector<uint8_t> tempRes;
1801         for (const auto &item: pkMap) {
1802             std::vector<uint8_t> temp;
1803             errCode = CloudStorageUtils::CalculateHashKeyForOneField(item.second, vBucket, allowEmpty, temp);
1804             if (errCode != E_OK) {
1805                 LOGE("calc hash fail when there is more than one primary key. errCode = %d", errCode);
1806                 return errCode;
1807             }
1808             tempRes.insert(tempRes.end(), temp.begin(), temp.end());
1809         }
1810         errCode = DBCommon::CalcValueHash(tempRes, hashValue);
1811     }
1812     return errCode;
1813 }
1814 
GetQueryLogStatement(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & querySql,std::set<std::string> & pkSet,sqlite3_stmt * & selectStmt)1815 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogStatement(const TableSchema &tableSchema,
1816     const VBucket &vBucket, const std::string &querySql, std::set<std::string> &pkSet, sqlite3_stmt *&selectStmt)
1817 {
1818     int errCode = SQLiteUtils::GetStatement(dbHandle_, querySql, selectStmt);
1819     if (errCode != E_OK) {
1820         LOGE("Get select log statement failed, %d", errCode);
1821         return errCode;
1822     }
1823 
1824     std::string cloudGid;
1825     errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
1826     if (errCode != E_OK) {
1827         LOGE("Get cloud gid fail when bind query log statement.");
1828         return errCode;
1829     }
1830 
1831     int index = 0;
1832     if (!cloudGid.empty()) {
1833         index++;
1834         errCode = SQLiteUtils::BindTextToStatement(selectStmt, index, cloudGid);
1835         if (errCode != E_OK) {
1836             LOGE("Bind cloud gid to query log statement failed. %d", errCode);
1837             SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1838             return errCode;
1839         }
1840     }
1841 
1842     std::vector<uint8_t> hashValue;
1843     if (!pkSet.empty()) {
1844         errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashValue, true);
1845     }
1846     if (errCode != E_OK) {
1847         LOGE("calc hash fail when get query log statement, errCode = %d", errCode);
1848         SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1849         return errCode;
1850     }
1851 
1852     index++;
1853     errCode = SQLiteUtils::BindBlobToStatement(selectStmt, index, hashValue, true);
1854     int ret = E_OK;
1855     if (errCode != E_OK) {
1856         LOGE("Bind hash key to query log statement failed. %d", errCode);
1857         SQLiteUtils::ResetStatement(selectStmt, true, ret);
1858     }
1859     return errCode != E_OK ? errCode : ret;
1860 }
1861 
GetQueryLogSql(const std::string & tableName,const VBucket & vBucket,std::set<std::string> & pkSet,std::string & querySql)1862 int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string &tableName, const VBucket &vBucket,
1863     std::set<std::string> &pkSet, std::string &querySql)
1864 {
1865     std::string cloudGid;
1866     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
1867     if (errCode != E_OK) {
1868         LOGE("Get cloud gid fail when query log table.");
1869         return errCode;
1870     }
1871 
1872     if (pkSet.empty() && cloudGid.empty()) {
1873         LOGE("query log table failed because of both primary key and gid are empty.");
1874         return -E_CLOUD_ERROR;
1875     }
1876     std::string sql = "select data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid FROM "
1877         + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE ";
1878     if (!cloudGid.empty()) {
1879         sql += "cloud_gid = ? or ";
1880     }
1881     sql += "hash_key = ?";
1882 
1883     querySql = sql;
1884     return E_OK;
1885 }
1886 
ExecutePutCloudData(const std::string & tableName,const TableSchema & tableSchema,DownloadData & downloadData,std::map<int,int> & statisticMap)1887 int SQLiteSingleVerRelationalStorageExecutor::ExecutePutCloudData(const std::string &tableName,
1888     const TableSchema &tableSchema, DownloadData &downloadData, std::map<int, int> &statisticMap)
1889 {
1890     int index = 0;
1891     int errCode = E_OK;
1892     for (OpType op : downloadData.opType) {
1893         VBucket &vBucket = downloadData.data[index];
1894         switch (op) {
1895             case OpType::INSERT:
1896                 errCode = InsertCloudData(tableName, vBucket, tableSchema);
1897                 break;
1898             case OpType::UPDATE:
1899                 errCode = UpdateCloudData(tableName, vBucket, tableSchema);
1900                 break;
1901             case OpType::DELETE:
1902                 errCode = DeleteCloudData(tableName, vBucket, tableSchema);
1903                 break;
1904             case OpType::ONLY_UPDATE_GID:
1905             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO:
1906             case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE:
1907             case OpType::UPDATE_TIMESTAMP:
1908             case OpType::CLEAR_GID:
1909                 errCode = OnlyUpdateLogTable(vBucket, tableSchema, op);
1910                 break;
1911             case OpType::NOT_HANDLE:
1912                 break;
1913             default:
1914                 errCode = -E_CLOUD_ERROR;
1915                 break;
1916         }
1917         if (errCode != E_OK) {
1918             LOGE("put cloud sync data fail: %d", errCode);
1919             return errCode;
1920         }
1921         statisticMap[static_cast<int>(op)]++;
1922         index++;
1923     }
1924     return errCode;
1925 }
1926 
DoCleanInner(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)1927 int SQLiteSingleVerRelationalStorageExecutor::DoCleanInner(ClearMode mode,
1928     const std::vector<std::string> &tableNameList, const RelationalSchemaObject &localSchema,
1929     std::vector<Asset> &assets)
1930 {
1931     int errCode = SetLogTriggerStatus(false);
1932     if (errCode != E_OK) {
1933         LOGE("Fail to set log trigger off when clean cloud data, %d", errCode);
1934         return errCode;
1935     }
1936     if (mode == FLAG_ONLY) {
1937         errCode = DoCleanLogs(tableNameList);
1938         if (errCode != E_OK) {
1939             LOGE("[Storage Executor] Failed to do clean logs when clean cloud data.");
1940             return errCode;
1941         }
1942     } else if (mode == FLAG_AND_DATA) {
1943         errCode = DoCleanLogAndData(tableNameList, localSchema, assets);
1944         if (errCode != E_OK) {
1945             LOGE("[Storage Executor] Failed to do clean log and data when clean cloud data.");
1946             return errCode;
1947         }
1948     }
1949     errCode = SetLogTriggerStatus(true);
1950     if (errCode != E_OK) {
1951         LOGE("Fail to set log trigger on when clean cloud data, %d", errCode);
1952     }
1953 
1954     return errCode;
1955 }
1956 
DoCleanLogs(const std::vector<std::string> & tableNameList)1957 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogs(const std::vector<std::string> &tableNameList)
1958 {
1959     int errCode = E_OK;
1960     int i = 1;
1961     for (const auto &tableName: tableNameList) {
1962         std::string logTableName = DBCommon::GetLogTableName(tableName);
1963         LOGD("[Storage Executor] Start clean cloud data on log table. table index: %d.", i);
1964         errCode = CleanCloudDataOnLogTable(logTableName);
1965         if (errCode != E_OK) {
1966             LOGE("[Storage Executor] failed to clean cloud data on log table, %d", errCode);
1967             return errCode;
1968         }
1969         i++;
1970     }
1971 
1972     return errCode;
1973 }
1974 
CleanCloudDataOnLogTable(const std::string & logTableName)1975 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName)
1976 {
1977     std::string cleanLogSql = "UPDATE '" + logTableName + "' SET " + FLAG + " = " + SET_FLAG_LOCAL + ", " +
1978         DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '' WHERE " + CLOUD_GID_FIELD + " is not NULL and " +
1979         CLOUD_GID_FIELD + " != '';";
1980     return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1981 }
1982 
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName)1983 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1984     const std::string &logTableName)
1985 {
1986     std::string sql = "DELETE FROM '" + tableName + "' WHERE " + ROWID +" in (SELECT " + DATAKEY +
1987         " FROM '" + logTableName + "' WHERE CLOUD_GID is not NULL and CLOUD_GID != '' and " + FLAG_IS_CLOUD + ");";
1988     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1989     if (errCode != E_OK) {
1990         LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1991         return errCode;
1992     }
1993     std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD + " is not NULL and " +
1994         CLOUD_GID_FIELD + " != '' AND " + FLAG_IS_CLOUD + ";";
1995     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1996     if (errCode != E_OK) {
1997         LOGE("Failed to delete cloud data on log table, %d.", errCode);
1998         return errCode;
1999     }
2000     errCode = CleanCloudDataOnLogTable(logTableName);
2001     if (errCode != E_OK) {
2002         LOGE("Failed to clean gid on log table, %d.", errCode);
2003     }
2004     return errCode;
2005 }
2006 
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys)2007 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
2008     std::vector<int64_t> &dataKeys)
2009 {
2010     sqlite3_stmt *selectStmt = nullptr;
2011     std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
2012         " is not NULL and " + CLOUD_GID_FIELD + " != '' AND " + FLAG_IS_CLOUD + ";";
2013 
2014     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
2015     if (errCode != E_OK) {
2016         LOGE("Get select data_key statement failed, %d", errCode);
2017         SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2018         return errCode;
2019     }
2020     do {
2021         errCode = SQLiteUtils::StepWithRetry(selectStmt);
2022         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2023             dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
2024         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2025             LOGE("SQLite step failed when query log's data_key : %d", errCode);
2026             SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2027             break;
2028         }
2029     } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
2030     SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2031     return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
2032 }
2033 
DoCleanLogAndData(const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema,std::vector<Asset> & assets)2034 int SQLiteSingleVerRelationalStorageExecutor::DoCleanLogAndData(const std::vector<std::string> &tableNameList,
2035     const RelationalSchemaObject &localSchema, std::vector<Asset> &assets)
2036 {
2037     int errCode = E_OK;
2038     for (size_t i = 0; i < tableNameList.size(); i++) {
2039         std::string tableName = tableNameList[i];
2040         std::string logTableName = DBCommon::GetLogTableName(tableName);
2041         std::vector<int64_t> dataKeys;
2042         errCode = GetCleanCloudDataKeys(logTableName, dataKeys);
2043         if (errCode != E_OK) {
2044             LOGE("[Storage Executor] Failed to get clean cloud data keys, %d.", errCode);
2045             return errCode;
2046         }
2047 
2048         std::vector<FieldInfo> fieldInfos = localSchema.GetTable(tableName).GetFieldInfos();
2049         errCode = GetCloudAssets(tableName, fieldInfos, dataKeys, assets);
2050         if (errCode != E_OK) {
2051             LOGE("[Storage Executor] failed to get cloud assets when clean cloud data, %d", errCode);
2052             return errCode;
2053         }
2054 
2055         errCode = CleanCloudDataAndLogOnUserTable(tableName, logTableName);
2056         if (errCode != E_OK) {
2057             LOGE("[Storage Executor] failed to clean cloud data and log on user table, %d.", errCode);
2058             return errCode;
2059         }
2060     }
2061 
2062     return errCode;
2063 }
2064 
GetCloudAssetOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)2065 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetOnTable(const std::string &tableName,
2066     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
2067 {
2068     int errCode = E_OK;
2069     for (const auto &rowId : dataKeys) {
2070         std::string queryAssetSql = "SELECT " + fieldName + " FROM '" + tableName +
2071             "' WHERE " + ROWID + " = " + std::to_string(rowId) + ";";
2072         sqlite3_stmt *selectStmt = nullptr;
2073         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetSql, selectStmt);
2074         if (errCode != E_OK) {
2075             LOGE("Get select asset statement failed, %d", errCode);
2076             SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2077             return errCode;
2078         }
2079         errCode = SQLiteUtils::StepWithRetry(selectStmt);
2080         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2081             std::vector<uint8_t> blobValue;
2082             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
2083             if (errCode != E_OK) {
2084                 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2085                 return errCode;
2086             }
2087             Asset asset;
2088             errCode = RuntimeContext::GetInstance()->BlobToAsset(blobValue, asset);
2089             if (errCode != E_OK) {
2090                 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2091                 return errCode;
2092             }
2093             assets.push_back(asset);
2094         }
2095         SQLiteUtils::ResetStatement(selectStmt, true, errCode);
2096     }
2097     return errCode;
2098 }
2099 
GetCloudAssetsOnTable(const std::string & tableName,const std::string & fieldName,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)2100 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssetsOnTable(const std::string &tableName,
2101     const std::string &fieldName, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
2102 {
2103     int errCode = E_OK;
2104     int ret = E_OK;
2105     sqlite3_stmt *selectStmt = nullptr;
2106     for (const auto &rowId : dataKeys) {
2107         std::string queryAssetsSql = "SELECT " + fieldName + " FROM '" + tableName +
2108             "' WHERE " + ROWID + " = " + std::to_string(rowId) + ";";
2109         errCode = SQLiteUtils::GetStatement(dbHandle_, queryAssetsSql, selectStmt);
2110         if (errCode != E_OK) {
2111             LOGE("Get select assets statement failed, %d", errCode);
2112             goto END;
2113         }
2114         errCode = SQLiteUtils::StepWithRetry(selectStmt);
2115         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
2116             std::vector<uint8_t> blobValue;
2117             errCode = SQLiteUtils::GetColumnBlobValue(selectStmt, 0, blobValue);
2118             if (errCode != E_OK) {
2119                 goto END;
2120             }
2121             Assets tmpAssets;
2122             errCode = RuntimeContext::GetInstance()->BlobToAssets(blobValue, tmpAssets);
2123             if (errCode != E_OK) {
2124                 goto END;
2125             }
2126             for (const auto &asset: tmpAssets) {
2127                 assets.push_back(asset);
2128             }
2129         }
2130         SQLiteUtils::ResetStatement(selectStmt, true, ret);
2131     }
2132     return errCode != E_OK ? errCode : ret;
2133 END:
2134     SQLiteUtils::ResetStatement(selectStmt, true, ret);
2135     return errCode != E_OK ? errCode : ret;
2136 }
2137 
GetCloudAssets(const std::string & tableName,const std::vector<FieldInfo> & fieldInfos,const std::vector<int64_t> & dataKeys,std::vector<Asset> & assets)2138 int SQLiteSingleVerRelationalStorageExecutor::GetCloudAssets(const std::string &tableName,
2139     const std::vector<FieldInfo> &fieldInfos, const std::vector<int64_t> &dataKeys, std::vector<Asset> &assets)
2140 {
2141     int errCode = E_OK;
2142     for (const auto &fieldInfo: fieldInfos) {
2143         if (fieldInfo.IsAssetType()) {
2144             errCode = GetCloudAssetOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
2145             if (errCode != E_OK) {
2146                 LOGE("[Storage Executor] failed to get cloud asset on table, %d.", errCode);
2147                 return errCode;
2148             }
2149         } else if (fieldInfo.IsAssetsType()) {
2150             errCode = GetCloudAssetsOnTable(tableName, fieldInfo.GetFieldName(), dataKeys, assets);
2151             if (errCode != E_OK) {
2152                 LOGE("[Storage Executor] failed to get cloud assets on table, %d.", errCode);
2153                 return errCode;
2154             }
2155         }
2156     }
2157     return errCode;
2158 }
2159 
PutCloudSyncData(const std::string & tableName,const TableSchema & tableSchema,DownloadData & downloadData)2160 int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName,
2161     const TableSchema &tableSchema, DownloadData &downloadData)
2162 {
2163     if (downloadData.data.size() != downloadData.opType.size()) {
2164         LOGE("put cloud data, data size = %zu, flag size = %zu.", downloadData.data.size(),
2165              downloadData.opType.size());
2166         return -E_CLOUD_ERROR;
2167     }
2168 
2169     int errCode = SetLogTriggerStatus(false);
2170     if (errCode != E_OK) {
2171         LOGE("Fail to set log trigger off, %d", errCode);
2172         return errCode;
2173     }
2174 
2175     std::map<int, int> statisticMap = {};
2176     errCode = ExecutePutCloudData(tableName, tableSchema, downloadData, statisticMap);
2177     int ret = SetLogTriggerStatus(true);
2178     if (ret != E_OK) {
2179         LOGE("Fail to set log trigger on, %d", ret);
2180     }
2181     LOGD("save cloud data: %d, insert cnt = %d, update cnt = %d, delete cnt = %d, only update gid cnt = %d, "
2182          "set LCC flag zero cnt = %d, set LCC flag one cnt = %d, update timestamp cnt = %d, clear gid count = %d,"
2183          " not handle cnt = %d",
2184          errCode, statisticMap[static_cast<int>(OpType::INSERT)], statisticMap[static_cast<int>(OpType::UPDATE)],
2185          statisticMap[static_cast<int>(OpType::DELETE)], statisticMap[static_cast<int>(OpType::ONLY_UPDATE_GID)],
2186          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO)],
2187          statisticMap[static_cast<int>(OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE)],
2188          statisticMap[static_cast<int>(OpType::UPDATE_TIMESTAMP)], statisticMap[static_cast<int>(OpType::CLEAR_GID)],
2189          statisticMap[static_cast<int>(OpType::NOT_HANDLE)]);
2190     return errCode == E_OK ? ret : errCode;
2191 }
2192 
InsertCloudData(const std::string & tableName,VBucket & vBucket,const TableSchema & tableSchema)2193 int SQLiteSingleVerRelationalStorageExecutor::InsertCloudData(const std::string &tableName, VBucket &vBucket,
2194     const TableSchema &tableSchema)
2195 {
2196     std::string sql = GetInsertSqlForCloudSync(tableSchema);
2197     sqlite3_stmt *insertStmt = nullptr;
2198     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertStmt);
2199     if (errCode != E_OK) {
2200         LOGE("Get insert statement failed when save cloud data, %d", errCode);
2201         return errCode;
2202     }
2203     CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
2204     errCode = BindValueToUpsertStatement(vBucket, tableSchema.fields, insertStmt);
2205     if (errCode != E_OK) {
2206         SQLiteUtils::ResetStatement(insertStmt, true, errCode);
2207         return errCode;
2208     }
2209     // insert data
2210     errCode = SQLiteUtils::StepWithRetry(insertStmt, false);
2211     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2212         errCode = E_OK;
2213     } else {
2214         int ret = E_OK;
2215         SQLiteUtils::ResetStatement(insertStmt, true, ret);
2216         LOGE("insert data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
2217         return errCode;
2218     }
2219     SQLiteUtils::ResetStatement(insertStmt, true, errCode);
2220 
2221     // insert log
2222     return InsertLogRecord(tableSchema, vBucket);
2223 }
2224 
InsertLogRecord(const TableSchema & tableSchema,VBucket & vBucket)2225 int SQLiteSingleVerRelationalStorageExecutor::InsertLogRecord(const TableSchema &tableSchema, VBucket &vBucket)
2226 {
2227     if (!CloudStorageUtils::IsContainsPrimaryKey(tableSchema)) {
2228         // when one data is deleted, "insert or replace" will insert another log record if there is no primary key,
2229         // so we need to delete the old log record according to the gid first
2230         std::string gidStr;
2231         int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
2232         if (errCode != E_OK || gidStr.empty()) {
2233             LOGE("Get gid from bucket fail when delete log with no primary key or gid is empty, errCode = %d", errCode);
2234             return errCode;
2235         }
2236         std::string sql = "delete from " + DBCommon::GetLogTableName(tableSchema.name) + " where cloud_gid = '"
2237             + gidStr + "';";
2238         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
2239         if (errCode != E_OK) {
2240             LOGE("delete log record according gid fail, errCode = %d", errCode);
2241             return errCode;
2242         }
2243     }
2244 
2245     std::string sql = "insert or replace into " + DBCommon::GetLogTableName(tableSchema.name) +
2246         " values(?, ?, ?, ?, ?, ?, ?, ?)";
2247     sqlite3_stmt *insertLogStmt = nullptr;
2248     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, insertLogStmt);
2249     if (errCode != E_OK) {
2250         LOGE("Get insert log statement failed when save cloud data, %d", errCode);
2251         return errCode;
2252     }
2253 
2254     errCode = BindValueToInsertLogStatement(vBucket, tableSchema, insertLogStmt);
2255     if (errCode != E_OK) {
2256         SQLiteUtils::ResetStatement(insertLogStmt, true, errCode);
2257         return errCode;
2258     }
2259 
2260     errCode = SQLiteUtils::StepWithRetry(insertLogStmt, false);
2261     int ret = E_OK;
2262     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2263         errCode = E_OK;
2264     } else {
2265         SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
2266         LOGE("insert log data failed when save cloud data:%d, reset stmt:%d", errCode, ret);
2267         return errCode;
2268     }
2269 
2270     SQLiteUtils::ResetStatement(insertLogStmt, true, ret);
2271     return errCode != E_OK ? errCode : ret;
2272 }
2273 
BindOneField(int index,const VBucket & vBucket,const Field & field,sqlite3_stmt * updateStmt)2274 int SQLiteSingleVerRelationalStorageExecutor::BindOneField(int index, const VBucket &vBucket, const Field &field,
2275     sqlite3_stmt *updateStmt)
2276 {
2277     auto it = bindCloudFieldFuncMap_.find(field.type);
2278     if (it == bindCloudFieldFuncMap_.end()) {
2279         LOGE("unknown cloud type when bind one field.");
2280         return -E_CLOUD_ERROR;
2281     }
2282     return it->second(index, vBucket, field, updateStmt);
2283 }
2284 
BindValueToUpsertStatement(const VBucket & vBucket,const std::vector<Field> & fields,sqlite3_stmt * upsertStmt)2285 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpsertStatement(const VBucket &vBucket,
2286     const std::vector<Field> &fields, sqlite3_stmt *upsertStmt)
2287 {
2288     int errCode = E_OK;
2289     int index = 0;
2290     for (const auto &field : fields) {
2291         index++;
2292         errCode = BindOneField(index, vBucket, field, upsertStmt);
2293         if (errCode != E_OK) {
2294             return errCode;
2295         }
2296     }
2297     return errCode;
2298 }
2299 
BindHashKeyAndGidToInsertLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt)2300 int SQLiteSingleVerRelationalStorageExecutor::BindHashKeyAndGidToInsertLogStatement(const VBucket &vBucket,
2301     const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt)
2302 {
2303     std::vector<uint8_t> hashKey;
2304     int errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey);
2305     if (errCode != E_OK) {
2306         return errCode;
2307     }
2308     errCode = SQLiteUtils::BindBlobToStatement(insertLogStmt, 7, hashKey); // 7 is hash_key
2309     if (errCode != E_OK) {
2310         LOGE("Bind hash_key to insert log statement failed, %d", errCode);
2311         return errCode;
2312     }
2313 
2314     std::string cloudGid;
2315     errCode = CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::GID_FIELD, vBucket, cloudGid);
2316     if (errCode != E_OK) {
2317         LOGE("get gid for insert log statement failed, %d", errCode);
2318         return -E_CLOUD_ERROR;
2319     }
2320 
2321     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 8, cloudGid); // 8 is cloud_gid
2322     if (errCode != E_OK) {
2323         LOGE("Bind cloud_gid to insert log statement failed, %d", errCode);
2324     }
2325     return errCode;
2326 }
2327 
BindValueToInsertLogStatement(VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * insertLogStmt)2328 int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(VBucket &vBucket,
2329     const TableSchema &tableSchema, sqlite3_stmt *insertLogStmt)
2330 {
2331     int64_t rowid = SQLiteUtils::GetLastRowId(dbHandle_);
2332     int errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 1, rowid);
2333     if (errCode != E_OK) {
2334         LOGE("Bind rowid to insert log statement failed, %d", errCode);
2335         return errCode;
2336     }
2337 
2338     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 2, "cloud"); // 2 is device
2339     if (errCode != E_OK) {
2340         LOGE("Bind device to insert log statement failed, %d", errCode);
2341         return errCode;
2342     }
2343 
2344     errCode = SQLiteUtils::BindTextToStatement(insertLogStmt, 3, "cloud"); // 3 is ori_device
2345     if (errCode != E_OK) {
2346         LOGE("Bind ori_device to insert log statement failed, %d", errCode);
2347         return errCode;
2348     }
2349 
2350     int64_t val = 0;
2351     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::MODIFY_FIELD, vBucket, val);
2352     if (errCode != E_OK) {
2353         LOGE("get modify time for insert log statement failed, %d", errCode);
2354         return -E_CLOUD_ERROR;
2355     }
2356 
2357     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 4, val); // 4 is timestamp
2358     if (errCode != E_OK) {
2359         LOGE("Bind timestamp to insert log statement failed, %d", errCode);
2360         return errCode;
2361     }
2362 
2363     errCode = CloudStorageUtils::GetValueFromVBucket<int64_t>(CloudDbConstant::CREATE_FIELD, vBucket, val);
2364     if (errCode != E_OK) {
2365         LOGE("get create time for insert log statement failed, %d", errCode);
2366         return -E_CLOUD_ERROR;
2367     }
2368 
2369     errCode = SQLiteUtils::BindInt64ToStatement(insertLogStmt, 5, val); // 5 is wtimestamp
2370     if (errCode != E_OK) {
2371         LOGE("Bind wtimestamp to insert log statement failed, %d", errCode);
2372         return errCode;
2373     }
2374 
2375     errCode = SQLiteUtils::MapSQLiteErrno(sqlite3_bind_int(insertLogStmt, 6, 0)); // 6 is flag
2376     if (errCode != E_OK) {
2377         LOGE("Bind flag to insert log statement failed, %d", errCode);
2378         return errCode;
2379     }
2380 
2381     vBucket[CloudDbConstant::ROW_ID_FIELD_NAME] = rowid; // fill rowid to cloud data to notify user
2382     return BindHashKeyAndGidToInsertLogStatement(vBucket, tableSchema, insertLogStmt);
2383 }
2384 
GetWhereConditionForDataTable(const std::string & gidStr,const std::set<std::string> & pkSet,const std::string & tableName,bool queryByPk)2385 std::string SQLiteSingleVerRelationalStorageExecutor::GetWhereConditionForDataTable(const std::string &gidStr,
2386     const std::set<std::string> &pkSet, const std::string &tableName, bool queryByPk)
2387 {
2388     std::string where = " where";
2389     if (!gidStr.empty()) { // gid has higher priority, because primary key may be modified
2390         where += " rowid = (select data_key from " + DBCommon::GetLogTableName(tableName) +
2391             " where cloud_gid = '" + gidStr + "')";
2392     }
2393     if (!pkSet.empty() && queryByPk) {
2394         if (!gidStr.empty()) {
2395             where += " or";
2396         }
2397         where += " (1 = 1";
2398         for (const auto &pk : pkSet) {
2399             where += (" and " + pk + " = ?");
2400         }
2401         where += ");";
2402     }
2403     return where;
2404 }
2405 
GetUpdateSqlForCloudSync(const TableSchema & tableSchema,const VBucket & vBucket,const std::string & gidStr,const std::set<std::string> & pkSet,std::string & updateSql)2406 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateSqlForCloudSync(const TableSchema &tableSchema,
2407     const VBucket &vBucket, const std::string &gidStr, const std::set<std::string> &pkSet, std::string &updateSql)
2408 {
2409     if (pkSet.empty() && gidStr.empty()) {
2410         LOGE("update data fail because both primary key and gid is empty.");
2411         return -E_CLOUD_ERROR;
2412     }
2413     std::string sql = "update " + tableSchema.name + " set";
2414     for (const auto &field : tableSchema.fields) {
2415         sql +=  " " + field.colName + " = ?,";
2416     }
2417     sql.pop_back();
2418     sql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name);
2419     updateSql = sql;
2420     return E_OK;
2421 }
2422 
IsGidValid(const std::string & gidStr)2423 static bool IsGidValid(const std::string &gidStr)
2424 {
2425     if (!gidStr.empty()) {
2426         return gidStr.find("'") == std::string::npos;
2427     }
2428     return true;
2429 }
2430 
GetUpdateDataTableStatement(const VBucket & vBucket,const TableSchema & tableSchema,sqlite3_stmt * & updateStmt)2431 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateDataTableStatement(const VBucket &vBucket,
2432     const TableSchema &tableSchema, sqlite3_stmt *&updateStmt)
2433 {
2434     std::string gidStr;
2435     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
2436     if (errCode != E_OK) {
2437         LOGE("Get gid from cloud data fail when construct update data sql, errCode = %d", errCode);
2438         return errCode;
2439     }
2440     if (!IsGidValid(gidStr)) {
2441         LOGE("invalid char in cloud gid");
2442         return -E_CLOUD_ERROR;
2443     }
2444 
2445     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
2446     std::string updateSql;
2447     errCode = GetUpdateSqlForCloudSync(tableSchema, vBucket, gidStr, pkSet, updateSql);
2448     if (errCode != E_OK) {
2449         return errCode;
2450     }
2451 
2452     errCode = SQLiteUtils::GetStatement(dbHandle_, updateSql, updateStmt);
2453     if (errCode != E_OK) {
2454         LOGE("Get update statement failed when update cloud data, %d", errCode);
2455         return errCode;
2456     }
2457 
2458     // bind value
2459     std::vector<Field> fields = tableSchema.fields;
2460     if (!pkSet.empty()) {
2461         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
2462         fields.insert(fields.end(), pkFields.begin(), pkFields.end());
2463     }
2464     errCode = BindValueToUpsertStatement(vBucket, fields, updateStmt);
2465     if (errCode != E_OK) {
2466         LOGE("bind value to update statement failed when update cloud data, %d", errCode);
2467         SQLiteUtils::ResetStatement(updateStmt, true, errCode);
2468     }
2469     return errCode;
2470 }
2471 
UpdateCloudData(const std::string & tableName,VBucket & vBucket,const TableSchema & tableSchema)2472 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudData(const std::string &tableName, VBucket &vBucket,
2473     const TableSchema &tableSchema)
2474 {
2475     CloudStorageUtils::PrepareToFillAssetFromVBucket(vBucket, CloudStorageUtils::FillAssetBeforeDownload);
2476     sqlite3_stmt *updateStmt = nullptr;
2477     int errCode = GetUpdateDataTableStatement(vBucket, tableSchema, updateStmt);
2478     if (errCode != E_OK) {
2479         LOGE("Get update data table statement fail, %d", errCode);
2480         return errCode;
2481     }
2482 
2483     // update data
2484     errCode = SQLiteUtils::StepWithRetry(updateStmt, false);
2485     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2486         errCode = E_OK;
2487     } else {
2488         LOGE("update data failed when save cloud data:%d", errCode);
2489         SQLiteUtils::ResetStatement(updateStmt, true, errCode);
2490         return errCode;
2491     }
2492     SQLiteUtils::ResetStatement(updateStmt, true, errCode);
2493 
2494     // update log
2495     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::UPDATE);
2496     if (errCode != E_OK) {
2497         LOGE("update log record failed when update cloud data, errCode = %d", errCode);
2498     }
2499     return errCode;
2500 }
2501 
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)2502 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
2503     const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
2504 {
2505     std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
2506     if (opType == OpType::ONLY_UPDATE_GID) {
2507         updateLogSql += "cloud_gid = ?";
2508         updateColName.push_back(CloudDbConstant::GID_FIELD);
2509     } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
2510         updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
2511     } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
2512         updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
2513     }  else if (opType == OpType::UPDATE_TIMESTAMP) {
2514         updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
2515             ", timestamp = ?, cloud_gid = ''";
2516         updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
2517     } else if (opType == OpType::CLEAR_GID) {
2518         updateLogSql += "cloud_gid = '', flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK);
2519     } else {
2520         if (opType == OpType::DELETE) {
2521             updateLogSql += "data_key = -1, flag = 1, cloud_gid = '', ";
2522         } else {
2523             updateLogSql += "flag = 0, cloud_gid = ?, ";
2524             updateColName.push_back(CloudDbConstant::GID_FIELD);
2525         }
2526         updateLogSql += "device = 'cloud', timestamp = ?";
2527         updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
2528     }
2529 
2530     std::string gidStr;
2531     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
2532     if (errCode != E_OK) {
2533         LOGE("Get gid from cloud data fail when construct update log sql, errCode = %d", errCode);
2534         return errCode;
2535     }
2536 
2537     updateLogSql += " where ";
2538     if (!gidStr.empty()) {
2539         updateLogSql += "cloud_gid = '" + gidStr + "'";
2540     }
2541     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
2542     if (!pkMap.empty()) {
2543         if (!gidStr.empty()) {
2544             updateLogSql += " or ";
2545         }
2546         updateLogSql += "(hash_key = ?);";
2547     }
2548 
2549     errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
2550     if (errCode != E_OK) {
2551         LOGE("Get update log statement failed when update cloud data, %d", errCode);
2552     }
2553     return errCode;
2554 }
2555 
IsAllowWithPrimaryKey(OpType opType)2556 static inline bool IsAllowWithPrimaryKey(OpType opType)
2557 {
2558     return (opType == OpType::DELETE || opType == OpType::UPDATE_TIMESTAMP || opType == OpType::CLEAR_GID ||
2559         opType == OpType::ONLY_UPDATE_GID);
2560 }
2561 
UpdateLogRecord(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)2562 int SQLiteSingleVerRelationalStorageExecutor::UpdateLogRecord(const VBucket &vBucket, const TableSchema &tableSchema,
2563     OpType opType)
2564 {
2565     sqlite3_stmt *updateLogStmt = nullptr;
2566     std::vector<std::string> updateColName;
2567     int errCode = GetUpdateLogRecordStatement(tableSchema, vBucket, opType, updateColName, updateLogStmt);
2568     if (errCode != E_OK) {
2569         LOGE("Get update log statement failed, errCode = %d", errCode);
2570         return errCode;
2571     }
2572 
2573     errCode = BindValueToUpdateLogStatement(vBucket, tableSchema, updateColName, IsAllowWithPrimaryKey(opType),
2574         updateLogStmt);
2575     int ret = E_OK;
2576     if (errCode != E_OK) {
2577         LOGE("bind value to update log statement failed when update cloud data, %d", errCode);
2578         SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
2579         return errCode != E_OK ? errCode : ret;
2580     }
2581 
2582     errCode = SQLiteUtils::StepWithRetry(updateLogStmt, false);
2583     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2584         errCode = E_OK;
2585     } else {
2586         LOGE("update log record failed when update cloud data:%d", errCode);
2587     }
2588     SQLiteUtils::ResetStatement(updateLogStmt, true, ret);
2589     return errCode != E_OK ? errCode : ret;
2590 }
2591 
BindValueToUpdateLogStatement(const VBucket & vBucket,const TableSchema & tableSchema,const std::vector<std::string> & colNames,bool allowPrimaryKeyEmpty,sqlite3_stmt * updateLogStmt)2592 int SQLiteSingleVerRelationalStorageExecutor::BindValueToUpdateLogStatement(const VBucket &vBucket,
2593     const TableSchema &tableSchema, const std::vector<std::string> &colNames, bool allowPrimaryKeyEmpty,
2594     sqlite3_stmt *updateLogStmt)
2595 {
2596     int index = 0;
2597     int errCode = E_OK;
2598     for (const auto &colName : colNames) {
2599         index++;
2600         if (colName == CloudDbConstant::GID_FIELD) {
2601             if (vBucket.find(colName) == vBucket.end()) {
2602                 LOGE("cloud data doesn't contain gid field when bind update log stmt.");
2603                 return -E_CLOUD_ERROR;
2604             }
2605             errCode = SQLiteUtils::BindTextToStatement(updateLogStmt, index,
2606                 std::get<std::string>(vBucket.at(colName)));
2607         } else if (colName == CloudDbConstant::MODIFY_FIELD) {
2608             if (vBucket.find(colName) == vBucket.end()) {
2609                 LOGE("cloud data doesn't contain modify field when bind update log stmt.");
2610                 return -E_CLOUD_ERROR;
2611             }
2612             errCode = SQLiteUtils::BindInt64ToStatement(updateLogStmt, index, std::get<int64_t>(vBucket.at(colName)));
2613         } else {
2614             LOGE("invalid col name when bind value to update log statement.");
2615             return -E_INTERNAL_ERROR;
2616         }
2617         if (errCode != E_OK) {
2618             LOGE("fail to bind value to update log statement.");
2619             return errCode;
2620         }
2621     }
2622     std::map<std::string, Field> pkMap = CloudStorageUtils::GetCloudPrimaryKeyFieldMap(tableSchema);
2623     if (pkMap.empty()) {
2624         return E_OK;
2625     }
2626 
2627     std::vector<uint8_t> hashKey;
2628     errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey, allowPrimaryKeyEmpty);
2629     if (errCode != E_OK) {
2630         return errCode;
2631     }
2632     return SQLiteUtils::BindBlobToStatement(updateLogStmt, index + 1, hashKey);
2633 }
2634 
GetDeleteStatementForCloudSync(const TableSchema & tableSchema,const std::set<std::string> & pkSet,const VBucket & vBucket,sqlite3_stmt * & deleteStmt)2635 int SQLiteSingleVerRelationalStorageExecutor::GetDeleteStatementForCloudSync(const TableSchema &tableSchema,
2636     const std::set<std::string> &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt)
2637 {
2638     std::string gidStr;
2639     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::GID_FIELD, vBucket, gidStr);
2640     if (errCode != E_OK) {
2641         LOGE("Get gid from cloud data fail when construct delete sql, errCode = %d", errCode);
2642         return errCode;
2643     }
2644     if (gidStr.empty() || gidStr.find("'") != std::string::npos) {
2645         LOGE("empty or invalid char in cloud gid");
2646         return -E_CLOUD_ERROR;
2647     }
2648 
2649     bool queryByPk = CloudStorageUtils::IsVbucketContainsAllPK(vBucket, pkSet);
2650     std::string deleteSql = "delete from " + tableSchema.name;
2651     deleteSql += GetWhereConditionForDataTable(gidStr, pkSet, tableSchema.name, queryByPk);
2652     errCode = SQLiteUtils::GetStatement(dbHandle_, deleteSql, deleteStmt);
2653     if (errCode != E_OK) {
2654         LOGE("Get delete statement failed when delete data, %d", errCode);
2655         return errCode;
2656     }
2657 
2658     int ret = E_OK;
2659     if (!pkSet.empty() && queryByPk) {
2660         std::vector<Field> pkFields = CloudStorageUtils::GetCloudPrimaryKeyField(tableSchema);
2661         errCode = BindValueToUpsertStatement(vBucket, pkFields, deleteStmt);
2662         if (errCode != E_OK) {
2663             LOGE("bind value to delete statement failed when delete cloud data, %d", errCode);
2664             SQLiteUtils::ResetStatement(deleteStmt, true, ret);
2665         }
2666     }
2667     return errCode != E_OK ? errCode : ret;
2668 }
2669 
DeleteCloudData(const std::string & tableName,const VBucket & vBucket,const TableSchema & tableSchema)2670 int SQLiteSingleVerRelationalStorageExecutor::DeleteCloudData(const std::string &tableName, const VBucket &vBucket,
2671     const TableSchema &tableSchema)
2672 {
2673     std::set<std::string> pkSet = CloudStorageUtils::GetCloudPrimaryKey(tableSchema);
2674     sqlite3_stmt *deleteStmt = nullptr;
2675     int errCode = GetDeleteStatementForCloudSync(tableSchema, pkSet, vBucket, deleteStmt);
2676     if (errCode != E_OK) {
2677         return errCode;
2678     }
2679 
2680     errCode = SQLiteUtils::StepWithRetry(deleteStmt, false);
2681     int ret = E_OK;
2682     SQLiteUtils::ResetStatement(deleteStmt, true, ret);
2683     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
2684         LOGE("delete data failed when sync with cloud:%d", errCode);
2685         return errCode;
2686     }
2687     if (ret != E_OK) {
2688         LOGE("reset delete statement failed:%d", ret);
2689         return ret;
2690     }
2691 
2692     // update log
2693     errCode = UpdateLogRecord(vBucket, tableSchema, OpType::DELETE);
2694     if (errCode != E_OK) {
2695         LOGE("update log record failed when delete cloud data, errCode = %d", errCode);
2696     }
2697     return errCode;
2698 }
2699 
OnlyUpdateLogTable(const VBucket & vBucket,const TableSchema & tableSchema,OpType opType)2700 int SQLiteSingleVerRelationalStorageExecutor::OnlyUpdateLogTable(const VBucket &vBucket,
2701     const TableSchema &tableSchema, OpType opType)
2702 {
2703     return UpdateLogRecord(vBucket, tableSchema, opType);
2704 }
2705 } // namespace DistributedDB
2706 #endif
2707