• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include <algorithm>
19 #include <optional>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "value_hash_calc.h"
33 
34 namespace DistributedDB {
35 namespace {
// Names kept as file-local string constants (this view of the file does not show where they are consumed).
constexpr const char *DATAKEY = "DATA_KEY";
constexpr const char *DEVICE_FIELD = "DEVICE";
constexpr const char *CLOUD_GID_FIELD = "CLOUD_GID";
constexpr const char *VERSION = "VERSION";
constexpr const char *SHARING_RESOURCE = "SHARING_RESOURCE";

// SQL predicate: the 0x02 bit of FLAG is clear (original note: the flag is "cloud").
constexpr const char *FLAG_IS_CLOUD = "FLAG & 0x02 = 0";
// SQL predicate: the 0x20 bit of FLAG is clear (original note: the flag is "cloud_consistency").
constexpr const char *FLAG_IS_CLOUD_CONSISTENCY = "FLAG & 0x20 = 0";
// SQL expression: for rows with data_key = -1 whose 0x02 bit is set, clear 0x10 and 0x20;
// otherwise set 0x02 (local) and 0x20 and clear 0x10 (wait compensated sync).
constexpr const char *SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC = "(CASE WHEN data_key = -1 and "
    "FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) ELSE (FLAG | 0x02 | 0x20) & (~0x10) END)";
// SQL expression: same as above, except the ELSE branch does not set the 0x02 (local) bit.
constexpr const char *SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC = "(CASE WHEN data_key = -1 and "
    "FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) ELSE (FLAG | 0x20) & (~0x10) END)";
// SQL predicate: the 0x08 bit of FLAG is set (logic delete).
constexpr const char *FLAG_IS_LOGIC_DELETE = "FLAG & 0x08 != 0";
// SQL expression: set 0x08, 0x800 and 0x01, clear 0x02 (original note: logic delete and exist passport).
constexpr const char *SET_FLAG_LOGIC_DELETE = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x02)";
// SQL predicate: data_key is -1 and the 0x08 (logic delete) bit is clear, i.e. the data is deleted.
constexpr const char *DATA_IS_DELETE = "data_key = -1 AND FLAG & 0X08 = 0";
// SQL assignment fragment that refreshes the cursor column via update_cursor().
constexpr const char *UPDATE_CURSOR_SQL = "cursor=update_cursor()";

// Bit masks applied to the flag value.
constexpr int SET_FLAG_ZERO_MASK = ~0x04; // AND-mask clearing the 0x04 bit
constexpr int SET_FLAG_ONE_MASK = 0x04; // OR-mask setting the 0x04 bit
constexpr int SET_CLOUD_FLAG = ~0x02; // AND-mask clearing the 0x02 bit

// Column positions used when reading a log row from a statement.
constexpr int DATA_KEY_INDEX = 0;
constexpr int TIMESTAMP_INDEX = 3;
constexpr int W_TIMESTAMP_INDEX = 4;
constexpr int FLAG_INDEX = 5;
constexpr int HASH_KEY_INDEX = 6;
constexpr int CLOUD_GID_INDEX = 7;
constexpr int VERSION_INDEX = 8;
constexpr int STATUS_INDEX = 9;
65 
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)66 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
67 {
68     if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
69         return SQLITE_DENY;
70     }
71     if (b == SQLITE_FUNCTION) {
72         if (d != nullptr && (strcmp(d, "fts3_tokenizer") == 0)) {
73             LOGE("Deny fts3_tokenizer in remote query");
74             return SQLITE_DENY;
75         }
76     }
77     return SQLITE_OK;
78 }
79 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)80 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
81     DistributedTableMode mode)
82     : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode), isLogicDelete_(false),
83       assetLoader_(nullptr), putDataMode_(PutDataMode::SYNC), markFlagOption_(MarkFlagOption::DEFAULT),
84       maxUploadCount_(0), maxUploadSize_(0)
85 {
86     bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
87     bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
88     bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
89     bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
90     bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
91     bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
92     bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
93 }
94 
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)95 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
96 {
97     std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
98     if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
99         LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
100         return -E_NOT_SUPPORT;
101     }
102     std::vector<FieldInfo> fieldInfos = table.GetFieldInfos();
103     for (const auto &field : fieldInfos) {
104         if (DBCommon::CaseInsensitiveCompare(field.GetFieldName(), std::string(DBConstant::SQLITE_INNER_ROWID))) {
105             LOGE("[CreateDistributedTable] Not support create distributed table with _rowid_ column.");
106             return -E_NOT_SUPPORT;
107         }
108     }
109 
110     if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
111         if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
112             LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
113             return -E_NOT_SUPPORT;
114         }
115 
116         if (DBCommon::HasConstraint(trimedSql, "ON CONFLICT", " )", " ")) {
117             LOGE("[CreateDistributedTable] Not support create distributed table with 'ON CONFLICT' constraint.");
118             return -E_NOT_SUPPORT;
119         }
120 
121         if (mode == DistributedTableMode::COLLABORATION) {
122             if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
123                 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
124                 return -E_NOT_SUPPORT;
125             }
126         }
127 
128         if (syncType == CLOUD_COOPERATION) {
129             int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
130             if (errCode != E_OK) {
131                 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
132                 return errCode;
133             }
134         }
135     }
136 
137     if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
138         if (table.GetPrimaryKey().size() > 1) {
139             LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
140             return -E_NOT_SUPPORT;
141         }
142     }
143 
144     return E_OK;
145 }
146 
147 namespace {
GetExistedDataTimeOffset(sqlite3 * db,const std::string & tableName,bool isMem,int64_t & timeOffset)148 int GetExistedDataTimeOffset(sqlite3 *db, const std::string &tableName, bool isMem, int64_t &timeOffset)
149 {
150     std::string sql = "SELECT get_sys_time(0) - max(" + std::string(DBConstant::SQLITE_INNER_ROWID) + ") - 1 FROM '" +
151         tableName + "';";
152     sqlite3_stmt *stmt = nullptr;
153     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
154     if (errCode != E_OK) {
155         return errCode;
156     }
157     errCode = SQLiteUtils::StepWithRetry(stmt, isMem);
158     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
159         timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
160         errCode = E_OK;
161     }
162     SQLiteUtils::ResetStatement(stmt, true, errCode);
163     return errCode;
164 }
165 }
166 
GeneLogInfoForExistedData(sqlite3 * db,const std::string & tableName,const std::string & calPrimaryKeyHash,TableInfo & tableInfo)167 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &tableName,
168     const std::string &calPrimaryKeyHash, TableInfo &tableInfo)
169 {
170     int64_t timeOffset = 0;
171     int errCode = GetExistedDataTimeOffset(db, tableName, isMemDb_, timeOffset);
172     if (errCode != E_OK) {
173         return errCode;
174     }
175     errCode = SetLogTriggerStatus(false);
176     if (errCode != E_OK) {
177         return errCode;
178     }
179     std::string timeOffsetStr = std::to_string(timeOffset);
180     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
181     std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
182     std::string flag = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL) |
183         static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_CONSISTENCY));
184     if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
185         std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid + ", '', '', " + timeOffsetStr +
186             " + " + rowid + ", " + timeOffsetStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash + ", '', " +
187             "'', '', '', '', 0 FROM '" + tableName + "' AS a WHERE 1=1;";
188         errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
189         if (errCode != E_OK) {
190             LOGE("Failed to initialize device type log data.%d", errCode);
191         }
192         return errCode;
193     }
194     TrackerTable trackerTable = tableInfo.GetTrackerTable();
195     trackerTable.SetTableName(tableName);
196     std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid +
197         ", '', '', " + timeOffsetStr + " + " + rowid + ", " +
198         timeOffsetStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash + ", '', ";
199     sql += tableInfo.GetTrackerTable().GetExtendName().empty() ? "''" : tableInfo.GetTrackerTable().GetExtendName();
200     sql += ", 0, '', '', 0 FROM '" + tableName + "' AS a WHERE 1=1;";
201     errCode = trackerTable.ReBuildTempTrigger(db, TriggerMode::TriggerModeEnum::INSERT, [db, &sql]() {
202         int ret = SQLiteUtils::ExecuteRawSQL(db, sql);
203         if (ret != E_OK) {
204             LOGE("Failed to initialize cloud type log data.%d", ret);
205         }
206         return ret;
207     });
208     return errCode;
209 }
210 
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table,TableSyncType syncType)211 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
212     const std::string &identity, TableInfo &table, TableSyncType syncType)
213 {
214     if (dbHandle_ == nullptr) {
215         return -E_INVALID_DB;
216     }
217 
218     const std::string tableName = table.GetTableName();
219     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
220     if (errCode != E_OK) {
221         LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
222         return errCode;
223     }
224 
225     if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
226         bool isEmpty = false;
227         errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
228         if (errCode != E_OK) {
229             LOGE("[CreateDistributedTable] check table empty failed. error=%d", errCode);
230             return -E_NOT_SUPPORT;
231         }
232         if (!isEmpty) {
233             LOGW("[CreateDistributedTable] generate %.3s log for existed data, table type %d",
234                 DBCommon::TransferStringToHex(DBCommon::TransferHashString(tableName)).c_str(),
235                 static_cast<int>(syncType));
236         }
237     }
238 
239     errCode = CheckTableConstraint(table, mode, syncType);
240     if (errCode != E_OK) {
241         LOGE("[CreateDistributedTable] check table constraint failed.");
242         return errCode;
243     }
244 
245     // create log table
246     auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
247     errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
248     if (errCode != E_OK) {
249         LOGE("[CreateDistributedTable] create log table failed");
250         return errCode;
251     }
252 
253     if (!isUpgraded) {
254         std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, identity);
255         errCode = GeneLogInfoForExistedData(dbHandle_, tableName, calPrimaryKeyHash, table);
256         if (errCode != E_OK) {
257             return errCode;
258         }
259     }
260 
261     // add trigger
262     errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
263     if (errCode != E_OK) {
264         LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
265         return errCode;
266     }
267     return SetLogTriggerStatus(true);
268 }
269 
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)270 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
271     DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
272 {
273     if (dbHandle_ == nullptr) {
274         return -E_INVALID_DB;
275     }
276     TableInfo newTableInfo;
277     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
278     if (errCode != E_OK) {
279         LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
280         return errCode;
281     }
282 
283     if (CheckTableConstraint(newTableInfo, mode, syncType)) {
284         LOGE("[UpgradeDistributedTable] Not support create distributed table when violate constraints.");
285         return -E_NOT_SUPPORT;
286     }
287 
288     // new table should has same or compatible upgrade
289     TableInfo tableInfo = schema.GetTable(tableName);
290     errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
291     if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
292         LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
293         return -E_SCHEMA_MISMATCH;
294     } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
295         LOGD("[UpgradeDistributedTable] schema has not changed.");
296         // update table if tableName changed
297         schema.RemoveRelationalTable(tableName);
298         tableInfo.SetTableName(tableName);
299         schema.AddRelationalTable(tableInfo);
300         return E_OK;
301     }
302 
303     schemaChanged = true;
304     errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
305     if (errCode != E_OK) {
306         LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
307     }
308 
309     schema.AddRelationalTable(newTableInfo);
310     return errCode;
311 }
312 
313 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)314 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
315     std::vector<std::string> &deviceTables)
316 {
317     if (device.empty() && tableName.empty()) { // device and table name should not both be empty
318         return -E_INVALID_ARGS;
319     }
320     std::string devicePattern = device.empty() ? "%" : device;
321     std::string tablePattern = tableName.empty() ? "%" : tableName;
322     std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
323 
324     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
325         deviceTableName + "';";
326     sqlite3_stmt *stmt = nullptr;
327     int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
328     if (errCode != E_OK) {
329         return errCode;
330     }
331 
332     do {
333         errCode = SQLiteUtils::StepWithRetry(stmt, false);
334         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
335             errCode = E_OK;
336             break;
337         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
338             LOGE("Get table name failed. %d", errCode);
339             break;
340         }
341         std::string realTableName;
342         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
343         if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
344             continue;
345         }
346         if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
347             continue;
348         }
349         deviceTables.emplace_back(realTableName);
350     } while (true);
351 
352     SQLiteUtils::ResetStatement(stmt, true, errCode);
353     return errCode;
354 }
355 
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)356 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
357 {
358     std::vector<FieldInfo> fields;
359     auto itOld = oldTableInfo.GetFields().begin();
360     auto itNew = newTableInfo.GetFields().begin();
361     for (; itNew != newTableInfo.GetFields().end(); itNew++) {
362         if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
363             fields.emplace_back(itNew->second);
364             continue;
365         }
366         itOld++;
367     }
368     return fields;
369 }
370 
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)371 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
372 {
373     if (db == nullptr) {
374         return -E_INVALID_ARGS;
375     }
376 
377     std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
378         return a.GetColumnId()< b.GetColumnId();
379     });
380     int errCode = E_OK;
381     for (const auto &table : tables) {
382         for (const auto &field : fields) {
383             std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
384             alterSql += "'" + field.GetDataType() + "'";
385             alterSql += field.IsNotNull() ? " NOT NULL" : "";
386             alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
387             alterSql += ";";
388             errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
389             if (errCode != E_OK) {
390                 LOGE("Alter table failed. %d", errCode);
391                 break;
392             }
393         }
394     }
395     return errCode;
396 }
397 
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)398 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
399 {
400     IndexInfoMap indexes;
401     auto itOld = oldTableInfo.GetIndexDefine().begin();
402     auto itNew = newTableInfo.GetIndexDefine().begin();
403     auto itOldEnd = oldTableInfo.GetIndexDefine().end();
404     auto itNewEnd = newTableInfo.GetIndexDefine().end();
405 
406     while (itOld != itOldEnd && itNew != itNewEnd) {
407         if (itOld->first == itNew->first) {
408             if (itOld->second != itNew->second) {
409                 indexes.insert({itNew->first, itNew->second});
410             }
411             itOld++;
412             itNew++;
413         } else if (itOld->first < itNew->first) {
414             indexes.insert({itOld->first, {}});
415             itOld++;
416         } else {
417             indexes.insert({itNew->first, itNew->second});
418             itNew++;
419         }
420     }
421 
422     while (itOld != itOldEnd) {
423         indexes.insert({itOld->first, {}});
424         itOld++;
425     }
426 
427     while (itNew != itNewEnd) {
428         indexes.insert({itNew->first, itNew->second});
429         itNew++;
430     }
431 
432     return indexes;
433 }
434 
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)435 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
436 {
437     if (db == nullptr) {
438         return -E_INVALID_ARGS;
439     }
440 
441     int errCode = E_OK;
442     for (const auto &table : tables) {
443         for (const auto &index : indexes) {
444             if (index.first.empty()) {
445                 continue;
446             }
447             std::string realIndexName = table + "_" + index.first;
448             std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
449             errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
450             if (errCode != E_OK) {
451                 LOGE("Drop index failed. %d", errCode);
452                 return errCode;
453             }
454 
455             if (index.second.empty()) { // empty means drop index only
456                 continue;
457             }
458 
459             auto it = index.second.begin();
460             std::string indexDefine = *it++;
461             while (it != index.second.end()) {
462                 indexDefine += ", " + *it++;
463             }
464             std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
465                 "(" + indexDefine + ");";
466             errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
467             if (errCode != E_OK) {
468                 LOGE("Create index failed. %d", errCode);
469                 break;
470             }
471         }
472     }
473     return errCode;
474 }
475 }
476 
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)477 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
478     const TableInfo &newTableInfo)
479 {
480     std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
481     IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
482     std::vector<std::string> deviceTables;
483     int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
484     if (errCode != E_OK) {
485         LOGE("Get device table name for alter table failed. %d", errCode);
486         return errCode;
487     }
488 
489     LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
490         upgradeIndexes.size(), deviceTables.size());
491     errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
492     if (errCode != E_OK) {
493         LOGE("upgrade fields failed. %d", errCode);
494         return errCode;
495     }
496 
497     errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
498     if (errCode != E_OK) {
499         LOGE("upgrade indexes failed. %d", errCode);
500     }
501 
502     return errCode;
503 }
504 
StartTransaction(TransactType type)505 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
506 {
507     if (dbHandle_ == nullptr) {
508         LOGE("Begin transaction failed, dbHandle is null.");
509         return -E_INVALID_DB;
510     }
511     int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
512     if (errCode != E_OK) {
513         LOGE("Begin transaction failed, errCode = %d", errCode);
514     }
515     return errCode;
516 }
517 
Commit()518 int SQLiteSingleVerRelationalStorageExecutor::Commit()
519 {
520     if (dbHandle_ == nullptr) {
521         return -E_INVALID_DB;
522     }
523 
524     return SQLiteUtils::CommitTransaction(dbHandle_);
525 }
526 
Rollback()527 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
528 {
529     if (dbHandle_ == nullptr) {
530         return -E_INVALID_DB;
531     }
532     int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
533     if (errCode != E_OK) {
534         LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
535     }
536     return errCode;
537 }
538 
SetTableInfo(const TableInfo & tableInfo)539 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
540 {
541     table_ = tableInfo;
542 }
543 
GetLogData(sqlite3_stmt * logStatement,LogInfo & logInfo)544 static int GetLogData(sqlite3_stmt *logStatement, LogInfo &logInfo)
545 {
546     logInfo.dataKey = sqlite3_column_int64(logStatement, 0);  // 0 means dataKey index
547 
548     std::vector<uint8_t> dev;
549     int errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 1, dev);  // 1 means dev index
550     if (errCode != E_OK) {
551         return errCode;
552     }
553     logInfo.device = std::string(dev.begin(), dev.end());
554 
555     std::vector<uint8_t> oriDev;
556     errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 2, oriDev);  // 2 means ori_dev index
557     if (errCode != E_OK) {
558         return errCode;
559     }
560     logInfo.originDev = std::string(oriDev.begin(), oriDev.end());
561     logInfo.timestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 3));  // 3 means timestamp index
562     logInfo.wTimestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 4));  // 4 means w_timestamp index
563     logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 5));  // 5 means flag index
564     logInfo.flag &= (~DataItem::LOCAL_FLAG);
565     logInfo.flag &= (~DataItem::UPDATE_FLAG);
566     return SQLiteUtils::GetColumnBlobValue(logStatement, 6, logInfo.hashKey);  // 6 means hashKey index
567 }
568 
569 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize)570 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize)
571 {
572     int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX));
573     uint64_t curTime = 0;
574     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
575         if (modifyTime > static_cast<int64_t>(curTime)) {
576             modifyTime = static_cast<int64_t>(curTime);
577         }
578     } else {
579         LOGW("[Relational] get raw sys time failed.");
580     }
581     logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
582     logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
583         static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
584     totalSize += sizeof(int64_t) + sizeof(int64_t);
585     if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
586         std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
587             sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
588         if (!cloudGid.empty()) {
589             logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
590             totalSize += cloudGid.size();
591         }
592     }
593     std::string version;
594     SQLiteUtils::GetColumnTextValue(logStatement, VERSION_INDEX, version);
595     logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
596     totalSize += version.size();
597 }
598 
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)599 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
600 {
601     flags.insert_or_assign(CloudDbConstant::ROWID,
602         static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
603     flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
604         static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
605     flags.insert_or_assign(CloudDbConstant::FLAG,
606         static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
607     Bytes hashKey;
608     (void)SQLiteUtils::GetColumnBlobValue(logStatement, HASH_KEY_INDEX, hashKey);
609     flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
610     flags.insert_or_assign(CloudDbConstant::STATUS,
611         static_cast<int64_t>(sqlite3_column_int(logStatement, STATUS_INDEX)));
612 }
613 
GetCloudGid(sqlite3_stmt * logStatement,std::vector<std::string> & cloudGid)614 void GetCloudGid(sqlite3_stmt *logStatement, std::vector<std::string> &cloudGid)
615 {
616     if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) == nullptr) {
617         return;
618     }
619     std::string gid = reinterpret_cast<const std::string::value_type *>(
620         sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
621     if (gid.empty()) {
622         LOGW("[Relational] Get cloud gid is null.");
623         return;
624     }
625     cloudGid.emplace_back(gid);
626 }
627 }
628 
GetDataItemSerialSize(DataItem & item,size_t appendLen)629 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
630 {
631     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
632     // the size would not be very large.
633     static const size_t maxOrigDevLength = 40;
634     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
635     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
636         Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
637     return dataSize;
638 }
639 
GetKvData(const Key & key,Value & value) const640 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
641 {
642     static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX +
643         "metadata WHERE key=?;";
644     sqlite3_stmt *statement = nullptr;
645     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
646     if (errCode != E_OK) {
647         goto END;
648     }
649 
650     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
651     if (errCode != E_OK) {
652         goto END;
653     }
654 
655     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
656     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
657         errCode = -E_NOT_FOUND;
658         goto END;
659     } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
660         goto END;
661     }
662 
663     errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
664     END:
665     SQLiteUtils::ResetStatement(statement, true, errCode);
666     return errCode;
667 }
668 
PutKvData(const Key & key,const Value & value) const669 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
670 {
671     static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX +
672         "metadata VALUES(?,?);";
673     sqlite3_stmt *statement = nullptr;
674     int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
675     if (errCode != E_OK) {
676         return errCode;
677     }
678 
679     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false);  // 1 means key index
680     if (errCode != E_OK) {
681         LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
682         goto ERROR;
683     }
684 
685     errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true);  // 2 means value index
686     if (errCode != E_OK) {
687         LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
688         goto ERROR;
689     }
690     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
691     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
692         errCode = E_OK;
693     }
694 ERROR:
695     SQLiteUtils::ResetStatement(statement, true, errCode);
696     return errCode;
697 }
698 
DeleteMetaData(const std::vector<Key> & keys) const699 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
700 {
701     static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
702         "metadata WHERE key=?;";
703     sqlite3_stmt *statement = nullptr;
704     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
705     if (errCode != E_OK) {
706         return errCode;
707     }
708 
709     for (const auto &key : keys) {
710         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
711         if (errCode != E_OK) {
712             break;
713         }
714 
715         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
716         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
717             break;
718         }
719         errCode = E_OK;
720         SQLiteUtils::ResetStatement(statement, false, errCode);
721     }
722     SQLiteUtils::ResetStatement(statement, true, errCode);
723     return CheckCorruptedStatus(errCode);
724 }
725 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const726 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
727 {
728     static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
729         "metadata WHERE key>=? AND key<=?;";
730     sqlite3_stmt *statement = nullptr;
731     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
732     if (errCode != E_OK) {
733         return errCode;
734     }
735 
736     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
737     if (errCode == E_OK) {
738         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
739         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
740             errCode = E_OK;
741         }
742     }
743     SQLiteUtils::ResetStatement(statement, true, errCode);
744     return CheckCorruptedStatus(errCode);
745 }
746 
GetAllMetaKeys(std::vector<Key> & keys) const747 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
748 {
749     static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + DBConstant::RELATIONAL_PREFIX + "metadata;";
750     sqlite3_stmt *statement = nullptr;
751     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
752     if (errCode != E_OK) {
753         LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
754         return errCode;
755     }
756     errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
757     SQLiteUtils::ResetStatement(statement, true, errCode);
758     return errCode;
759 }
760 
GetLogInfoPre(sqlite3_stmt * queryStmt,const DataItem & dataItem,LogInfo & logInfoGet)761 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoPre(sqlite3_stmt *queryStmt, const DataItem &dataItem,
762     LogInfo &logInfoGet)
763 {
764     if (queryStmt == nullptr) {
765         return -E_INVALID_ARGS;
766     }
767     int errCode = SQLiteUtils::BindBlobToStatement(queryStmt, 1, dataItem.hashKey);  // 1 means hashkey index.
768     if (errCode != E_OK) {
769         return errCode;
770     }
771     if (mode_ != DistributedTableMode::COLLABORATION) {
772         errCode = SQLiteUtils::BindTextToStatement(queryStmt, 2, dataItem.dev);  // 2 means device index.
773         if (errCode != E_OK) {
774             return errCode;
775         }
776     }
777 
778     errCode = SQLiteUtils::StepWithRetry(queryStmt, isMemDb_);
779     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
780         errCode = -E_NOT_FOUND;
781     } else {
782         errCode = GetLogData(queryStmt, logInfoGet);
783     }
784     return errCode;
785 }
786 
SaveSyncLog(sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,int64_t rowid)787 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncLog(sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
788     const DataItem &dataItem, int64_t rowid)
789 {
790     LogInfo logInfoGet;
791     int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
792     LogInfo logInfoBind;
793     logInfoBind.hashKey = dataItem.hashKey;
794     logInfoBind.device = dataItem.dev;
795     logInfoBind.timestamp = dataItem.timestamp;
796     logInfoBind.flag = dataItem.flag;
797 
798     if (errCode == -E_NOT_FOUND) { // insert
799         logInfoBind.wTimestamp = dataItem.writeTimestamp;
800         logInfoBind.originDev = dataItem.dev;
801     } else if (errCode == E_OK) { // update
802         logInfoBind.wTimestamp = logInfoGet.wTimestamp;
803         logInfoBind.originDev = logInfoGet.originDev;
804     } else {
805         return errCode;
806     }
807 
808     // bind
809     SQLiteUtils::BindInt64ToStatement(statement, 1, rowid);  // 1 means dataKey index
810     std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
811     SQLiteUtils::BindBlobToStatement(statement, 2, originDev);  // 2 means ori_dev index
812     SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp);  // 3 means timestamp index
813     SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimestamp);  // 4 means w_timestamp index
814     SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag);  // 5 means flag index
815     SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey);  // 6 means hashKey index
816     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
817     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
818         return E_OK;
819     }
820     return errCode;
821 }
822 
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)823 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
824     RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
825 {
826     if (stmt == nullptr) {
827         int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
828         if (errCode != E_OK) {
829             LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
830             return errCode;
831         }
832     }
833 
834     int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hash_key index
835     if (errCode != E_OK) {
836         SQLiteUtils::ResetStatement(stmt, true, errCode);
837         return errCode;
838     }
839     if (mode_ != DistributedTableMode::COLLABORATION) {
840         errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
841         if (errCode != E_OK) {
842             SQLiteUtils::ResetStatement(stmt, true, errCode);
843             return errCode;
844         }
845     }
846     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
847     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
848         errCode = E_OK;
849     }
850     SQLiteUtils::ResetStatement(stmt, false, errCode);  // Finalize outside.
851     return errCode;
852 }
853 
SaveSyncDataItem(const DataItem & dataItem,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,int64_t & rowid)854 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, SaveSyncDataStmt &saveStmt,
855     RelationalSyncDataInserter &inserter, int64_t &rowid)
856 {
857     if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
858         return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
859     }
860     if ((mode_ == DistributedTableMode::COLLABORATION && inserter.GetLocalTable().GetIdentifyKey().size() == 1u &&
861         inserter.GetLocalTable().GetIdentifyKey().at(0) == "rowid") ||
862         (mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().GetPrimaryKey().size() == 1u &&
863         inserter.GetLocalTable().GetPrimaryKey().at(0) == "rowid") ||
864         inserter.GetLocalTable().GetAutoIncrement()) {  // No primary key of auto increment
865         int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
866         if (errCode != E_OK) {
867             LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
868             return errCode;
869         }
870     }
871 
872     int errCode = inserter.BindInsertStatement(saveStmt.saveDataStmt, dataItem);
873     if (errCode != E_OK) {
874         LOGE("Bind data failed, errCode=%d.", errCode);
875         return errCode;
876     }
877     errCode = SQLiteUtils::StepWithRetry(saveStmt.saveDataStmt, isMemDb_);
878     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
879         rowid = SQLiteUtils::GetLastRowId(dbHandle_);
880         errCode = E_OK;
881     }
882     return errCode;
883 }
884 
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)885 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
886     RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
887 {
888     if (stmt == nullptr) {
889         int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
890         if (errCode != E_OK) {
891             LOGE("[DeleteSyncLog] Get statement fail!");
892             return errCode;
893         }
894     }
895 
896     int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hashkey index
897     if (errCode != E_OK) {
898         SQLiteUtils::ResetStatement(stmt, true, errCode);
899         return errCode;
900     }
901     if (mode_ != DistributedTableMode::COLLABORATION) {
902         errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
903         if (errCode != E_OK) {
904             SQLiteUtils::ResetStatement(stmt, true, errCode);
905             return errCode;
906         }
907     }
908     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
909     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
910         errCode = E_OK;
911     }
912     SQLiteUtils::ResetStatement(stmt, false, errCode);  // Finalize outside.
913     return errCode;
914 }
915 
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)916 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
917     RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
918 {
919     int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
920     if (errCode != E_OK) {
921         return errCode;
922     }
923     return DeleteSyncLog(item, inserter, rmLogStmt);
924 }
925 
GetSyncDataPre(const DataItem & dataItem,sqlite3_stmt * queryStmt,DataItem & itemGet)926 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataPre(const DataItem &dataItem, sqlite3_stmt *queryStmt,
927     DataItem &itemGet)
928 {
929     LogInfo logInfoGet;
930     int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
931     itemGet.timestamp = logInfoGet.timestamp;
932     SQLiteUtils::ResetStatement(queryStmt, false, errCode);
933     return errCode;
934 }
935 
CheckDataConflictDefeated(const DataItem & dataItem,sqlite3_stmt * queryStmt,bool & isDefeated)936 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
937     sqlite3_stmt *queryStmt, bool &isDefeated)
938 {
939     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
940         mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
941         isDefeated = false; // no need to solve conflict except miss query data
942         return E_OK;
943     }
944 
945     DataItem itemGet;
946     int errCode = GetSyncDataPre(dataItem, queryStmt, itemGet);
947     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
948         LOGE("Failed to get raw data. %d", errCode);
949         return errCode;
950     }
951     isDefeated = (dataItem.timestamp <= itemGet.timestamp); // defeated if item timestamp is earlier then raw data
952     return E_OK;
953 }
954 
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)955 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
956     SaveSyncDataStmt &saveStmt, DataItem &item)
957 {
958     bool isDefeated = false;
959     int errCode = CheckDataConflictDefeated(item, saveStmt.queryStmt, isDefeated);
960     if (errCode != E_OK) {
961         LOGE("check data conflict failed. %d", errCode);
962         return errCode;
963     }
964 
965     if (isDefeated) {
966         LOGD("Data was defeated.");
967         return E_OK;
968     }
969     if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
970         return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
971     }
972     int64_t rowid = -1;
973     errCode = SaveSyncDataItem(item, saveStmt, inserter, rowid);
974     if (errCode == E_OK || errCode == -E_NOT_FOUND) {
975         errCode = SaveSyncLog(saveStmt.saveLogStmt, saveStmt.queryStmt, item, rowid);
976     }
977     return errCode;
978 }
979 
SaveSyncDataItems(RelationalSyncDataInserter & inserter)980 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
981 {
982     SaveSyncDataStmt saveStmt;
983     int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
984     if (errCode != E_OK) {
985         LOGE("Prepare insert sync data statement failed.");
986         return errCode;
987     }
988 
989     errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
990         if (item.neglect) { // Do not save this record if it is neglected
991             return E_OK;
992         }
993         int errCode = SaveSyncDataItem(inserter, saveStmt, item);
994         if (errCode != E_OK) {
995             LOGE("save sync data item failed. err=%d", errCode);
996             return errCode;
997         }
998         // Need not reset rmDataStmt and rmLogStmt here.
999         return saveStmt.ResetStatements(false);
1000     });
1001 
1002     int ret = saveStmt.ResetStatements(true);
1003     return errCode != E_OK ? errCode : ret;
1004 }
1005 
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)1006 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
1007 {
1008     if (useTrans) {
1009         int errCode = StartTransaction(TransactType::IMMEDIATE);
1010         if (errCode != E_OK) {
1011             return errCode;
1012         }
1013     }
1014 
1015     int errCode = SetLogTriggerStatus(false);
1016     if (errCode != E_OK) {
1017         goto END;
1018     }
1019 
1020     errCode = SaveSyncDataItems(inserter);
1021     if (errCode != E_OK) {
1022         LOGE("Save sync data items failed. errCode=%d", errCode);
1023         goto END;
1024     }
1025 
1026     errCode = SetLogTriggerStatus(true);
1027 END:
1028     if (useTrans) {
1029         if (errCode == E_OK) {
1030             errCode = Commit();
1031         } else {
1032             (void)Rollback(); // Keep the error code of the first scene
1033         }
1034     }
1035     return errCode;
1036 }
1037 
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const1038 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
1039     bool isGettingDeletedData) const
1040 {
1041     RowDataWithLog data;
1042     int errCode = GetLogData(stmt, data.logInfo);
1043     if (errCode != E_OK) {
1044         LOGE("relational data value transfer to kv fail");
1045         return errCode;
1046     }
1047 
1048     if (!isGettingDeletedData) {
1049         for (size_t cid = 0; cid < table_.GetFields().size(); ++cid) {
1050             DataValue value;
1051             errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
1052                 value);
1053             if (errCode != E_OK) {
1054                 return errCode;
1055             }
1056             data.rowData.push_back(std::move(value)); // sorted by cid
1057         }
1058     }
1059 
1060     errCode = DataTransformer::SerializeDataItem(data,
1061         isGettingDeletedData ? std::vector<FieldInfo>() : table_.GetFieldInfos(), dataItem);
1062     if (errCode != E_OK) {
1063         LOGE("relational data value transfer to kv fail");
1064     }
1065     return errCode;
1066 }
1067 
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1068 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1069 {
1070     int errCode = GetDataItemForSync(fullStmt, item, false);
1071     if (errCode != E_OK) {
1072         return errCode;
1073     }
1074     item.value = {};
1075     item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1076     return E_OK;
1077 }
1078 
1079 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1080 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp &timestamp)
1081 {
1082     if (stmt == nullptr) {
1083         return -E_INVALID_ARGS;
1084     }
1085     int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1086     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1087         timestamp = INT64_MAX;
1088         errCode = E_OK;
1089     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1090         timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3));  // 3 means timestamp index
1091         errCode = E_OK;
1092     }
1093     return errCode;
1094 }
1095 
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1096 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1097     std::vector<DataItem> &dataItems, DataItem &&item)
1098 {
1099     // If one record is over 4M, ignore it.
1100     if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1101         overLongSize++;
1102     } else {
1103         // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1104         dataTotalSize += GetDataItemSerialSize(item, appendLength);
1105         if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1106             return -E_UNFINISHED;
1107         } else {
1108             dataItems.push_back(item);
1109         }
1110     }
1111     return E_OK;
1112 }
1113 }
1114 
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1115 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1116     sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1117 {
1118     if (!isFirstTime) { // For the first time, never step before, can get nothing
1119         int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1120         if (errCode != E_OK) {
1121             return errCode;
1122         }
1123     }
1124     return StepNext(isMemDb_, queryStmt, queryTime);
1125 }
1126 
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1127 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1128     Timestamp &missQueryTime)
1129 {
1130     int errCode = GetMissQueryData(fullStmt, item);
1131     if (errCode != E_OK) {
1132         return errCode;
1133     }
1134     return StepNext(isMemDb_, fullStmt, missQueryTime);
1135 }
1136 
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1137 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1138     const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1139     const TableInfo &tableInfo)
1140 {
1141     baseTblName_ = tableInfo.GetTableName();
1142     SetTableInfo(tableInfo);
1143     sqlite3_stmt *queryStmt = nullptr;
1144     sqlite3_stmt *fullStmt = nullptr;
1145     bool isGettingDeletedData = false;
1146     int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1147     if (errCode != E_OK) {
1148         return errCode;
1149     }
1150 
1151     Timestamp queryTime = 0;
1152     Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1153 
1154     bool isFirstTime = true;
1155     size_t dataTotalSize = 0;
1156     size_t overLongSize = 0;
1157     do {
1158         DataItem item;
1159         if (queryTime < missQueryTime) {
1160             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1161         } else if (queryTime == missQueryTime) {
1162             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1163             if (errCode != E_OK) {
1164                 break;
1165             }
1166             errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1167         } else {
1168             errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1169         }
1170 
1171         if (errCode == E_OK && !isFirstTime) {
1172             errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1173         }
1174 
1175         if (errCode != E_OK) {
1176             break;
1177         }
1178 
1179         isFirstTime = false;
1180         if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1181             errCode = -E_FINISHED;
1182             break;
1183         }
1184     } while (true);
1185     LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1186         errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1187     SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1188     SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1189     return errCode;
1190 }
1191 
CheckDBModeForRelational()1192 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1193 {
1194     std::string journalMode;
1195     int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1196 
1197     for (auto &c : journalMode) { // convert to lowercase
1198         c = static_cast<char>(std::tolower(c));
1199     }
1200 
1201     if (errCode == E_OK && journalMode != "wal") {
1202         LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1203         return -E_NOT_SUPPORT;
1204     }
1205     return errCode;
1206 }
1207 
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1208 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1209     const std::string &tableName)
1210 {
1211     std::vector<std::string> deviceTables;
1212     int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1213     if (errCode != E_OK) {
1214         LOGE("Get device table name for alter table failed. %d", errCode);
1215         return errCode;
1216     }
1217 
1218     LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1219     for (const auto &table : deviceTables) {
1220         std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1221         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1222         if (errCode != E_OK) {
1223             LOGE("Delete device data failed. %d", errCode);
1224             break;
1225         }
1226     }
1227     return errCode;
1228 }
1229 
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1230 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1231 {
1232     std::string deleteLogSql =
1233         "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName +
1234         "_log WHERE flag&0x02=0 AND (cloud_gid = '' OR cloud_gid IS NULL)";
1235     return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1236 }
1237 
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1238 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1239     const std::string &tableName)
1240 {
1241     std::string deleteLogSql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE device = ?";
1242     sqlite3_stmt *deleteLogStmt = nullptr;
1243     int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1244     if (errCode != E_OK) {
1245         LOGE("Get delete device data log statement failed. %d", errCode);
1246         return errCode;
1247     }
1248 
1249     errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1250     if (errCode != E_OK) {
1251         LOGE("Bind device to delete data log statement failed. %d", errCode);
1252         SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1253         return errCode;
1254     }
1255 
1256     errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1257     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1258         errCode = E_OK;
1259     } else {
1260         LOGE("Delete data log failed. %d", errCode);
1261     }
1262 
1263     SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1264     return errCode;
1265 }
1266 
DeleteDistributedLogTable(const std::string & tableName)1267 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1268 {
1269     if (tableName.empty()) {
1270         return -E_INVALID_ARGS;
1271     }
1272     std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1273     std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1274     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1275     if (errCode != E_OK) {
1276         LOGE("Delete distributed log table failed. %d", errCode);
1277     }
1278     return errCode;
1279 }
1280 
IsTableOnceDropped(const std::string & tableName,int execCode,bool & onceDropped)1281 int SQLiteSingleVerRelationalStorageExecutor::IsTableOnceDropped(const std::string &tableName, int execCode,
1282     bool &onceDropped)
1283 {
1284     if (execCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1285         onceDropped = true;
1286         return E_OK;
1287     } else if (execCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1288         std::string keyStr = DBConstant::TABLE_IS_DROPPED + tableName;
1289         Key key;
1290         DBCommon::StringToVector(keyStr, key);
1291         Value value;
1292 
1293         int errCode = GetKvData(key, value);
1294         if (errCode == E_OK) {
1295             // if user drop table first, then create again(but don't create distributed table), will reach this branch
1296             onceDropped = true;
1297             return E_OK;
1298         } else if (errCode == -E_NOT_FOUND) {
1299             onceDropped = false;
1300             return E_OK;
1301         } else {
1302             LOGE("[IsTableOnceDropped] query is table dropped failed, %d", errCode);
1303             return errCode;
1304         }
1305     } else {
1306         return execCode;
1307     }
1308 }
1309 
CleanResourceForDroppedTable(const std::string & tableName)1310 int SQLiteSingleVerRelationalStorageExecutor::CleanResourceForDroppedTable(const std::string &tableName)
1311 {
1312     int errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1313     if (errCode != E_OK) {
1314         LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1315         return errCode;
1316     }
1317     errCode = DeleteDistributedLogTable(tableName);
1318     if (errCode != E_OK) {
1319         LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1320         return errCode;
1321     }
1322     errCode = DeleteTableTrigger(tableName);
1323     if (errCode != E_OK) {
1324         LOGE("Delete trigger for missing distributed table failed. %d", errCode);
1325     }
1326     return errCode;
1327 }
1328 
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1329 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1330     std::vector<std::string> &missingTables)
1331 {
1332     if (tableNames.empty()) {
1333         return E_OK;
1334     }
1335     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1336     sqlite3_stmt *stmt = nullptr;
1337     int ret = E_OK;
1338     int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1339     if (errCode != E_OK) {
1340         SQLiteUtils::ResetStatement(stmt, true, ret);
1341         return errCode;
1342     }
1343     for (const auto &tableName : tableNames) {
1344         errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1345         if (errCode != E_OK) {
1346             LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1347             break;
1348         }
1349 
1350         errCode = SQLiteUtils::StepWithRetry(stmt, false);
1351         bool onceDropped = false;
1352         errCode = IsTableOnceDropped(tableName, errCode, onceDropped);
1353         if (errCode != E_OK) {
1354             LOGE("query is table once dropped failed. %d", errCode);
1355             break;
1356         }
1357         SQLiteUtils::ResetStatement(stmt, false, ret);
1358         if (onceDropped) { // The table in schema was once dropped
1359             errCode = CleanResourceForDroppedTable(tableName);
1360             if (errCode != E_OK) {
1361                 break;
1362             }
1363             missingTables.emplace_back(tableName);
1364         }
1365     }
1366     SQLiteUtils::ResetStatement(stmt, true, ret);
1367     return CheckCorruptedStatus(errCode);
1368 }
1369 
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1370 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1371     const TableInfo &baseTbl, const StoreInfo &info)
1372 {
1373     if (dbHandle_ == nullptr) {
1374         return -E_INVALID_DB;
1375     }
1376 
1377     if (device.empty() || !baseTbl.IsValid()) {
1378         return -E_INVALID_ARGS;
1379     }
1380 
1381     std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1382     int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1383     if (errCode != E_OK) {
1384         LOGE("Create device table failed. %d", errCode);
1385         return errCode;
1386     }
1387 
1388     errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1389     if (errCode != E_OK) {
1390         LOGE("Copy index to device table failed. %d", errCode);
1391     }
1392     return errCode;
1393 }
1394 
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1395 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1396     const std::string &schemaVersion)
1397 {
1398     if (dbHandle_ == nullptr) {
1399         return -E_INVALID_DB;
1400     }
1401 
1402     TableInfo newTable;
1403     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1404     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1405         LOGE("Check new schema failed. %d", errCode);
1406         return errCode;
1407     } else {
1408         errCode = table.CompareWithTable(newTable, schemaVersion);
1409         if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1410             LOGE("Check schema failed, schema was changed. %d", errCode);
1411             return -E_DISTRIBUTED_SCHEMA_CHANGED;
1412         } else {
1413             errCode = E_OK;
1414         }
1415     }
1416 
1417     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1418     if (errCode != E_OK) {
1419         LOGE("Get query helper for check query failed. %d", errCode);
1420         return errCode;
1421     }
1422 
1423     if (!query.IsQueryForRelationalDB()) {
1424         LOGE("Not support for this query type.");
1425         return -E_NOT_SUPPORT;
1426     }
1427 
1428     SyncTimeRange defaultTimeRange;
1429     sqlite3_stmt *stmt = nullptr;
1430     errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1431         stmt);
1432     if (errCode != E_OK) {
1433         LOGE("Get query statement for check query failed. %d", errCode);
1434     }
1435 
1436     SQLiteUtils::ResetStatement(stmt, true, errCode);
1437     return errCode;
1438 }
1439 
CheckQueryObjectLegal(const QuerySyncObject & query)1440 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const QuerySyncObject &query)
1441 {
1442     if (dbHandle_ == nullptr) {
1443         return -E_INVALID_DB;
1444     }
1445     TableInfo newTable;
1446     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, query.GetTableName(), newTable);
1447     if (errCode != E_OK) {
1448         LOGE("Check new schema failed. %d", errCode);
1449     }
1450     return errCode;
1451 }
1452 
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1453 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1454     Timestamp &maxTimestamp) const
1455 {
1456     maxTimestamp = 0;
1457     for (const auto &tableName : tableNames) {
1458         const std::string sql = "SELECT MAX(timestamp) FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log;";
1459         sqlite3_stmt *stmt = nullptr;
1460         int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1461         if (errCode != E_OK) {
1462             return errCode;
1463         }
1464         errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1465         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1466             maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1467             errCode = E_OK;
1468         }
1469         SQLiteUtils::ResetStatement(stmt, true, errCode);
1470         if (errCode != E_OK) {
1471             maxTimestamp = 0;
1472             return errCode;
1473         }
1474     }
1475     return E_OK;
1476 }
1477 
SetLogTriggerStatus(bool status)1478 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1479 {
1480     const std::string key = "log_trigger_switch";
1481     std::string val = status ? "true" : "false";
1482     std::string sql = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX + "metadata" +
1483         " VALUES ('" + key + "', '" + val + "')";
1484     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1485     if (errCode != E_OK) {
1486         LOGE("Set log trigger to %s failed. errCode=%d", val.c_str(), errCode);
1487     }
1488     return errCode;
1489 }
1490 
1491 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1492 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1493     std::vector<RelationalRowData *> &data)
1494 {
1495     size_t totalLength = 0;
1496     do {
1497         int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1498         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1499             return E_OK;
1500         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1501             LOGE("Get data by bind sql failed:%d", errCode);
1502             return errCode;
1503         }
1504 
1505         if (colNames.empty()) {
1506             SQLiteUtils::GetSelectCols(stmt, colNames);  // Get column names.
1507         }
1508         auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1509         if (relaRowData == nullptr) {
1510             LOGE("ExecuteQueryBySqlStmt OOM");
1511             return -E_OUT_OF_MEMORY;
1512         }
1513 
1514         auto dataSz = relaRowData->CalcLength();
1515         if (dataSz == 0) {  // invalid data
1516             delete relaRowData;
1517             relaRowData = nullptr;
1518             continue;
1519         }
1520 
1521         totalLength += static_cast<size_t>(dataSz);
1522         if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) {  // the set has been full
1523             delete relaRowData;
1524             relaRowData = nullptr;
1525             LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1526             return -E_REMOTE_OVER_SIZE;
1527         }
1528         data.push_back(relaRowData);
1529     } while (true);
1530     return E_OK;
1531 }
1532 }
1533 
1534 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1535 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1536     const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1537     std::vector<RelationalRowData *> &data)
1538 {
1539     int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1540     if (errCode != E_OK) {
1541         return errCode;
1542     }
1543 
1544     sqlite3_stmt *stmt = nullptr;
1545     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1546     if (errCode != E_OK) {
1547         (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1548         return errCode;
1549     }
1550     ResFinalizer finalizer([this, &stmt, &errCode] {
1551         (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1552         SQLiteUtils::ResetStatement(stmt, true, errCode);
1553     });
1554     for (size_t i = 0; i < bindArgs.size(); ++i) {
1555         errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1556         if (errCode != E_OK) {
1557             return errCode;
1558         }
1559     }
1560     return GetRowDatas(stmt, isMemDb_, colNames, data);
1561 }
1562 
CheckEncryptedOrCorrupted() const1563 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1564 {
1565     if (dbHandle_ == nullptr) {
1566         return -E_INVALID_DB;
1567     }
1568 
1569     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1570     if (errCode != E_OK) {
1571         LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1572     }
1573     return errCode;
1574 }
1575 
GetExistsDeviceList(std::set<std::string> & devices) const1576 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1577 {
1578     return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1579         isMemDb_, devices);
1580 }
1581 
GetUploadCountInner(const Timestamp & timestamp,SqliteQueryHelper & helper,std::string & sql,int64_t & count)1582 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCountInner(const Timestamp &timestamp,
1583     SqliteQueryHelper &helper, std::string &sql, int64_t &count)
1584 {
1585     sqlite3_stmt *stmt = nullptr;
1586     int errCode = helper.GetCloudQueryStatement(false, dbHandle_, timestamp, sql, stmt);
1587     if (errCode != E_OK) {
1588         LOGE("failed to get count statement %d", errCode);
1589         return errCode;
1590     }
1591     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1592     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1593         count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1594         errCode = E_OK;
1595     } else {
1596         LOGE("Failed to get the count to be uploaded. %d", errCode);
1597     }
1598     SQLiteUtils::ResetStatement(stmt, true, errCode);
1599     return errCode;
1600 }
1601 
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1602 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp &timestamp, bool isCloudForcePush,
1603     bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1604 {
1605     int errCode;
1606     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1607     if (errCode != E_OK) {
1608         return errCode;
1609     }
1610     std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask,
1611         CloudWaterType::DELETE);
1612     return GetUploadCountInner(timestamp, helper, sql, count);
1613 }
1614 
GetAllUploadCount(const std::vector<Timestamp> & timestampVec,bool isCloudForcePush,bool isCompensatedTask,QuerySyncObject & query,int64_t & count)1615 int SQLiteSingleVerRelationalStorageExecutor::GetAllUploadCount(const std::vector<Timestamp> &timestampVec,
1616     bool isCloudForcePush, bool isCompensatedTask, QuerySyncObject &query, int64_t &count)
1617 {
1618     std::vector<CloudWaterType> typeVec = DBCommon::GetWaterTypeVec();
1619     if (timestampVec.size() != typeVec.size()) {
1620         return -E_INVALID_ARGS;
1621     }
1622     int errCode;
1623     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1624     if (errCode != E_OK) {
1625         return errCode;
1626     }
1627     count = 0;
1628     for (size_t i = 0; i < typeVec.size(); i++) {
1629         std::string sql = helper.GetCountRelationalCloudQuerySql(isCloudForcePush, isCompensatedTask, typeVec[i]);
1630         int64_t tempCount = 0;
1631         helper.AppendCloudQueryToGetDiffData(sql, typeVec[i]);
1632         errCode = GetUploadCountInner(timestampVec[i], helper, sql, tempCount);
1633         if (errCode != E_OK) {
1634             return errCode;
1635         }
1636         count += tempCount;
1637     }
1638     return E_OK;
1639 }
1640 
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1641 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1642     bool ignoreEmptyGid)
1643 {
1644     if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1645         cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1646         return -E_INVALID_ARGS;
1647     }
1648     std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1649         + "' SET cloud_gid = ? WHERE data_key = ? ";
1650     sqlite3_stmt *stmt = nullptr;
1651     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1652     if (errCode != E_OK) {
1653         return errCode;
1654     }
1655     errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1656     int resetCode = E_OK;
1657     SQLiteUtils::ResetStatement(stmt, true, resetCode);
1658     return errCode == E_OK ? resetCode : errCode;
1659 }
1660 
GetSyncCloudData(const CloudUploadRecorder & uploadRecorder,CloudSyncData & cloudDataResult,SQLiteSingleVerRelationalContinueToken & token)1661 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(const CloudUploadRecorder &uploadRecorder,
1662     CloudSyncData &cloudDataResult, SQLiteSingleVerRelationalContinueToken &token)
1663 {
1664     token.GetCloudTableSchema(tableSchema_);
1665     sqlite3_stmt *queryStmt = nullptr;
1666     bool isStepNext = false;
1667     int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult, queryStmt, isStepNext);
1668     if (errCode != E_OK) {
1669         (void)token.ReleaseCloudStatement();
1670         return errCode;
1671     }
1672     uint32_t totalSize = 0;
1673     uint32_t stepNum = -1;
1674     do {
1675         if (isStepNext) {
1676             errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1677             if (errCode != E_OK) {
1678                 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1679                 break;
1680             }
1681         }
1682         isStepNext = true;
1683         errCode = GetCloudDataForSync(uploadRecorder, queryStmt, cloudDataResult, ++stepNum, totalSize);
1684     } while (errCode == E_OK);
1685     if (errCode != -E_UNFINISHED) {
1686         (void)token.ReleaseCloudStatement();
1687     }
1688     return errCode;
1689 }
1690 
GetSyncCloudGid(QuerySyncObject & query,const SyncTimeRange & syncTimeRange,bool isCloudForcePushStrategy,bool isCompensatedTask,std::vector<std::string> & cloudGid)1691 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudGid(QuerySyncObject &query,
1692     const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy,
1693     bool isCompensatedTask, std::vector<std::string> &cloudGid)
1694 {
1695     sqlite3_stmt *queryStmt = nullptr;
1696     int errCode = E_OK;
1697     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1698     if (errCode != E_OK) {
1699         return errCode;
1700     }
1701     std::string sql = helper.GetGidRelationalCloudQuerySql(tableSchema_.fields, isCloudForcePushStrategy,
1702         isCompensatedTask);
1703     errCode = helper.GetCloudQueryStatement(false, dbHandle_, syncTimeRange.beginTime, sql, queryStmt);
1704     if (errCode != E_OK) {
1705         return errCode;
1706     }
1707     do {
1708         errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1709         if (errCode != E_OK) {
1710             errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1711             break;
1712         }
1713         GetCloudGid(queryStmt, cloudGid);
1714     } while (errCode == E_OK);
1715     int resetStatementErrCode = E_OK;
1716     SQLiteUtils::ResetStatement(queryStmt, true, resetStatementErrCode);
1717     queryStmt = nullptr;
1718     return (errCode == E_OK ? resetStatementErrCode : errCode);
1719 }
1720 
GetCloudDataForSync(const CloudUploadRecorder & uploadRecorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t & stepNum,uint32_t & totalSize)1721 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(const CloudUploadRecorder &uploadRecorder,
1722     sqlite3_stmt *statement, CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize)
1723 {
1724     VBucket log;
1725     VBucket extraLog;
1726     uint32_t preSize = totalSize;
1727     GetCloudLog(statement, log, totalSize);
1728     GetCloudExtraLog(statement, extraLog);
1729 
1730     VBucket data;
1731     int64_t flag = 0;
1732     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
1733     if (errCode != E_OK) {
1734         return errCode;
1735     }
1736     if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1737         for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1738             Type cloudValue;
1739             errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1740                 tableSchema_.fields[cid].type, cid + STATUS_INDEX + 1, cloudValue);
1741             if (errCode != E_OK) {
1742                 return errCode;
1743             }
1744             SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1745             errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1746             if (errCode != E_OK) {
1747                 return errCode;
1748             }
1749         }
1750     }
1751 
1752     if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, maxUploadSize_, maxUploadCount_)) {
1753         errCode = CloudStorageUtils::IdentifyCloudType(uploadRecorder, cloudDataResult, data, log, extraLog);
1754     } else {
1755         errCode = -E_UNFINISHED;
1756     }
1757     if (errCode == -E_IGNORE_DATA) {
1758         errCode = E_OK;
1759         totalSize = preSize;
1760         stepNum--;
1761     }
1762     return errCode;
1763 }
1764 
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1765 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1766 {
1767     if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1768         Asset asset;
1769         int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1770         if (errCode != E_OK) {
1771             return errCode;
1772         }
1773         if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1774             return -E_CLOUD_INVALID_ASSET;
1775         }
1776         vBucket.insert_or_assign(field.colName, asset);
1777     } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1778         Assets assets;
1779         int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1780         if (errCode != E_OK) {
1781             return errCode;
1782         }
1783         if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1784             return -E_CLOUD_ERROR;
1785         }
1786         if (!CloudStorageUtils::CheckAssetStatus(assets)) {
1787             return -E_CLOUD_INVALID_ASSET;
1788         }
1789         vBucket.insert_or_assign(field.colName, assets);
1790     } else {
1791         vBucket.insert_or_assign(field.colName, cloudValue);
1792     }
1793     return E_OK;
1794 }
1795 
SetLocalSchema(const RelationalSchemaObject & localSchema)1796 void SQLiteSingleVerRelationalStorageExecutor::SetLocalSchema(const RelationalSchemaObject &localSchema)
1797 {
1798     localSchema_ = localSchema;
1799 }
1800 
CleanCloudDataOnLogTable(const std::string & logTableName,ClearMode mode)1801 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName, ClearMode mode)
1802 {
1803     std::string setFlag;
1804     if (mode == FLAG_ONLY && isLogicDelete_) {
1805         setFlag = SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC;
1806     } else {
1807         setFlag = SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC;
1808     }
1809     std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " + setFlag +
1810         ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1811         SHARING_RESOURCE + " = '' " + "WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR " +
1812         CLOUD_GID_FIELD + " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '';";
1813     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1814     if (errCode != E_OK) {
1815         LOGE("clean cloud log failed, %d", errCode);
1816         return errCode;
1817     }
1818     cleanLogSql = "DELETE FROM " + logTableName + " WHERE " + FLAG_IS_CLOUD + " AND " + DATA_IS_DELETE + ";";
1819     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1820     if (errCode != E_OK) {
1821         LOGE("delete cloud log failed, %d", errCode);
1822         return errCode;
1823     }
1824     // set all flag logout and data upload is not finished.
1825     cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG;
1826     if (mode == FLAG_ONLY) {
1827         cleanLogSql += " = flag | 0x800 & ~0x400;";
1828     } else {
1829         cleanLogSql += " = flag & ~0x400;";
1830     }
1831     return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1832 }
1833 
CleanUploadFinishedFlag(const std::string & tableName)1834 int SQLiteSingleVerRelationalStorageExecutor::CleanUploadFinishedFlag(const std::string &tableName)
1835 {
1836     // unset upload finished flag
1837     std::string cleanUploadFinishedSql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " +
1838         CloudDbConstant::FLAG + " = flag & ~0x400;";
1839     return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanUploadFinishedSql);
1840 }
1841 
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName,const RelationalSchemaObject & localSchema)1842 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1843     const std::string &logTableName, const RelationalSchemaObject &localSchema)
1844 {
1845     std::string sql = "DELETE FROM '" + tableName + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
1846         " IN (SELECT " + DATAKEY + " FROM '" + logTableName + "' WHERE (" + FLAG_IS_LOGIC_DELETE +
1847         ") OR CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD + " OR " + FLAG_IS_CLOUD_CONSISTENCY +
1848         "));";
1849     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1850     if (errCode != E_OK) {
1851         LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1852         return errCode;
1853     }
1854     std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + FLAG_IS_CLOUD + " OR " +
1855         FLAG_IS_CLOUD_CONSISTENCY + ";";
1856     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1857     if (errCode != E_OK) {
1858         LOGE("Failed to delete cloud data on log table, %d.", errCode);
1859         return errCode;
1860     }
1861     errCode = DoCleanAssetId(tableName, localSchema);
1862     if (errCode != E_OK) {
1863         LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
1864         return errCode;
1865     }
1866     errCode = CleanCloudDataOnLogTable(logTableName, FLAG_AND_DATA);
1867     if (errCode != E_OK) {
1868         LOGE("Failed to clean gid on log table, %d.", errCode);
1869     }
1870     return errCode;
1871 }
1872 
ChangeCloudDataFlagOnLogTable(const std::string & logTableName)1873 int SQLiteSingleVerRelationalStorageExecutor::ChangeCloudDataFlagOnLogTable(const std::string &logTableName)
1874 {
1875     std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " +
1876         SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1877         CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '' " + "WHERE NOT " + FLAG_IS_CLOUD_CONSISTENCY + ";";
1878     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1879     if (errCode != E_OK) {
1880         LOGE("change cloud log flag failed, %d", errCode);
1881     }
1882     return errCode;
1883 }
1884 
SetDataOnUserTablWithLogicDelete(const std::string & tableName,const std::string & logTableName)1885 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnUserTablWithLogicDelete(const std::string &tableName,
1886     const std::string &logTableName)
1887 {
1888     UpdateCursorContext context;
1889     context.cursor = GetCursor(tableName);
1890     LOGI("removeData start and cursor is, %d.", context.cursor);
1891     int errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1892     std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " + SET_FLAG_LOGIC_DELETE +
1893                       ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1894                       SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1895                       " WHERE (CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND " + FLAG_IS_CLOUD_CONSISTENCY + ");";
1896     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1897     if (errCode != E_OK) {
1898         LOGE("Failed to change cloud data flag on usertable, %d.", errCode);
1899         CreateFuncUpdateCursor(context, nullptr);
1900         return errCode;
1901     }
1902     // deal when data is logicDelete
1903     sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1904           " = '', " + SHARING_RESOURCE + " = '' WHERE " + FLAG_IS_LOGIC_DELETE + ";";
1905     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1906     if (errCode != E_OK) {
1907         LOGE("Failed to deal logic delete data flag on usertable, %d.", errCode);
1908         CreateFuncUpdateCursor(context, nullptr);
1909         return errCode;
1910     }
1911     LOGI("removeData finish and cursor is %d.", context.cursor);
1912     errCode = SetCursor(tableName, context.cursor);
1913     if (errCode != E_OK) {
1914         CreateFuncUpdateCursor(context, nullptr);
1915         LOGE("set new cursor after removeData error %d.", errCode);
1916         return errCode;
1917     }
1918     errCode = CreateFuncUpdateCursor(context, nullptr);
1919     if (errCode != E_OK) {
1920         LOGE("Clear FuncUpdateCursor error %d.", errCode);
1921     }
1922     return ChangeCloudDataFlagOnLogTable(logTableName);
1923 }
1924 
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys,bool distinguishCloudFlag)1925 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
1926     std::vector<int64_t> &dataKeys, bool distinguishCloudFlag)
1927 {
1928     sqlite3_stmt *selectStmt = nullptr;
1929     std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
1930         " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '' AND data_key != '-1'";
1931     if (distinguishCloudFlag) {
1932         sql += " AND (";
1933         sql += FLAG_IS_CLOUD;
1934         sql += " OR ";
1935         sql +=  FLAG_IS_CLOUD_CONSISTENCY;
1936         sql += " ) ";
1937     }
1938     sql += ";";
1939     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
1940     if (errCode != E_OK) {
1941         LOGE("Get select data_key statement failed, %d", errCode);
1942         return errCode;
1943     }
1944     do {
1945         errCode = SQLiteUtils::StepWithRetry(selectStmt);
1946         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1947             dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
1948         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1949             LOGE("SQLite step failed when query log's data_key : %d", errCode);
1950             break;
1951         }
1952     } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
1953     SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1954     return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
1955 }
1956 
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)1957 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
1958     const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
1959 {
1960     std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
1961     if (opType == OpType::ONLY_UPDATE_GID) {
1962         updateLogSql += "cloud_gid = ?";
1963         updateColName.push_back(CloudDbConstant::GID_FIELD);
1964         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1965     } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1966         updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
1967         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1968     } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
1969         updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
1970         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1971     }  else if (opType == OpType::UPDATE_TIMESTAMP) {
1972         updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
1973             ", timestamp = ?, cloud_gid = '', version = '', sharing_resource = ''";
1974         updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1975     } else if (opType == OpType::CLEAR_GID) {
1976         updateLogSql += "cloud_gid = '', version = '', sharing_resource = '', flag = flag & " +
1977             std::to_string(SET_FLAG_ZERO_MASK);
1978     } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1979         updateLogSql += std::string(CloudDbConstant::TO_LOCAL_CHANGE) + ", cloud_gid = ?";
1980         updateColName.push_back(CloudDbConstant::GID_FIELD);
1981     } else {
1982         updateLogSql += " device = 'cloud', timestamp = ?,";
1983         updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1984         if (opType == OpType::DELETE) {
1985             updateLogSql += GetCloudDeleteSql(DBCommon::GetLogTableName(tableSchema.name));
1986         } else {
1987             updateLogSql += GetUpdateDataFlagSql() + ", cloud_gid = ?";
1988             updateColName.push_back(CloudDbConstant::GID_FIELD);
1989             CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1990         }
1991     }
1992 
1993     int errCode = AppendUpdateLogRecordWhereSqlCondition(tableSchema, vBucket, updateLogSql);
1994     if (errCode != E_OK) {
1995         return errCode;
1996     }
1997 
1998     errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
1999     if (errCode != E_OK) {
2000         LOGE("Get update log statement failed when update cloud data, %d", errCode);
2001     }
2002     return errCode;
2003 }
2004 } // namespace DistributedDB
2005 #endif
2006