• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include <algorithm>
19 #include <optional>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "value_hash_calc.h"
33 #include "device_tracker_log_table_manager.h"
34 
35 namespace DistributedDB {
36 namespace {
37 static constexpr const char *DATAKEY = "DATA_KEY";
38 static constexpr const char *DEVICE_FIELD = "DEVICE";
39 static constexpr const char *CLOUD_GID_FIELD = "CLOUD_GID";
40 static constexpr const char *VERSION = "VERSION";
41 static constexpr const char *SHARING_RESOURCE = "SHARING_RESOURCE";
42 static constexpr const char *FLAG_IS_CLOUD = "FLAG & 0x02 = 0"; // see if 1th bit of a flag is cloud
43 static constexpr const char *FLAG_IS_CLOUD_CONSISTENCY = "FLAG & 0x20 = 0"; // see if flag is cloud_consistency
44 // set 1th bit of flag to one which is local, clean 5th bit of flag to one which is wait compensated sync
45 static constexpr const char *SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC = "(CASE WHEN data_key = -1 and "
46     "FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) ELSE (FLAG | 0x02 | 0x20) & (~0x10) END)";
47 // clean 5th bit of flag to one which is wait compensated sync
48 static constexpr const char *SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC = "(CASE WHEN data_key = -1 and "
49     "FLAG & 0x02 = 0x02 THEN FLAG & (~0x10) & (~0x20) ELSE (FLAG | 0x20) & (~0x10) END)";
50 static constexpr const char *FLAG_IS_LOGIC_DELETE = "FLAG & 0x08 != 0"; // see if 3th bit of a flag is logic delete
51 // set data logic delete, exist passport, delete, not compensated, cloud and consistency
52 static constexpr const char *SET_FLAG_WHEN_LOGOUT = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x12) & (~0x20)";
53 // set data logic delete, exist passport, delete, not compensated and cloud
54 static constexpr const char *SET_FLAG_WHEN_LOGOUT_FOR_SHARE_TABLE = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x12)";
55 static constexpr const char *DATA_IS_DELETE = "data_key = -1 AND FLAG & 0X08 = 0"; // see if data is delete
56 static constexpr const char *UPDATE_CURSOR_SQL = "cursor=update_cursor()";
57 static constexpr const int SET_FLAG_ZERO_MASK = ~0x04; // clear 2th bit of flag
58 static constexpr const int SET_FLAG_ONE_MASK = 0x04; // set 2th bit of flag
59 static constexpr const int SET_CLOUD_FLAG = ~0x02; // set 1th bit of flag to 0
60 static constexpr const int DATA_KEY_INDEX = 0;
61 static constexpr const int TIMESTAMP_INDEX = 3;
62 static constexpr const int W_TIMESTAMP_INDEX = 4;
63 static constexpr const int FLAG_INDEX = 5;
64 static constexpr const int HASH_KEY_INDEX = 6;
65 static constexpr const int CLOUD_GID_INDEX = 7;
66 static constexpr const int VERSION_INDEX = 8;
67 static constexpr const int STATUS_INDEX = 9;
68 
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)69 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
70 {
71     if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
72         return SQLITE_DENY;
73     }
74     if (b == SQLITE_FUNCTION) {
75         if (d != nullptr && (strcmp(d, "fts3_tokenizer") == 0)) {
76             LOGE("Deny fts3_tokenizer in remote query");
77             return SQLITE_DENY;
78         }
79     }
80     return SQLITE_OK;
81 }
82 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)83 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
84     DistributedTableMode mode)
85     : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode), isLogicDelete_(false),
86       assetLoader_(nullptr), putDataMode_(PutDataMode::SYNC), markFlagOption_(MarkFlagOption::DEFAULT),
87       maxUploadCount_(0), maxUploadSize_(0)
88 {
89     bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
90     bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
91     bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
92     bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
93     bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
94     bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
95     bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
96 }
97 
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)98 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
99 {
100     std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
101     if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
102         LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
103         return -E_NOT_SUPPORT;
104     }
105     std::vector<FieldInfo> fieldInfos = table.GetFieldInfos();
106     for (const auto &field : fieldInfos) {
107         if (DBCommon::CaseInsensitiveCompare(field.GetFieldName(), std::string(DBConstant::SQLITE_INNER_ROWID))) {
108             LOGE("[CreateDistributedTable] Not support create distributed table with _rowid_ column.");
109             return -E_NOT_SUPPORT;
110         }
111     }
112 
113     if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
114         if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
115             LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
116             return -E_NOT_SUPPORT;
117         }
118 
119         if (mode == DistributedTableMode::COLLABORATION) {
120             if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
121                 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
122                 return -E_NOT_SUPPORT;
123             }
124         }
125 
126         if (syncType == CLOUD_COOPERATION) {
127             int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
128             if (errCode != E_OK) {
129                 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
130                 return errCode;
131             }
132         }
133     }
134 
135     if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
136         if (table.GetPrimaryKey().size() > 1) {
137             LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
138             return -E_NOT_SUPPORT;
139         }
140     }
141 
142     return E_OK;
143 }
144 
145 namespace {
GetExistedDataTimeOffset(sqlite3 * db,const std::string & tableName,bool isMem,int64_t & timeOffset)146 int GetExistedDataTimeOffset(sqlite3 *db, const std::string &tableName, bool isMem, int64_t &timeOffset)
147 {
148     std::string sql = "SELECT get_sys_time(0) - max(" + std::string(DBConstant::SQLITE_INNER_ROWID) + ") - 1 FROM '" +
149         tableName + "';";
150     sqlite3_stmt *stmt = nullptr;
151     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
152     if (errCode != E_OK) {
153         return errCode;
154     }
155     errCode = SQLiteUtils::StepWithRetry(stmt, isMem);
156     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
157         timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
158         errCode = E_OK;
159     }
160     SQLiteUtils::ResetStatement(stmt, true, errCode);
161     return errCode;
162 }
163 
GetMetaLocalTimeOffset(sqlite3 * db,int64_t & timeOffset)164 int GetMetaLocalTimeOffset(sqlite3 *db, int64_t &timeOffset)
165 {
166     std::string sql = "SELECT value FROM " + DBCommon::GetMetaTableName() + " WHERE key=x'" +
167         DBCommon::TransferStringToHex(std::string(DBConstant::LOCALTIME_OFFSET_KEY)) + "';";
168     sqlite3_stmt *stmt = nullptr;
169     int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
170     if (errCode != E_OK) {
171         return errCode;
172     }
173     errCode = SQLiteUtils::StepWithRetry(stmt);
174     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
175         timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
176         if (timeOffset < 0) {
177             LOGE("TimeOffset %" PRId64 "is invalid.", timeOffset);
178             SQLiteUtils::ResetStatement(stmt, true, errCode);
179             return -E_INTERNAL_ERROR;
180         }
181         errCode = E_OK;
182     }
183     SQLiteUtils::ResetStatement(stmt, true, errCode);
184     return errCode;
185 }
186 }
187 
GetExtendValue(const TrackerTable & trackerTable)188 std::string GetExtendValue(const TrackerTable &trackerTable)
189 {
190     std::string extendValue;
191     const std::set<std::string> &extendNames = trackerTable.GetExtendNames();
192     if (!extendNames.empty()) {
193         extendValue += "json_object(";
194         for (const auto &extendName : extendNames) {
195             extendValue += "'" + extendName + "'," + extendName + ",";
196         }
197         extendValue.pop_back();
198         extendValue += ")";
199     } else {
200         extendValue = "''";
201     }
202     return extendValue;
203 }
204 
GeneLogInfoForExistedDataInner(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTrackerTable)205 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedDataInner(sqlite3 *db, const std::string &identity,
206     const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTrackerTable)
207 {
208     std::string tableName = tableInfo.GetTableName();
209     int64_t timeOffset = 0;
210     int errCode = GetExistedDataTimeOffset(db, tableName, isMemDb_, timeOffset);
211     if (errCode != E_OK) {
212         return errCode;
213     }
214     errCode = SetLogTriggerStatus(false);
215     if (errCode != E_OK) {
216         return errCode;
217     }
218     std::string timeOffsetStr = std::to_string(timeOffset);
219     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
220     std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
221     std::string flag = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL) |
222         static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
223     TrackerTable trackerTable = tableInfo.GetTrackerTable();
224     trackerTable.SetTableName(tableName);
225     std::string calPrimaryKeyHash = logMgrPtr->CalcPrimaryKeyHash("a.", tableInfo, identity);
226     std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid +
227         ", '', '', " + timeOffsetStr + " + " + rowid + ", " +
228         timeOffsetStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash + ", '', ";
229     sql += GetExtendValue(tableInfo.GetTrackerTable());
230     sql += ", 0, '', '', 0 FROM '" + tableName + "' AS a ";
231     if (isTrackerTable) {
232         sql += " WHERE 1 = 1;";
233     } else {
234         sql += "WHERE NOT EXISTS (SELECT 1 FROM " + logTable + " WHERE data_key = a._rowid_);";
235     }
236     errCode = trackerTable.ReBuildTempTrigger(db, TriggerMode::TriggerModeEnum::INSERT, [db, &sql]() {
237         int ret = SQLiteUtils::ExecuteRawSQL(db, sql);
238         if (ret != E_OK) {
239             LOGE("Failed to initialize cloud type log data.%d", ret);
240         }
241         return ret;
242     });
243     return errCode;
244 }
245 
GeneLogInfoForExistedData(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTrackerTable)246 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &identity,
247     const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTrackerTable)
248 {
249     if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
250         return UpdateTrackerTable(db, identity, tableInfo, logMgrPtr, false);
251     }
252     return GeneLogInfoForExistedDataInner(db, identity, tableInfo, logMgrPtr, isTrackerTable);
253 }
254 
ResetLogStatus(std::string & tableName)255 int SQLiteSingleVerRelationalStorageExecutor::ResetLogStatus(std::string &tableName)
256 {
257     int errCode = SetLogTriggerStatus(false);
258     if (errCode != E_OK) {
259         LOGE("Fail to set log trigger on when reset log status, %d", errCode);
260         return errCode;
261     }
262     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
263     std::string sql = "UPDATE " + logTable + " SET status = 0;";
264     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
265     if (errCode != E_OK) {
266         LOGE("Failed to initialize cloud type log data.%d", errCode);
267     }
268     return errCode;
269 }
270 
UpdateTrackerTable(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTimestampOnly)271 int SQLiteSingleVerRelationalStorageExecutor::UpdateTrackerTable(sqlite3 *db, const std::string &identity,
272     const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTimestampOnly)
273 {
274     int64_t errCode = SetLogTriggerStatus(false);
275     if (errCode != E_OK) {
276         return errCode;
277     }
278     int64_t localTimeOffset = 0;
279     errCode = GetMetaLocalTimeOffset(db, localTimeOffset);
280     if (errCode != E_OK) {
281         LOGE("Failed to get local timeOffset.%d", errCode);
282         return errCode;
283     }
284     std::string tableName = tableInfo.GetTableName();
285     std::string logTable = DBCommon::GetLogTableName(tableName);
286     Timestamp currentSysTime = TimeHelper::GetSysCurrentTime();
287     Timestamp currentLocalTime = currentSysTime + static_cast<uint64_t>(localTimeOffset);
288     std::string currentLocalTimeStr = std::to_string(currentLocalTime);
289     if (isTimestampOnly) {
290         std::string sql = "update " + logTable + " set timestamp = " + currentLocalTimeStr +
291             " + data_key, wtimestamp = " + currentLocalTimeStr + " + data_key where data_key != -1;";
292         errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
293         if (errCode != E_OK) {
294             LOGE("Failed to initialize device type log data.%d", errCode);
295         }
296         return errCode;
297     }
298     std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
299     std::string flag = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL) |
300         static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
301     std::string calPrimaryKeyHash = logMgrPtr->CalcPrimaryKeyHash("a.", tableInfo, identity);
302     std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid + ", '', '', " + currentLocalTimeStr +
303         " + " + rowid + ", " + currentLocalTimeStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash +
304         ", '', '', '', '', '', 0 FROM '" + tableName + "' AS a WHERE 1=1;";
305     errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
306     if (errCode != E_OK) {
307         LOGE("Failed to initialize device type log data.%d", errCode);
308     }
309     return errCode;
310 }
311 
CreateRelationalLogTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table)312 int SQLiteSingleVerRelationalStorageExecutor::CreateRelationalLogTable(DistributedTableMode mode, bool isUpgraded,
313     const std::string &identity, TableInfo &table)
314 {
315     // create log table
316     std::unique_ptr<SqliteLogTableManager> tableManager;
317     if (!table.GetTrackerTable().IsEmpty() && table.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
318         tableManager = std::make_unique<DeviceTrackerLogTableManager>();
319     } else {
320         tableManager = LogTableManagerFactory::GetTableManager(mode, table.GetTableSyncType());
321     }
322     if (tableManager == nullptr) {
323         LOGE("[CreateRelationalLogTable] get table manager failed");
324         return -E_INVALID_DB;
325     }
326     int errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
327     if (errCode != E_OK) {
328         LOGE("[CreateDistributedTable] create log table failed");
329         return errCode;
330     }
331     std::string tableName = table.GetTableName();
332     if (!isUpgraded) {
333         if (table.GetTrackerTable().GetTableName().empty()) {
334             errCode = GeneLogInfoForExistedData(dbHandle_, identity, table, tableManager, false);
335         } else if (table.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
336             // tracker table -> distributed device table
337             errCode = UpdateTrackerTable(dbHandle_, tableName, table, tableManager, true);
338         } else {
339             // tracker table -> distributed cloud table
340             errCode = ResetLogStatus(tableName);
341         }
342     } else {
343         if (table.GetTrackerTable().GetTableName().empty()) {
344             errCode = GeneLogInfoForExistedDataInner(dbHandle_, identity, table, tableManager, false);
345         }
346     }
347     if (errCode != E_OK) {
348         LOGE("[CreateDistributedTable] generate log isUpgraded %d failed %d.", static_cast<int>(isUpgraded), errCode);
349         return errCode;
350     }
351 
352     // add trigger
353     errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
354     if (errCode != E_OK) {
355         LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
356         return errCode;
357     }
358     return SetLogTriggerStatus(true);
359 }
360 
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table)361 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
362     const std::string &identity, TableInfo &table)
363 {
364     if (dbHandle_ == nullptr) {
365         return -E_INVALID_DB;
366     }
367 
368     const std::string tableName = table.GetTableName();
369     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
370     if (errCode != E_OK) {
371         LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
372         return errCode;
373     }
374 
375     if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
376         bool isEmpty = false;
377         errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
378         if (errCode != E_OK) {
379             LOGE("[CreateDistributedTable] check table empty failed. error=%d", errCode);
380             return -E_NOT_SUPPORT;
381         }
382         if (!isEmpty) {
383             LOGW("[CreateDistributedTable] generate %.3s log for existed data, table type %d",
384                 DBCommon::TransferStringToHex(DBCommon::TransferHashString(tableName)).c_str(),
385                 static_cast<int>(table.GetTableSyncType()));
386         }
387     }
388 
389     errCode = CheckTableConstraint(table, mode, table.GetTableSyncType());
390     if (errCode != E_OK) {
391         LOGE("[CreateDistributedTable] check table constraint failed.");
392         return errCode;
393     }
394 
395     return CreateRelationalLogTable(mode, isUpgraded, identity, table);
396 }
397 
CompareSchemaTableColumns(const std::string & tableName)398 int SQLiteSingleVerRelationalStorageExecutor::CompareSchemaTableColumns(const std::string &tableName)
399 {
400     bool onceDropped = false;
401     int errCode = IsTableOnceDropped(tableName, onceDropped);
402     if (!onceDropped) {
403         // do not return error code to make sure main procedure will continue
404         return E_OK;
405     }
406     LOGI("[CompareSchemaTableColumns] table once dropped check schema. table name is %s length is %zu",
407         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
408     TableInfo newTableInfo;
409     errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
410     if (errCode != E_OK) {
411         LOGE("[CompareSchemaTableColumns] analysis table schema failed. %d", errCode);
412         return errCode;
413     }
414     // new table should has same or compatible upgrade
415     TableInfo tableInfo = localSchema_.GetTable(tableName);
416     errCode = tableInfo.CompareWithTable(newTableInfo, localSchema_.GetSchemaVersion());
417     if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
418         LOGE("[CompareSchemaTableColumns] Not support with incompatible table.");
419         return -E_SCHEMA_MISMATCH;
420     }
421     return errCode;
422 }
423 
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)424 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
425     DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
426 {
427     if (dbHandle_ == nullptr) {
428         return -E_INVALID_DB;
429     }
430     TableInfo newTableInfo;
431     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
432     if (errCode != E_OK) {
433         LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
434         return errCode;
435     }
436 
437     if (CheckTableConstraint(newTableInfo, mode, syncType)) {
438         LOGE("[UpgradeDistributedTable] Not support create distributed table when violate constraints.");
439         return -E_NOT_SUPPORT;
440     }
441 
442     // new table should has same or compatible upgrade
443     TableInfo tableInfo = schema.GetTable(tableName);
444     errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
445     if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
446         LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
447         return -E_SCHEMA_MISMATCH;
448     } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
449         LOGD("[UpgradeDistributedTable] schema has not changed.");
450         // update table if tableName changed
451         schema.RemoveRelationalTable(tableName);
452         tableInfo.SetTableName(tableName);
453         schema.AddRelationalTable(tableInfo);
454         return E_OK;
455     }
456 
457     schemaChanged = true;
458     errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
459     if (errCode != E_OK) {
460         LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
461     }
462 
463     schema.AddRelationalTable(newTableInfo);
464     return errCode;
465 }
466 
467 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)468 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
469     std::vector<std::string> &deviceTables)
470 {
471     if (device.empty() && tableName.empty()) { // device and table name should not both be empty
472         return -E_INVALID_ARGS;
473     }
474     std::string devicePattern = device.empty() ? "%" : device;
475     std::string tablePattern = tableName.empty() ? "%" : tableName;
476     std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
477 
478     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
479         deviceTableName + "';";
480     sqlite3_stmt *stmt = nullptr;
481     int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
482     if (errCode != E_OK) {
483         return errCode;
484     }
485 
486     do {
487         errCode = SQLiteUtils::StepWithRetry(stmt, false);
488         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
489             errCode = E_OK;
490             break;
491         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
492             LOGE("Get table name failed. %d", errCode);
493             break;
494         }
495         std::string realTableName;
496         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
497         if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
498             continue;
499         }
500         if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
501             continue;
502         }
503         deviceTables.emplace_back(realTableName);
504     } while (true);
505 
506     SQLiteUtils::ResetStatement(stmt, true, errCode);
507     return errCode;
508 }
509 
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)510 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
511 {
512     std::vector<FieldInfo> fields;
513     auto itOld = oldTableInfo.GetFields().begin();
514     auto itNew = newTableInfo.GetFields().begin();
515     for (; itNew != newTableInfo.GetFields().end(); itNew++) {
516         if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
517             fields.emplace_back(itNew->second);
518             continue;
519         }
520         itOld++;
521     }
522     return fields;
523 }
524 
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)525 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
526 {
527     if (db == nullptr) {
528         return -E_INVALID_ARGS;
529     }
530 
531     std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
532         return a.GetColumnId()< b.GetColumnId();
533     });
534     int errCode = E_OK;
535     for (const auto &table : tables) {
536         for (const auto &field : fields) {
537             std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
538             alterSql += "'" + field.GetDataType() + "'";
539             alterSql += field.IsNotNull() ? " NOT NULL" : "";
540             alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
541             alterSql += ";";
542             errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
543             if (errCode != E_OK) {
544                 LOGE("Alter table failed. %d", errCode);
545                 break;
546             }
547         }
548     }
549     return errCode;
550 }
551 
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)552 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
553 {
554     IndexInfoMap indexes;
555     auto itOld = oldTableInfo.GetIndexDefine().begin();
556     auto itNew = newTableInfo.GetIndexDefine().begin();
557     auto itOldEnd = oldTableInfo.GetIndexDefine().end();
558     auto itNewEnd = newTableInfo.GetIndexDefine().end();
559 
560     while (itOld != itOldEnd && itNew != itNewEnd) {
561         if (itOld->first == itNew->first) {
562             if (itOld->second != itNew->second) {
563                 indexes.insert({itNew->first, itNew->second});
564             }
565             itOld++;
566             itNew++;
567         } else if (itOld->first < itNew->first) {
568             indexes.insert({itOld->first, {}});
569             itOld++;
570         } else {
571             indexes.insert({itNew->first, itNew->second});
572             itNew++;
573         }
574     }
575 
576     while (itOld != itOldEnd) {
577         indexes.insert({itOld->first, {}});
578         itOld++;
579     }
580 
581     while (itNew != itNewEnd) {
582         indexes.insert({itNew->first, itNew->second});
583         itNew++;
584     }
585 
586     return indexes;
587 }
588 
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)589 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
590 {
591     if (db == nullptr) {
592         return -E_INVALID_ARGS;
593     }
594 
595     int errCode = E_OK;
596     for (const auto &table : tables) {
597         for (const auto &index : indexes) {
598             if (index.first.empty()) {
599                 continue;
600             }
601             std::string realIndexName = table + "_" + index.first;
602             std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
603             errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
604             if (errCode != E_OK) {
605                 LOGE("Drop index failed. %d", errCode);
606                 return errCode;
607             }
608 
609             if (index.second.empty()) { // empty means drop index only
610                 continue;
611             }
612 
613             auto it = index.second.begin();
614             std::string indexDefine = *it++;
615             while (it != index.second.end()) {
616                 indexDefine += ", " + *it++;
617             }
618             std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
619                 "(" + indexDefine + ");";
620             errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
621             if (errCode != E_OK) {
622                 LOGE("Create index failed. %d", errCode);
623                 break;
624             }
625         }
626     }
627     return errCode;
628 }
629 }
630 
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)631 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
632     const TableInfo &newTableInfo)
633 {
634     std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
635     IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
636     std::vector<std::string> deviceTables;
637     int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
638     if (errCode != E_OK) {
639         LOGE("Get device table name for alter table failed. %d", errCode);
640         return errCode;
641     }
642 
643     LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
644         upgradeIndexes.size(), deviceTables.size());
645     errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
646     if (errCode != E_OK) {
647         LOGE("upgrade fields failed. %d", errCode);
648         return errCode;
649     }
650 
651     errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
652     if (errCode != E_OK) {
653         LOGE("upgrade indexes failed. %d", errCode);
654     }
655 
656     return errCode;
657 }
658 
StartTransaction(TransactType type)659 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
660 {
661     if (dbHandle_ == nullptr) {
662         LOGE("Begin transaction failed, dbHandle is null.");
663         return -E_INVALID_DB;
664     }
665     int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
666     if (errCode != E_OK) {
667         LOGE("Begin transaction failed, errCode = %d", errCode);
668     }
669     return errCode;
670 }
671 
Commit()672 int SQLiteSingleVerRelationalStorageExecutor::Commit()
673 {
674     if (dbHandle_ == nullptr) {
675         return -E_INVALID_DB;
676     }
677 
678     return SQLiteUtils::CommitTransaction(dbHandle_);
679 }
680 
Rollback()681 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
682 {
683     if (dbHandle_ == nullptr) {
684         return -E_INVALID_DB;
685     }
686     int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
687     if (errCode != E_OK) {
688         LOGE("sqlite single ver relation storage executor rollback fail! errCode = [%d]", errCode);
689     }
690     return errCode;
691 }
692 
SetTableInfo(const TableInfo & tableInfo)693 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
694 {
695     table_ = tableInfo;
696 }
697 
698 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize,int64_t & revisedTime,int64_t & invalidTime)699 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize,
700     int64_t &revisedTime, int64_t &invalidTime)
701 {
702     int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX));
703     uint64_t curTime = 0;
704     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
705         if (modifyTime > static_cast<int64_t>(curTime)) {
706             invalidTime = modifyTime;
707             modifyTime = static_cast<int64_t>(curTime);
708             revisedTime = modifyTime;
709         }
710     } else {
711         LOGW("[Relational] get raw sys time failed.");
712     }
713     logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
714     logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
715         static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
716     totalSize += sizeof(int64_t) + sizeof(int64_t);
717     if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
718         std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
719             sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
720         if (!cloudGid.empty()) {
721             logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
722             totalSize += cloudGid.size();
723         }
724     }
725     if (revisedTime != 0) {
726         std::string cloudGid;
727         if (logInfo.count(CloudDbConstant::GID_FIELD) != 0) {
728             cloudGid = std::get<std::string>(logInfo[CloudDbConstant::GID_FIELD]);
729         }
730         LOGW("[Relational] Found invalid mod time: %lld, curTime: %lld, gid: %s", invalidTime, revisedTime,
731             DBCommon::StringMiddleMasking(cloudGid).c_str());
732     }
733     std::string version;
734     SQLiteUtils::GetColumnTextValue(logStatement, VERSION_INDEX, version);
735     logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
736     totalSize += version.size();
737 }
738 
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)739 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
740 {
741     flags.insert_or_assign(DBConstant::ROWID,
742         static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
743     flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
744         static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
745     flags.insert_or_assign(CloudDbConstant::FLAG,
746         static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
747     Bytes hashKey;
748     (void)SQLiteUtils::GetColumnBlobValue(logStatement, HASH_KEY_INDEX, hashKey);
749     flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
750     flags.insert_or_assign(CloudDbConstant::STATUS,
751         static_cast<int64_t>(sqlite3_column_int(logStatement, STATUS_INDEX)));
752 }
753 
GetCloudGid(sqlite3_stmt * logStatement,std::vector<std::string> & cloudGid)754 void GetCloudGid(sqlite3_stmt *logStatement, std::vector<std::string> &cloudGid)
755 {
756     if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) == nullptr) {
757         return;
758     }
759     std::string gid = reinterpret_cast<const std::string::value_type *>(
760         sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
761     if (gid.empty()) {
762         LOGW("[Relational] Get cloud gid is null.");
763         return;
764     }
765     cloudGid.emplace_back(gid);
766 }
767 }
768 
GetDataItemSerialSize(DataItem & item,size_t appendLen)769 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
770 {
771     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
772     // the size would not be very large.
773     static const size_t maxOrigDevLength = 40;
774     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
775     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
776         Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
777     return dataSize;
778 }
779 
GetKvData(const Key & key,Value & value) const780 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
781 {
782     static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
783         "metadata WHERE key=?;";
784     sqlite3_stmt *statement = nullptr;
785     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
786     if (errCode != E_OK) {
787         goto END;
788     }
789 
790     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
791     if (errCode != E_OK) {
792         goto END;
793     }
794 
795     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
796     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
797         errCode = -E_NOT_FOUND;
798         goto END;
799     } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
800         goto END;
801     }
802 
803     errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
804     END:
805     SQLiteUtils::ResetStatement(statement, true, errCode);
806     return errCode;
807 }
808 
PutKvData(const Key & key,const Value & value) const809 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
810 {
811     static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + std::string(DBConstant::RELATIONAL_PREFIX) +
812         "metadata VALUES(?,?);";
813     sqlite3_stmt *statement = nullptr;
814     int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
815     if (errCode != E_OK) {
816         return errCode;
817     }
818 
819     errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false);  // 1 means key index
820     if (errCode != E_OK) {
821         LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
822         goto ERROR;
823     }
824 
825     errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true);  // 2 means value index
826     if (errCode != E_OK) {
827         LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
828         goto ERROR;
829     }
830     errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
831     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
832         errCode = E_OK;
833     }
834 ERROR:
835     SQLiteUtils::ResetStatement(statement, true, errCode);
836     return errCode;
837 }
838 
DeleteMetaData(const std::vector<Key> & keys) const839 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
840 {
841     static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
842         "metadata WHERE key=?;";
843     sqlite3_stmt *statement = nullptr;
844     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
845     if (errCode != E_OK) {
846         return errCode;
847     }
848 
849     for (const auto &key : keys) {
850         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
851         if (errCode != E_OK) {
852             break;
853         }
854 
855         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
856         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
857             break;
858         }
859         errCode = E_OK;
860         SQLiteUtils::ResetStatement(statement, false, errCode);
861     }
862     SQLiteUtils::ResetStatement(statement, true, errCode);
863     return CheckCorruptedStatus(errCode);
864 }
865 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const866 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
867 {
868     static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " +
869         std::string(DBConstant::RELATIONAL_PREFIX) + "metadata WHERE key>=? AND key<=?;";
870     sqlite3_stmt *statement = nullptr;
871     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
872     if (errCode != E_OK) {
873         return errCode;
874     }
875 
876     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
877     if (errCode == E_OK) {
878         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
879         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
880             errCode = E_OK;
881         }
882     }
883     SQLiteUtils::ResetStatement(statement, true, errCode);
884     return CheckCorruptedStatus(errCode);
885 }
886 
GetAllMetaKeys(std::vector<Key> & keys) const887 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
888 {
889     static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
890         "metadata;";
891     sqlite3_stmt *statement = nullptr;
892     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
893     if (errCode != E_OK) {
894         LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
895         return errCode;
896     }
897     errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
898     SQLiteUtils::ResetStatement(statement, true, errCode);
899     return errCode;
900 }
901 
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)902 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
903     RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
904 {
905     if (stmt == nullptr) {
906         int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
907         if (errCode != E_OK) {
908             LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
909             return errCode;
910         }
911     }
912 
913     int errCode = inserter.BindHashKeyAndDev(dataItem, stmt, 1); // 1 is hashKey begin index
914     if (errCode != E_OK) {
915         return errCode;
916     }
917     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
918     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
919         errCode = E_OK;
920     }
921     SQLiteUtils::ResetStatement(stmt, false, errCode);  // Finalize outside.
922     return errCode;
923 }
924 
SaveSyncDataItem(const DataItem & dataItem,bool isUpdate,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,int64_t & rowid)925 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, bool isUpdate,
926     SaveSyncDataStmt &saveStmt, RelationalSyncDataInserter &inserter, int64_t &rowid)
927 {
928     auto &data = inserter.GetChangedData();
929     if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
930         std::vector<Type> primaryValues;
931         primaryValues.push_back(rowid);
932         data.primaryData[ChangeType::OP_DELETE].push_back(primaryValues);
933         rowid = -1;
934         return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
935     }
936     // we don't know the rowid if user drop device table
937     // SPLIT_BY_DEVICE use insert or replace to update data
938     // no pk table should delete by hash key(rowid) first
939     if ((mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().IsNoPkTable())) {
940         int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
941         if (errCode != E_OK) {
942             LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
943             return errCode;
944         }
945         std::vector<Type> primaryValues;
946         primaryValues.push_back(rowid);
947         data.primaryData[ChangeType::OP_DELETE].push_back(primaryValues);
948     }
949     std::map<std::string, Type> pkVals;
950     int errCode = inserter.SaveData(isUpdate, dataItem, saveStmt, pkVals);
951     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
952         if (!isUpdate) {
953             rowid = SQLiteUtils::GetLastRowId(dbHandle_);
954         }
955         std::vector<Type> primaryValues;
956         std::vector<std::string> primaryKeys;
957         if (inserter.GetLocalTable().IsMultiPkTable() || inserter.GetLocalTable().IsNoPkTable()) {
958             primaryKeys.push_back(DBConstant::ROWID);
959             primaryValues.push_back(rowid);
960         }
961         for (const auto &pkVal : pkVals) {
962             primaryKeys.push_back(pkVal.first);
963             primaryValues.push_back(pkVal.second);
964         }
965         ChangeType type = isUpdate ? ChangeType::OP_UPDATE : ChangeType::OP_INSERT;
966         data.primaryData[type].push_back(primaryValues);
967         data.field = primaryKeys;
968         errCode = E_OK;
969     }
970     return errCode;
971 }
972 
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)973 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
974     RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
975 {
976     if (stmt == nullptr) {
977         int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
978         if (errCode != E_OK) {
979             LOGE("[DeleteSyncLog] Get statement fail!");
980             return errCode;
981         }
982     }
983 
984     int errCode = inserter.BindHashKeyAndDev(dataItem, stmt, 1); // 1 is hashKey begin index
985     if (errCode != E_OK) {
986         return errCode;
987     }
988     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
989     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
990         errCode = E_OK;
991     }
992     SQLiteUtils::ResetStatement(stmt, false, errCode);  // Finalize outside.
993     return errCode;
994 }
995 
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)996 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
997     RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
998 {
999     int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
1000     if (errCode != E_OK) {
1001         return errCode;
1002     }
1003     return DeleteSyncLog(item, inserter, rmLogStmt);
1004 }
1005 
CheckDataConflictDefeated(const DataItem & dataItem,sqlite3_stmt * queryStmt,bool & isDefeated,bool & isExist,int64_t & rowId)1006 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
1007     sqlite3_stmt *queryStmt, bool &isDefeated, bool &isExist, int64_t &rowId)
1008 {
1009     LogInfo logInfoGet;
1010     int errCode = SQLiteRelationalUtils::GetLogInfoPre(queryStmt, mode_, dataItem, logInfoGet);
1011     int ret = E_OK;
1012     SQLiteUtils::ResetStatement(queryStmt, false, ret);
1013     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1014         LOGE("Failed to get raw data. %d", errCode);
1015         return errCode;
1016     }
1017     rowId = logInfoGet.dataKey;
1018     isExist = (errCode != -E_NOT_FOUND) && ((logInfoGet.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) == 0);
1019     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
1020         mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
1021         isDefeated = false; // no need to solve conflict except miss query data
1022         return E_OK;
1023     }
1024     if (!isExist || dataItem.dev != logInfoGet.device) {
1025         // defeated if item timestamp is earlier than raw data
1026         isDefeated = (dataItem.timestamp <= logInfoGet.timestamp);
1027     }
1028     return E_OK;
1029 }
1030 
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)1031 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
1032     SaveSyncDataStmt &saveStmt, DataItem &item)
1033 {
1034     bool isDefeated = false;
1035     bool isExist = false;
1036     int64_t rowid = -1;
1037     int errCode = CheckDataConflictDefeated(item, saveStmt.queryStmt, isDefeated, isExist, rowid);
1038     if (errCode != E_OK) {
1039         LOGE("check data conflict failed. %d", errCode);
1040         return errCode;
1041     }
1042 
1043     if (isDefeated) {
1044         LOGD("Data was defeated.");
1045         return E_OK;
1046     }
1047     if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
1048         return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
1049     }
1050     bool isUpdate = isExist && mode_ == DistributedTableMode::COLLABORATION;
1051     errCode = SaveSyncDataItem(item, isUpdate, saveStmt, inserter, rowid);
1052     if (errCode == E_OK || errCode == -E_NOT_FOUND) {
1053         errCode = inserter.SaveSyncLog(dbHandle_, saveStmt.saveLogStmt, saveStmt.queryStmt, item, rowid);
1054     }
1055     return errCode;
1056 }
1057 
SaveSyncDataItems(RelationalSyncDataInserter & inserter)1058 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
1059 {
1060     SaveSyncDataStmt saveStmt;
1061     int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
1062     if (errCode != E_OK) {
1063         LOGE("Prepare insert sync data statement failed.");
1064         return errCode;
1065     }
1066 
1067     errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
1068         if (item.neglect) { // Do not save this record if it is neglected
1069             return E_OK;
1070         }
1071         int errCode = SaveSyncDataItem(inserter, saveStmt, item);
1072         if (errCode != E_OK) {
1073             LOGE("save sync data item failed. err=%d", errCode);
1074             return errCode;
1075         }
1076         // Need not reset rmDataStmt and rmLogStmt here.
1077         return saveStmt.ResetStatements(false);
1078     });
1079 
1080     int ret = saveStmt.ResetStatements(true);
1081     return errCode != E_OK ? errCode : ret;
1082 }
1083 
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)1084 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
1085 {
1086     if (useTrans) {
1087         int errCode = StartTransaction(TransactType::IMMEDIATE);
1088         if (errCode != E_OK) {
1089             return errCode;
1090         }
1091     }
1092 
1093     int errCode = SaveSyncDataItems(inserter);
1094     if (errCode != E_OK) {
1095         LOGE("Save sync data items failed. errCode=%d", errCode);
1096         goto END;
1097     }
1098 END:
1099     if (useTrans) {
1100         if (errCode == E_OK) {
1101             errCode = Commit();
1102         } else {
1103             (void)Rollback(); // Keep the error code of the first scene
1104         }
1105     }
1106     return errCode;
1107 }
1108 
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const1109 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
1110     bool isGettingDeletedData) const
1111 {
1112     RowDataWithLog data;
1113     int errCode = SQLiteRelationalUtils::GetLogData(stmt, data.logInfo);
1114     if (errCode != E_OK) {
1115         LOGE("relational data value transfer to kv fail");
1116         return errCode;
1117     }
1118     std::vector<FieldInfo> serializeFields;
1119     if (!isGettingDeletedData) {
1120         auto fields = table_.GetFieldInfos();
1121         for (size_t cid = 0; cid < fields.size(); ++cid) {
1122             if (localSchema_.IsNeedSkipSyncField(fields[cid], table_.GetTableName())) {
1123                 continue;
1124             }
1125             DataValue value;
1126             errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
1127                 value);
1128             if (errCode != E_OK) {
1129                 return errCode;
1130             }
1131             data.rowData.push_back(std::move(value)); // sorted by cid
1132             serializeFields.push_back(fields[cid]);
1133         }
1134     }
1135 
1136     errCode = DataTransformer::SerializeDataItem(data,
1137         isGettingDeletedData ? std::vector<FieldInfo>() : serializeFields, dataItem);
1138     if (errCode != E_OK) {
1139         LOGE("relational data value transfer to kv fail");
1140     }
1141     return errCode;
1142 }
1143 
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1144 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1145 {
1146     int errCode = GetDataItemForSync(fullStmt, item, false);
1147     if (errCode != E_OK) {
1148         return errCode;
1149     }
1150     item.value = {};
1151     item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1152     return E_OK;
1153 }
1154 
1155 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1156 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp &timestamp)
1157 {
1158     if (stmt == nullptr) {
1159         return -E_INVALID_ARGS;
1160     }
1161     int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1162     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1163         timestamp = INT64_MAX;
1164         errCode = E_OK;
1165     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1166         timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3));  // 3 means timestamp index
1167         errCode = E_OK;
1168     }
1169     return errCode;
1170 }
1171 
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1172 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1173     std::vector<DataItem> &dataItems, DataItem &&item)
1174 {
1175     // If one record is over 4M, ignore it.
1176     if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1177         overLongSize++;
1178     } else {
1179         // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1180         dataTotalSize += GetDataItemSerialSize(item, appendLength);
1181         if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1182             return -E_UNFINISHED;
1183         } else {
1184             dataItems.push_back(item);
1185         }
1186     }
1187     return E_OK;
1188 }
1189 }
1190 
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1191 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1192     sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1193 {
1194     if (!isFirstTime) { // For the first time, never step before, can get nothing
1195         int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1196         if (errCode != E_OK) {
1197             return errCode;
1198         }
1199     }
1200     return StepNext(isMemDb_, queryStmt, queryTime);
1201 }
1202 
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1203 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1204     Timestamp &missQueryTime)
1205 {
1206     int errCode = GetMissQueryData(fullStmt, item);
1207     if (errCode != E_OK) {
1208         return errCode;
1209     }
1210     return StepNext(isMemDb_, fullStmt, missQueryTime);
1211 }
1212 
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1213 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1214     const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1215     const TableInfo &tableInfo)
1216 {
1217     baseTblName_ = tableInfo.GetTableName();
1218     SetTableInfo(tableInfo);
1219     sqlite3_stmt *queryStmt = nullptr;
1220     sqlite3_stmt *fullStmt = nullptr;
1221     bool isGettingDeletedData = false;
1222     int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1223     if (errCode != E_OK) {
1224         return errCode;
1225     }
1226 
1227     Timestamp queryTime = 0;
1228     Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1229 
1230     bool isFirstTime = true;
1231     size_t dataTotalSize = 0;
1232     size_t overLongSize = 0;
1233     do {
1234         DataItem item;
1235         if (queryTime < missQueryTime) {
1236             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1237         } else if (queryTime == missQueryTime) {
1238             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1239             if (errCode != E_OK) {
1240                 break;
1241             }
1242             errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1243         } else {
1244             errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1245         }
1246 
1247         if (errCode == E_OK && !isFirstTime) {
1248             errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1249         }
1250 
1251         if (errCode != E_OK) {
1252             break;
1253         }
1254 
1255         isFirstTime = false;
1256         if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1257             errCode = -E_FINISHED;
1258             break;
1259         }
1260     } while (true);
1261     LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1262         errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1263     SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1264     SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1265     return errCode;
1266 }
1267 
CheckDBModeForRelational()1268 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1269 {
1270     std::string journalMode;
1271     int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1272 
1273     for (auto &c : journalMode) { // convert to lowercase
1274         c = static_cast<char>(std::tolower(c));
1275     }
1276 
1277     if (errCode == E_OK && journalMode != "wal") {
1278         LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1279         return -E_NOT_SUPPORT;
1280     }
1281     return errCode;
1282 }
1283 
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1284 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1285     const std::string &tableName)
1286 {
1287     std::vector<std::string> deviceTables;
1288     int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1289     if (errCode != E_OK) {
1290         LOGE("Get device table name for alter table failed. %d", errCode);
1291         return errCode;
1292     }
1293 
1294     LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1295     for (const auto &table : deviceTables) {
1296         std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1297         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1298         if (errCode != E_OK) {
1299             LOGE("Delete device data failed. %d", errCode);
1300             break;
1301         }
1302     }
1303     return errCode;
1304 }
1305 
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1306 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1307 {
1308     std::string deleteLogSql =
1309         "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1310         "_log WHERE flag&0x02=0 AND (cloud_gid = '' OR cloud_gid IS NULL)";
1311     return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1312 }
1313 
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1314 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1315     const std::string &tableName)
1316 {
1317     std::string deleteLogSql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1318         "_log WHERE device = ?";
1319     sqlite3_stmt *deleteLogStmt = nullptr;
1320     int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1321     if (errCode != E_OK) {
1322         LOGE("Get delete device data log statement failed. %d", errCode);
1323         return errCode;
1324     }
1325 
1326     errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1327     if (errCode != E_OK) {
1328         LOGE("Bind device to delete data log statement failed. %d", errCode);
1329         SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1330         return errCode;
1331     }
1332 
1333     errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1334     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1335         errCode = E_OK;
1336     } else {
1337         LOGE("Delete data log failed. %d", errCode);
1338     }
1339 
1340     SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1341     return errCode;
1342 }
1343 
DeleteDistributedLogTable(const std::string & tableName)1344 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1345 {
1346     if (tableName.empty()) {
1347         return -E_INVALID_ARGS;
1348     }
1349     std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1350     std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1351     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1352     if (errCode != E_OK) {
1353         LOGE("Delete distributed log table failed. %d", errCode);
1354     }
1355     return errCode;
1356 }
1357 
IsTableOnceDropped(const std::string & tableName,bool & onceDropped)1358 int SQLiteSingleVerRelationalStorageExecutor::IsTableOnceDropped(const std::string &tableName, bool &onceDropped)
1359 {
1360     std::string keyStr = DBConstant::TABLE_IS_DROPPED + tableName;
1361     Key key;
1362     DBCommon::StringToVector(keyStr, key);
1363     Value value;
1364 
1365     int errCode = GetKvData(key, value);
1366     if (errCode == E_OK) {
1367         onceDropped = true;
1368         return E_OK;
1369     } else if (errCode == -E_NOT_FOUND) {
1370         onceDropped = false;
1371         return E_OK;
1372     }
1373     return errCode;
1374 }
1375 
CleanResourceForDroppedTable(const std::string & tableName)1376 int SQLiteSingleVerRelationalStorageExecutor::CleanResourceForDroppedTable(const std::string &tableName)
1377 {
1378     int errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1379     if (errCode != E_OK) {
1380         LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1381         return errCode;
1382     }
1383     errCode = DeleteDistributedLogTable(tableName);
1384     if (errCode != E_OK) {
1385         LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1386         return errCode;
1387     }
1388     errCode = DeleteTableTrigger(tableName);
1389     if (errCode != E_OK) {
1390         LOGE("Delete trigger for missing distributed table failed. %d", errCode);
1391     }
1392     return errCode;
1393 }
1394 
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1395 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1396     std::vector<std::string> &missingTables)
1397 {
1398     if (tableNames.empty()) {
1399         return E_OK;
1400     }
1401     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1402     sqlite3_stmt *stmt = nullptr;
1403     int ret = E_OK;
1404     int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1405     if (errCode != E_OK) {
1406         SQLiteUtils::ResetStatement(stmt, true, ret);
1407         return errCode;
1408     }
1409     for (const auto &tableName : tableNames) {
1410         errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1411         if (errCode != E_OK) {
1412             LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1413             break;
1414         }
1415 
1416         errCode = SQLiteUtils::StepWithRetry(stmt, false);
1417         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1418             errCode = CleanResourceForDroppedTable(tableName);
1419             if (errCode != E_OK) {
1420                 LOGE("Clean resource for dropped table failed. %d", errCode);
1421                 break;
1422             }
1423             missingTables.emplace_back(tableName);
1424         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1425             LOGE("[CheckAndCleanDisTable]Check distributed table failed. %d", errCode);
1426             break;
1427         }
1428         errCode = E_OK; // Check result ok for distributed table is still exists
1429         SQLiteUtils::ResetStatement(stmt, false, ret);
1430     }
1431     SQLiteUtils::ResetStatement(stmt, true, ret);
1432     return CheckCorruptedStatus(errCode);
1433 }
1434 
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1435 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1436     const TableInfo &baseTbl, const StoreInfo &info)
1437 {
1438     if (dbHandle_ == nullptr) {
1439         return -E_INVALID_DB;
1440     }
1441 
1442     if (device.empty() || !baseTbl.IsValid()) {
1443         return -E_INVALID_ARGS;
1444     }
1445 
1446     std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1447     int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1448     if (errCode != E_OK) {
1449         LOGE("Create device table failed. %d", errCode);
1450         return errCode;
1451     }
1452 
1453     errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1454     if (errCode != E_OK) {
1455         LOGE("Copy index to device table failed. %d", errCode);
1456     }
1457     return errCode;
1458 }
1459 
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1460 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1461     const std::string &schemaVersion)
1462 {
1463     if (dbHandle_ == nullptr) {
1464         return -E_INVALID_DB;
1465     }
1466 
1467     TableInfo newTable;
1468     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1469     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1470         LOGE("Check new schema failed. %d", errCode);
1471         return errCode;
1472     } else {
1473         errCode = table.CompareWithTable(newTable, schemaVersion);
1474         if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1475             LOGE("Check schema failed, schema was changed. %d", errCode);
1476             return -E_DISTRIBUTED_SCHEMA_CHANGED;
1477         } else {
1478             errCode = E_OK;
1479         }
1480     }
1481 
1482     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1483     if (errCode != E_OK) {
1484         LOGE("Get query helper for check query failed. %d", errCode);
1485         return errCode;
1486     }
1487 
1488     if (!query.IsQueryForRelationalDB()) {
1489         LOGE("Not support for this query type.");
1490         return -E_NOT_SUPPORT;
1491     }
1492 
1493     SyncTimeRange defaultTimeRange;
1494     sqlite3_stmt *stmt = nullptr;
1495     errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1496         stmt);
1497     if (errCode != E_OK) {
1498         LOGE("Get query statement for check query failed. %d", errCode);
1499     }
1500 
1501     SQLiteUtils::ResetStatement(stmt, true, errCode);
1502     return errCode;
1503 }
1504 
CheckQueryObjectLegal(const QuerySyncObject & query)1505 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const QuerySyncObject &query)
1506 {
1507     if (dbHandle_ == nullptr) {
1508         return -E_INVALID_DB;
1509     }
1510     TableInfo newTable;
1511     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, query.GetTableName(), newTable);
1512     if (errCode != E_OK) {
1513         LOGE("Check new schema failed. %d", errCode);
1514     }
1515     return errCode;
1516 }
1517 
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1518 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1519     Timestamp &maxTimestamp) const
1520 {
1521     maxTimestamp = 0;
1522     for (const auto &tableName : tableNames) {
1523         const std::string sql = "SELECT MAX(timestamp) FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1524             "_log;";
1525         sqlite3_stmt *stmt = nullptr;
1526         int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1527         if (errCode != E_OK) {
1528             return errCode;
1529         }
1530         errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1531         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1532             maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1533             errCode = E_OK;
1534         }
1535         SQLiteUtils::ResetStatement(stmt, true, errCode);
1536         if (errCode != E_OK) {
1537             maxTimestamp = 0;
1538             return errCode;
1539         }
1540     }
1541     return E_OK;
1542 }
1543 
SetLogTriggerStatus(bool status)1544 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1545 {
1546     const std::string key = "log_trigger_switch";
1547     std::string val = status ? "true" : "false";
1548     std::string sql = "INSERT OR REPLACE INTO " + std::string(DBConstant::RELATIONAL_PREFIX) + "metadata" +
1549         " VALUES ('" + key + "', '" + val + "')";
1550     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1551     if (errCode != E_OK) {
1552         LOGE("Set log trigger to %s failed. errCode=%d", val.c_str(), errCode);
1553     }
1554     return errCode;
1555 }
1556 
1557 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1558 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1559     std::vector<RelationalRowData *> &data)
1560 {
1561     size_t totalLength = 0;
1562     do {
1563         int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1564         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1565             return E_OK;
1566         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1567             LOGE("Get data by bind sql failed:%d", errCode);
1568             return errCode;
1569         }
1570 
1571         if (colNames.empty()) {
1572             SQLiteUtils::GetSelectCols(stmt, colNames);  // Get column names.
1573         }
1574         auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1575         if (relaRowData == nullptr) {
1576             LOGE("ExecuteQueryBySqlStmt OOM");
1577             return -E_OUT_OF_MEMORY;
1578         }
1579 
1580         auto dataSz = relaRowData->CalcLength();
1581         if (dataSz == 0) {  // invalid data
1582             delete relaRowData;
1583             relaRowData = nullptr;
1584             continue;
1585         }
1586 
1587         totalLength += static_cast<size_t>(dataSz);
1588         if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) {  // the set has been full
1589             delete relaRowData;
1590             relaRowData = nullptr;
1591             LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1592             return -E_REMOTE_OVER_SIZE;
1593         }
1594         data.push_back(relaRowData);
1595     } while (true);
1596     return E_OK;
1597 }
1598 }
1599 
1600 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1601 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1602     const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1603     std::vector<RelationalRowData *> &data)
1604 {
1605     int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1606     if (errCode != E_OK) {
1607         return errCode;
1608     }
1609 
1610     sqlite3_stmt *stmt = nullptr;
1611     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1612     if (errCode != E_OK) {
1613         (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1614         return errCode;
1615     }
1616     ResFinalizer finalizer([this, &stmt, &errCode] {
1617         (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1618         SQLiteUtils::ResetStatement(stmt, true, errCode);
1619     });
1620     for (size_t i = 0; i < bindArgs.size(); ++i) {
1621         errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1622         if (errCode != E_OK) {
1623             return errCode;
1624         }
1625     }
1626     return GetRowDatas(stmt, isMemDb_, colNames, data);
1627 }
1628 
CheckEncryptedOrCorrupted() const1629 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1630 {
1631     if (dbHandle_ == nullptr) {
1632         return -E_INVALID_DB;
1633     }
1634 
1635     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1636     if (errCode != E_OK) {
1637         LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1638     }
1639     return errCode;
1640 }
1641 
GetExistsDeviceList(std::set<std::string> & devices) const1642 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1643 {
1644     return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1645         isMemDb_, devices);
1646 }
1647 
GetSyncCloudGid(QuerySyncObject & query,const SyncTimeRange & syncTimeRange,bool isCloudForcePushStrategy,bool isCompensatedTask,std::vector<std::string> & cloudGid)1648 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudGid(QuerySyncObject &query,
1649     const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy,
1650     bool isCompensatedTask, std::vector<std::string> &cloudGid)
1651 {
1652     sqlite3_stmt *queryStmt = nullptr;
1653     int errCode = E_OK;
1654     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1655     if (errCode != E_OK) {
1656         return errCode;
1657     }
1658     std::string sql = helper.GetGidRelationalCloudQuerySql(tableSchema_.fields, isCloudForcePushStrategy,
1659         isCompensatedTask);
1660     errCode = helper.GetCloudQueryStatement(false, dbHandle_, sql, queryStmt);
1661     if (errCode != E_OK) {
1662         return errCode;
1663     }
1664     do {
1665         errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1666         if (errCode != E_OK) {
1667             errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1668             break;
1669         }
1670         GetCloudGid(queryStmt, cloudGid);
1671     } while (errCode == E_OK);
1672     int resetStatementErrCode = E_OK;
1673     SQLiteUtils::ResetStatement(queryStmt, true, resetStatementErrCode);
1674     queryStmt = nullptr;
1675     return (errCode == E_OK ? resetStatementErrCode : errCode);
1676 }
1677 
GetCloudDataForSync(const CloudUploadRecorder & uploadRecorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t & stepNum,uint32_t & totalSize)1678 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(const CloudUploadRecorder &uploadRecorder,
1679     sqlite3_stmt *statement, CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize)
1680 {
1681     VBucket log;
1682     VBucket extraLog;
1683     uint32_t preSize = totalSize;
1684     int64_t revisedTime = 0;
1685     int64_t invalidTime = 0;
1686     GetCloudLog(statement, log, totalSize, revisedTime, invalidTime);
1687     GetCloudExtraLog(statement, extraLog);
1688     if (revisedTime != 0) {
1689         Bytes hashKey = std::get<Bytes>(extraLog[CloudDbConstant::HASH_KEY]);
1690         cloudDataResult.revisedData.push_back({hashKey, revisedTime, invalidTime});
1691     }
1692 
1693     VBucket data;
1694     int64_t flag = 0;
1695     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
1696     if (errCode != E_OK) {
1697         return errCode;
1698     }
1699     if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1700         for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1701             Type cloudValue;
1702             errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1703                 tableSchema_.fields[cid].type, cid + STATUS_INDEX + 1, cloudValue);
1704             if (errCode != E_OK) {
1705                 return errCode;
1706             }
1707             SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1708             errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1709             if (errCode != E_OK) {
1710                 return errCode;
1711             }
1712         }
1713     }
1714 
1715     if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, maxUploadSize_, maxUploadCount_)) {
1716         errCode = CloudStorageUtils::IdentifyCloudType(uploadRecorder, cloudDataResult, data, log, extraLog);
1717     } else {
1718         errCode = -E_UNFINISHED;
1719     }
1720     if (errCode == -E_IGNORE_DATA) {
1721         errCode = E_OK;
1722         totalSize = preSize;
1723         stepNum--;
1724     }
1725     return errCode;
1726 }
1727 
SetLocalSchema(const RelationalSchemaObject & localSchema)1728 void SQLiteSingleVerRelationalStorageExecutor::SetLocalSchema(const RelationalSchemaObject &localSchema)
1729 {
1730     localSchema_ = localSchema;
1731 }
1732 
CleanCloudDataOnLogTable(const std::string & logTableName,ClearMode mode)1733 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName, ClearMode mode)
1734 {
1735     std::string setFlag;
1736     if (mode == FLAG_ONLY && isLogicDelete_) {
1737         setFlag = SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC;
1738     } else {
1739         setFlag = SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC;
1740     }
1741     std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " + setFlag +
1742         ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1743         SHARING_RESOURCE + " = '' " + "WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR " +
1744         CLOUD_GID_FIELD + " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '';";
1745     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1746     if (errCode != E_OK) {
1747         LOGE("clean cloud log failed, %d", errCode);
1748         return errCode;
1749     }
1750     cleanLogSql = "DELETE FROM " + logTableName + " WHERE " + FLAG_IS_CLOUD + " AND " + DATA_IS_DELETE + ";";
1751     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1752     if (errCode != E_OK) {
1753         LOGE("delete cloud log failed, %d", errCode);
1754         return errCode;
1755     }
1756     // set all flag logout and data upload is not finished.
1757     cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG;
1758     if (mode == FLAG_ONLY) {
1759         cleanLogSql += " = flag | 0x800 & ~0x400;";
1760     } else {
1761         cleanLogSql += " = flag & ~0x400;";
1762     }
1763     return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1764 }
1765 
CleanUploadFinishedFlag(const std::string & tableName)1766 int SQLiteSingleVerRelationalStorageExecutor::CleanUploadFinishedFlag(const std::string &tableName)
1767 {
1768     // unset upload finished flag
1769     std::string cleanUploadFinishedSql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " +
1770         CloudDbConstant::FLAG + " = flag & ~0x400;";
1771     return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanUploadFinishedSql);
1772 }
1773 
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName,const RelationalSchemaObject & localSchema)1774 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1775     const std::string &logTableName, const RelationalSchemaObject &localSchema)
1776 {
1777     std::string sql = "DELETE FROM '" + tableName + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
1778         " IN (SELECT " + DATAKEY + " FROM '" + logTableName + "' WHERE (" + FLAG_IS_LOGIC_DELETE +
1779         ") OR CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD + " OR " + FLAG_IS_CLOUD_CONSISTENCY +
1780         "));";
1781     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1782     if (errCode != E_OK) {
1783         LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1784         return errCode;
1785     }
1786     std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + FLAG_IS_CLOUD + " OR " +
1787         FLAG_IS_CLOUD_CONSISTENCY + ";";
1788     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1789     if (errCode != E_OK) {
1790         LOGE("Failed to delete cloud data on log table, %d.", errCode);
1791         return errCode;
1792     }
1793     errCode = DoCleanAssetId(tableName, localSchema);
1794     if (errCode != E_OK) {
1795         LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
1796         return errCode;
1797     }
1798     errCode = CleanCloudDataOnLogTable(logTableName, FLAG_AND_DATA);
1799     if (errCode != E_OK) {
1800         LOGE("Failed to clean gid on log table, %d.", errCode);
1801     }
1802     return errCode;
1803 }
1804 
ChangeCloudDataFlagOnLogTable(const std::string & logTableName)1805 int SQLiteSingleVerRelationalStorageExecutor::ChangeCloudDataFlagOnLogTable(const std::string &logTableName)
1806 {
1807     std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " +
1808         SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1809         CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '' " + "WHERE NOT " + FLAG_IS_CLOUD_CONSISTENCY + ";";
1810     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1811     if (errCode != E_OK) {
1812         LOGE("change cloud log flag failed, %d", errCode);
1813     }
1814     return errCode;
1815 }
1816 
SetDataOnUserTableWithLogicDelete(const std::string & tableName,const std::string & logTableName)1817 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnUserTableWithLogicDelete(const std::string &tableName,
1818     const std::string &logTableName)
1819 {
1820     UpdateCursorContext context;
1821     int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, context.cursor);
1822     LOGI("removeData on userTable:%s length:%d start and cursor is %llu.",
1823         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1824     errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1825     if (errCode != E_OK) {
1826         LOGE("Failed to create updateCursor func on userTable errCode=%d.", errCode);
1827         return errCode;
1828     }
1829     // data from cloud and not modified by local or consistency with cloud to flag logout
1830     std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " + SET_FLAG_WHEN_LOGOUT +
1831                       ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1832                       SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1833                       " WHERE (CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD_CONSISTENCY + " OR " +
1834                       FLAG_IS_CLOUD + ") AND NOT (" + DATA_IS_DELETE + ") " + " AND NOT (" + FLAG_IS_LOGIC_DELETE
1835                       + "));";
1836     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1837     // here just clear updateCursor func, fail will not influence other function
1838     (void)CreateFuncUpdateCursor(context, nullptr);
1839     if (errCode != E_OK) {
1840         LOGE("Failed to change cloud data flag on userTable, %d.", errCode);
1841         return errCode;
1842     }
1843     // clear some column when data is logicDelete or physical delete
1844     sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1845           " = '', " + SHARING_RESOURCE + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR (" + DATA_IS_DELETE + ");";
1846     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1847     if (errCode != E_OK) {
1848         LOGE("Failed to deal logic delete data flag on userTable, %d.", errCode);
1849         return errCode;
1850     }
1851     LOGI("removeData on userTable:%s length:%d finish and cursor is %llu.",
1852         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1853     errCode = SetCursor(tableName, context.cursor);
1854     if (errCode != E_OK) {
1855         LOGE("set new cursor after removeData error %d.", errCode);
1856         return errCode;
1857     }
1858     return ChangeCloudDataFlagOnLogTable(logTableName);
1859 }
1860 
SetDataOnShareTableWithLogicDelete(const std::string & tableName,const std::string & logTableName)1861 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnShareTableWithLogicDelete(const std::string &tableName,
1862     const std::string &logTableName)
1863 {
1864     UpdateCursorContext context;
1865     int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, context.cursor);
1866     LOGI("removeData on shareTable:%s length:%d start and cursor is %llu.",
1867         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1868     errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1869     if (errCode != E_OK) {
1870         LOGE("Failed to create updateCursor func on shareTable errCode=%d.", errCode);
1871         return errCode;
1872     }
1873     std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " +
1874                       SET_FLAG_WHEN_LOGOUT_FOR_SHARE_TABLE + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1875                       CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1876                       " WHERE (NOT (" + DATA_IS_DELETE + ") " + " AND NOT (" + FLAG_IS_LOGIC_DELETE + "));";
1877     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1878     // here just clear updateCursor func, fail will not influence other function
1879     (void)CreateFuncUpdateCursor(context, nullptr);
1880     if (errCode != E_OK) {
1881         LOGE("Failed to change cloud data flag on shareTable, %d.", errCode);
1882         return errCode;
1883     }
1884     // clear some column when data is logicDelete or physical delete
1885     sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1886           " = '', " + SHARING_RESOURCE + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR (" + DATA_IS_DELETE + ");";
1887     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1888     if (errCode != E_OK) {
1889         LOGE("Failed to deal logic delete data flag on shareTable, %d.", errCode);
1890         return errCode;
1891     }
1892     LOGI("removeData on shareTable:%s length:%d finish and cursor is %llu.",
1893         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1894     return SetCursor(tableName, context.cursor);
1895 }
1896 
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys,bool distinguishCloudFlag)1897 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
1898     std::vector<int64_t> &dataKeys, bool distinguishCloudFlag)
1899 {
1900     sqlite3_stmt *selectStmt = nullptr;
1901     std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
1902         " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '' AND data_key != '-1'";
1903     if (distinguishCloudFlag) {
1904         sql += " AND (";
1905         sql += FLAG_IS_CLOUD;
1906         sql += " OR ";
1907         sql += FLAG_IS_CLOUD_CONSISTENCY;
1908         sql += " )";
1909     }
1910     sql += ";";
1911     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
1912     if (errCode != E_OK) {
1913         LOGE("Get select data_key statement failed, %d", errCode);
1914         return errCode;
1915     }
1916     do {
1917         errCode = SQLiteUtils::StepWithRetry(selectStmt);
1918         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1919             dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
1920         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1921             LOGE("SQLite step failed when query log's data_key : %d", errCode);
1922             break;
1923         }
1924     } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
1925     SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1926     return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
1927 }
1928 
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)1929 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
1930     const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
1931 {
1932     std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
1933     if (opType == OpType::ONLY_UPDATE_GID) {
1934         updateLogSql += "cloud_gid = ?";
1935         updateColName.push_back(CloudDbConstant::GID_FIELD);
1936         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1937     } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1938         updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
1939         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1940     } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
1941         updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
1942         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1943     }  else if (opType == OpType::UPDATE_TIMESTAMP) {
1944         updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
1945             ", timestamp = ?, cloud_gid = '', version = '', sharing_resource = ''";
1946         updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1947     } else if (opType == OpType::CLEAR_GID) {
1948         updateLogSql += "cloud_gid = '', version = '', sharing_resource = '', flag = flag & " +
1949             std::to_string(SET_FLAG_ZERO_MASK);
1950     } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1951         updateLogSql += std::string(CloudDbConstant::TO_LOCAL_CHANGE) + ", cloud_gid = ?";
1952         updateColName.push_back(CloudDbConstant::GID_FIELD);
1953         updateLogSql += ", version = ?";
1954         updateColName.push_back(CloudDbConstant::VERSION_FIELD);
1955     } else {
1956         updateLogSql += " device = 'cloud', timestamp = ?,";
1957         updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1958         if (opType == OpType::DELETE) {
1959             int errCode = GetCloudDeleteSql(tableSchema.name, updateLogSql);
1960             if (errCode != E_OK) {
1961                 return errCode;
1962             }
1963         } else {
1964             updateLogSql += GetUpdateDataFlagSql(vBucket) + ", cloud_gid = ?";
1965             updateColName.push_back(CloudDbConstant::GID_FIELD);
1966             CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1967         }
1968     }
1969 
1970     int errCode = AppendUpdateLogRecordWhereSqlCondition(tableSchema, vBucket, updateLogSql);
1971     if (errCode != E_OK) {
1972         return errCode;
1973     }
1974 
1975     errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
1976     if (errCode != E_OK) {
1977         LOGE("Get update log statement failed when update cloud data, %d", errCode);
1978     }
1979     return errCode;
1980 }
1981 
GetLockStatusByGid(const std::string & tableName,const std::string & gid,LockStatus & status)1982 int SQLiteSingleVerRelationalStorageExecutor::GetLockStatusByGid(const std::string &tableName, const std::string &gid,
1983     LockStatus &status)
1984 {
1985     std::string sql = "select status from " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = '" +
1986         gid + "';";
1987     sqlite3_stmt *stmt = nullptr;
1988     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1989     if (errCode != E_OK) {
1990         LOGE("Get stmt failed when get lock status: %d", errCode);
1991         return errCode;
1992     }
1993     errCode = SQLiteUtils::StepWithRetry(stmt);
1994     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1995         status = static_cast<LockStatus>(sqlite3_column_int(stmt, 0));
1996         errCode = E_OK;
1997     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1998         LOGE("Not found lock status.");
1999         errCode = -E_NOT_FOUND;
2000     } else {
2001         LOGE("Get lock status failed: %d", errCode);
2002     }
2003     int resetRet;
2004     SQLiteUtils::ResetStatement(stmt, true, resetRet);
2005     return errCode;
2006 }
2007 
UpdateHashKey(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType)2008 int SQLiteSingleVerRelationalStorageExecutor::UpdateHashKey(DistributedTableMode mode, const TableInfo &tableInfo,
2009     TableSyncType syncType)
2010 {
2011     auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
2012     auto logName = DBCommon::GetLogTableName(tableInfo.GetTableName());
2013     std::string sql = "UPDATE " + logName + " SET hash_key = hash_key || '_old' where data_key in " +
2014         "(select _rowid_ from '" + tableInfo.GetTableName() + "');";
2015     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
2016     if (errCode != E_OK) {
2017         return errCode;
2018     }
2019     sql = "UPDATE " + logName + " SET hash_key = data.hash_value FROM (SELECT _rowid_, " +
2020         tableManager->CalcPrimaryKeyHash("dataTable.", tableInfo, "") + " AS hash_value " +
2021         "FROM '" + tableInfo.GetTableName() + "' AS dataTable) AS data WHERE data._rowid_ = " + logName + ".data_key;";
2022     return SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
2023 }
2024 
SetTableMode(DistributedTableMode mode)2025 void SQLiteSingleVerRelationalStorageExecutor::SetTableMode(DistributedTableMode mode)
2026 {
2027     mode_ = mode;
2028 }
2029 } // namespace DistributedDB
2030 #endif
2031