• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17 
18 #include <algorithm>
19 #include <optional>
20 
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "time_helper.h"
32 #include "value_hash_calc.h"
33 #include "device_tracker_log_table_manager.h"
34 
35 namespace DistributedDB {
36 namespace {
// Field-name literals.
constexpr const char *DATAKEY = "DATA_KEY";
constexpr const char *DEVICE_FIELD = "DEVICE";
constexpr const char *CLOUD_GID_FIELD = "CLOUD_GID";
constexpr const char *VERSION = "VERSION";
constexpr const char *SHARING_RESOURCE = "SHARING_RESOURCE";

// SQL fragments that test the FLAG column.
// True when the 0x02 (local) bit is clear, i.e. the record is cloud data.
constexpr const char *FLAG_IS_CLOUD = "FLAG & 0x02 = 0";
// True when the 0x20 bit is clear, i.e. the record is cloud-consistent.
constexpr const char *FLAG_IS_CLOUD_CONSISTENCY = "FLAG & 0x20 = 0";
// True when the 0x08 (logic delete) bit is set.
constexpr const char *FLAG_IS_LOGIC_DELETE = "FLAG & 0x08 != 0";
// True for a deleted record (data_key = -1) whose 0x08 (logic delete) bit is clear.
constexpr const char *DATA_IS_DELETE = "data_key = -1 AND FLAG & 0X08 = 0";

// SQL expressions that compute a new FLAG value.
// Rows with data_key = -1 and the 0x02 bit set: clear 0x10 (wait compensated sync) and 0x20.
// All other rows: set 0x02 (local) and 0x20, clear 0x10.
constexpr const char *SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC =
    "(CASE WHEN data_key = -1 and FLAG & 0x02 = 0x02 "
    "THEN FLAG & (~0x10) & (~0x20) "
    "ELSE (FLAG | 0x02 | 0x20) & (~0x10) END)";
// Same as above, except the 0x02 (local) bit is not set in the ELSE branch.
constexpr const char *SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC =
    "(CASE WHEN data_key = -1 and FLAG & 0x02 = 0x02 "
    "THEN FLAG & (~0x10) & (~0x20) "
    "ELSE (FLAG | 0x20) & (~0x10) END)";
// Logout: set 0x08 (logic delete), 0x800 (exist passport) and 0x01 (delete);
// clear 0x10 (compensated), 0x02 (local) and 0x20 (inconsistency).
constexpr const char *SET_FLAG_WHEN_LOGOUT = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x12) & (~0x20)";
// Logout for share tables: as above, but the 0x20 bit is left untouched.
constexpr const char *SET_FLAG_WHEN_LOGOUT_FOR_SHARE_TABLE = "(FLAG | 0x08 | 0x800 | 0x01) & (~0x12)";
// SQL assignment that refreshes the cursor column through update_cursor().
constexpr const char *UPDATE_CURSOR_SQL = "cursor=update_cursor()";

// Integer masks applied to a flag value.
constexpr int SET_FLAG_ZERO_MASK = ~0x04; // AND-mask: clears the 0x04 bit
constexpr int SET_FLAG_ONE_MASK = 0x04;   // OR-mask: sets the 0x04 bit
constexpr int SET_CLOUD_FLAG = ~0x02;     // AND-mask: clears the 0x02 (local) bit

// Result-column indexes used when reading a log statement row.
constexpr int DATA_KEY_INDEX = 0;
constexpr int TIMESTAMP_INDEX = 3;
constexpr int W_TIMESTAMP_INDEX = 4;
constexpr int FLAG_INDEX = 5;
constexpr int HASH_KEY_INDEX = 6;
constexpr int CLOUD_GID_INDEX = 7;
constexpr int VERSION_INDEX = 8;
constexpr int STATUS_INDEX = 9;
68 
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)69 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
70 {
71     if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
72         return SQLITE_DENY;
73     }
74     if (b == SQLITE_FUNCTION) {
75         if (d != nullptr && (strcmp(d, "fts3_tokenizer") == 0)) {
76             LOGE("Deny fts3_tokenizer in remote query");
77             return SQLITE_DENY;
78         }
79     }
80     return SQLITE_OK;
81 }
82 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)83 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
84     DistributedTableMode mode)
85     : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode), isLogicDelete_(false),
86       assetLoader_(nullptr), putDataMode_(PutDataMode::SYNC), markFlagOption_(MarkFlagOption::DEFAULT),
87       maxUploadCount_(0), maxUploadSize_(0)
88 {
89     bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
90     bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
91     bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
92     bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
93     bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
94     bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
95     bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
96 }
97 
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)98 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
99 {
100     std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
101     if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
102         LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
103         return -E_NOT_SUPPORT;
104     }
105     std::vector<FieldInfo> fieldInfos = table.GetFieldInfos();
106     for (const auto &field : fieldInfos) {
107         if (DBCommon::CaseInsensitiveCompare(field.GetFieldName(), std::string(DBConstant::SQLITE_INNER_ROWID))) {
108             LOGE("[CreateDistributedTable] Not support create distributed table with _rowid_ column.");
109             return -E_NOT_SUPPORT;
110         }
111     }
112 
113     if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
114         if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
115             LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
116             return -E_NOT_SUPPORT;
117         }
118 
119         if (mode == DistributedTableMode::COLLABORATION) {
120             if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
121                 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
122                 return -E_NOT_SUPPORT;
123             }
124         }
125 
126         if (syncType == CLOUD_COOPERATION) {
127             int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
128             if (errCode != E_OK) {
129                 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
130                 return errCode;
131             }
132         }
133     }
134 
135     if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
136         if (table.GetPrimaryKey().size() > 1) {
137             LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
138             return -E_NOT_SUPPORT;
139         }
140     }
141 
142     return E_OK;
143 }
144 
GeneLogInfoForExistedData(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTrackerTable)145 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &identity,
146     const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTrackerTable)
147 {
148     if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
149         return UpdateTrackerTable(db, identity, tableInfo, logMgrPtr, false);
150     }
151     SQLiteRelationalUtils::GenLogParam param = {db, isMemDb_, isTrackerTable};
152     return SQLiteRelationalUtils::GeneLogInfoForExistedData(identity, tableInfo, logMgrPtr, param);
153 }
154 
ResetLogStatus(std::string & tableName)155 int SQLiteSingleVerRelationalStorageExecutor::ResetLogStatus(std::string &tableName)
156 {
157     int errCode = SetLogTriggerStatus(false);
158     if (errCode != E_OK) {
159         LOGE("Fail to set log trigger on when reset log status, %d", errCode);
160         return errCode;
161     }
162     std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
163     std::string sql = "UPDATE " + logTable + " SET status = 0;";
164     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
165     if (errCode != E_OK) {
166         LOGE("Failed to initialize cloud type log data.%d", errCode);
167     }
168     return errCode;
169 }
170 
UpdateTrackerTable(sqlite3 * db,const std::string & identity,const TableInfo & tableInfo,std::unique_ptr<SqliteLogTableManager> & logMgrPtr,bool isTimestampOnly)171 int SQLiteSingleVerRelationalStorageExecutor::UpdateTrackerTable(sqlite3 *db, const std::string &identity,
172     const TableInfo &tableInfo, std::unique_ptr<SqliteLogTableManager> &logMgrPtr, bool isTimestampOnly)
173 {
174     int errCode = SetLogTriggerStatus(false);
175     if (errCode != E_OK) {
176         return errCode;
177     }
178     std::string currentLocalTimeStr;
179     std::tie(errCode, currentLocalTimeStr) = SQLiteRelationalUtils::GetCurrentVirtualTime(db);
180     if (errCode != E_OK) {
181         LOGE("Failed to get local timeOffset.%d", errCode);
182         return errCode;
183     }
184     std::string tableName = tableInfo.GetTableName();
185     std::string logTable = DBCommon::GetLogTableName(tableName);
186     TrackerTable trackerTable = tableInfo.GetTrackerTable();
187     trackerTable.SetTableName(tableName);
188     if (isTimestampOnly) {
189         std::string sql = "update " + logTable + " set timestamp = " + currentLocalTimeStr +
190             " + data_key, wtimestamp = " + currentLocalTimeStr + " + data_key where data_key != -1;";
191         errCode = SQLiteUtils::ExecuteRawSQL(db, sql);
192         if (errCode != E_OK) {
193             LOGE("Failed to initialize device type log data.%d", errCode);
194             return errCode;
195         }
196         return UpgradedLogForExistedData(tableInfo, false);
197     }
198     std::string rowid = std::string(DBConstant::SQLITE_INNER_ROWID);
199     std::string flag = std::to_string(static_cast<uint32_t>(LogInfoFlag::FLAG_LOCAL) |
200         static_cast<uint32_t>(LogInfoFlag::FLAG_DEVICE_CLOUD_INCONSISTENCY));
201     std::string calPrimaryKeyHash = logMgrPtr->CalcPrimaryKeyHash("a.", tableInfo, identity);
202     std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + rowid + ", '', '', " + currentLocalTimeStr +
203         " + " + rowid + ", " + currentLocalTimeStr + " + " + rowid + ", " + flag + ", " + calPrimaryKeyHash +
204         ", '', '', '', '', '', 0 FROM '" + tableName + "' AS a WHERE 1=1;";
205     errCode = trackerTable.ReBuildTempTrigger(db, TriggerMode::TriggerModeEnum::INSERT, [db, &sql]() {
206         int ret = SQLiteUtils::ExecuteRawSQL(db, sql);
207         if (ret != E_OK) {
208             LOGE("Failed to initialize cloud type log data.%d", ret);
209         }
210         return ret;
211     });
212     if (errCode != E_OK) {
213         LOGE("Failed to initialize device type log data.%d", errCode);
214     }
215     return errCode;
216 }
217 
CreateRelationalLogTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table)218 int SQLiteSingleVerRelationalStorageExecutor::CreateRelationalLogTable(DistributedTableMode mode, bool isUpgraded,
219     const std::string &identity, TableInfo &table)
220 {
221     // create log table
222     std::unique_ptr<SqliteLogTableManager> tableManager;
223     tableManager = LogTableManagerFactory::GetTableManager(table, mode, table.GetTableSyncType());
224     if (tableManager == nullptr) {
225         LOGE("[CreateRelationalLogTable] get table manager failed");
226         return -E_INVALID_DB;
227     }
228     int errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
229     if (errCode != E_OK) {
230         LOGE("[CreateDistributedTable] create log table failed");
231         return errCode;
232     }
233     std::string tableName = table.GetTableName();
234     bool isOnceDropped = false;
235     (void)IsTableOnceDropped(tableName, isOnceDropped);
236     if (isOnceDropped) {
237         SQLiteRelationalUtils::GenLogParam param = {
238             dbHandle_, isMemDb_, table.GetTrackerTable().GetTableName().empty()
239         };
240         errCode = SQLiteRelationalUtils::GeneLogInfoForExistedData(identity, table, tableManager, param);
241     } else if (!isUpgraded) {
242         if (table.GetTrackerTable().GetTableName().empty()) {
243             errCode = GeneLogInfoForExistedData(dbHandle_, identity, table, tableManager, false);
244         } else if (table.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
245             // tracker table -> distributed device table
246             errCode = UpdateTrackerTable(dbHandle_, tableName, table, tableManager, true);
247         } else {
248             // tracker table -> distributed cloud table
249             errCode = ResetLogStatus(tableName);
250         }
251     }
252     if (errCode != E_OK) {
253         LOGE("[CreateDistributedTable] generate log isUpgraded %d failed %d.", static_cast<int>(isUpgraded), errCode);
254         return errCode;
255     }
256 
257     // add trigger
258     errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
259     if (errCode != E_OK) {
260         LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
261         return errCode;
262     }
263     return SetLogTriggerStatus(true);
264 }
265 
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table)266 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
267     const std::string &identity, TableInfo &table)
268 {
269     if (dbHandle_ == nullptr) {
270         return -E_INVALID_DB;
271     }
272 
273     const std::string tableName = table.GetTableName();
274     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
275     if (errCode != E_OK) {
276         LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
277         return errCode;
278     }
279 
280     if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
281         bool isEmpty = false;
282         errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
283         if (errCode != E_OK) {
284             LOGE("[CreateDistributedTable] check table empty failed. error=%d", errCode);
285             return -E_NOT_SUPPORT;
286         }
287         if (!isEmpty) {
288             LOGW("[CreateDistributedTable] generate %.3s log for existed data, table type %d",
289                 DBCommon::TransferStringToHex(DBCommon::TransferHashString(tableName)).c_str(),
290                 static_cast<int>(table.GetTableSyncType()));
291         }
292     }
293 
294     errCode = CheckTableConstraint(table, mode, table.GetTableSyncType());
295     if (errCode != E_OK) {
296         LOGE("[CreateDistributedTable] check table constraint failed.");
297         return errCode;
298     }
299 
300     return CreateRelationalLogTable(mode, isUpgraded, identity, table);
301 }
302 
CompareSchemaTableColumns(const std::string & tableName)303 int SQLiteSingleVerRelationalStorageExecutor::CompareSchemaTableColumns(const std::string &tableName)
304 {
305     bool onceDropped = false;
306     int errCode = IsTableOnceDropped(tableName, onceDropped);
307     if (!onceDropped) {
308         // do not return error code to make sure main procedure will continue
309         return E_OK;
310     }
311     LOGI("[CompareSchemaTableColumns] table once dropped check schema. table name is %s length is %zu",
312         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size());
313     TableInfo newTableInfo;
314     errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
315     if (errCode != E_OK) {
316         LOGE("[CompareSchemaTableColumns] analysis table schema failed. %d", errCode);
317         return errCode;
318     }
319     // new table should has same or compatible upgrade
320     TableInfo tableInfo = localSchema_.GetTable(tableName);
321     errCode = tableInfo.CompareWithTable(newTableInfo, localSchema_.GetSchemaVersion());
322     if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
323         LOGE("[CompareSchemaTableColumns] Not support with incompatible table.");
324         return -E_SCHEMA_MISMATCH;
325     }
326     return errCode;
327 }
328 
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)329 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
330     DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
331 {
332     if (dbHandle_ == nullptr) {
333         return -E_INVALID_DB;
334     }
335     TableInfo newTableInfo;
336     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
337     if (errCode != E_OK) {
338         LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
339         return errCode;
340     }
341 
342     if (CheckTableConstraint(newTableInfo, mode, syncType)) {
343         LOGE("[UpgradeDistributedTable] Not support create distributed table when violate constraints.");
344         return -E_NOT_SUPPORT;
345     }
346 
347     // new table should has same or compatible upgrade
348     TableInfo tableInfo = schema.GetTable(tableName);
349     errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
350     bool onceDropped = false;
351     int ret = IsTableOnceDropped(tableName, onceDropped);
352     if (ret != E_OK) {
353         LOGE("[UpgradeDistributedTable] Get table %s dropped status fail %d.",
354             DBCommon::StringMiddleMasking(tableName).c_str(), ret);
355         return ret;
356     }
357     LOGI("[UpgradeDistributedTable] table name is %s length is %zu, the dropped status is %d, errCode is %d",
358         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), onceDropped, errCode);
359     if (!onceDropped) {
360         if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
361             LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
362             return -E_SCHEMA_MISMATCH;
363         } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
364             LOGD("[UpgradeDistributedTable] schema has not changed.");
365             // update table if tableName changed
366             schema.RemoveRelationalTable(tableName);
367             tableInfo.SetTableName(tableName);
368             schema.AddRelationalTable(tableInfo);
369             return E_OK;
370         }
371     }
372 
373     schemaChanged = true;
374     errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
375     if (errCode != E_OK) {
376         LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
377     }
378 
379     schema.AddRelationalTable(newTableInfo);
380     return errCode;
381 }
382 
383 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)384 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
385     std::vector<std::string> &deviceTables)
386 {
387     if (device.empty() && tableName.empty()) { // device and table name should not both be empty
388         return -E_INVALID_ARGS;
389     }
390     std::string devicePattern = device.empty() ? "%" : device;
391     std::string tablePattern = tableName.empty() ? "%" : tableName;
392     std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
393 
394     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
395         deviceTableName + "';";
396     sqlite3_stmt *stmt = nullptr;
397     int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
398     if (errCode != E_OK) {
399         return errCode;
400     }
401 
402     do {
403         errCode = SQLiteUtils::StepWithRetry(stmt, false);
404         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
405             errCode = E_OK;
406             break;
407         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
408             LOGE("Get table name failed. %d", errCode);
409             break;
410         }
411         std::string realTableName;
412         errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
413         if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
414             continue;
415         }
416         if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
417             continue;
418         }
419         deviceTables.emplace_back(realTableName);
420     } while (true);
421 
422     int ret = E_OK;
423     SQLiteUtils::ResetStatement(stmt, true, ret);
424     return errCode != E_OK ? errCode : ret;
425 }
426 
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)427 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
428 {
429     std::vector<FieldInfo> fields;
430     auto itOld = oldTableInfo.GetFields().begin();
431     auto itNew = newTableInfo.GetFields().begin();
432     for (; itNew != newTableInfo.GetFields().end(); itNew++) {
433         if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
434             fields.emplace_back(itNew->second);
435             continue;
436         }
437         itOld++;
438     }
439     return fields;
440 }
441 
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)442 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
443 {
444     if (db == nullptr) {
445         return -E_INVALID_ARGS;
446     }
447 
448     std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
449         return a.GetColumnId()< b.GetColumnId();
450     });
451     int errCode = E_OK;
452     for (const auto &table : tables) {
453         for (const auto &field : fields) {
454             std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
455             alterSql += "'" + field.GetDataType() + "'";
456             alterSql += field.IsNotNull() ? " NOT NULL" : "";
457             alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
458             alterSql += ";";
459             errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
460             if (errCode != E_OK) {
461                 LOGE("Alter table failed. %d", errCode);
462                 break;
463             }
464         }
465     }
466     return errCode;
467 }
468 
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)469 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
470 {
471     IndexInfoMap indexes;
472     auto itOld = oldTableInfo.GetIndexDefine().begin();
473     auto itNew = newTableInfo.GetIndexDefine().begin();
474     auto itOldEnd = oldTableInfo.GetIndexDefine().end();
475     auto itNewEnd = newTableInfo.GetIndexDefine().end();
476 
477     while (itOld != itOldEnd && itNew != itNewEnd) {
478         if (itOld->first == itNew->first) {
479             if (itOld->second != itNew->second) {
480                 indexes.insert({itNew->first, itNew->second});
481             }
482             itOld++;
483             itNew++;
484         } else if (itOld->first < itNew->first) {
485             indexes.insert({itOld->first, {}});
486             itOld++;
487         } else {
488             indexes.insert({itNew->first, itNew->second});
489             itNew++;
490         }
491     }
492 
493     while (itOld != itOldEnd) {
494         indexes.insert({itOld->first, {}});
495         itOld++;
496     }
497 
498     while (itNew != itNewEnd) {
499         indexes.insert({itNew->first, itNew->second});
500         itNew++;
501     }
502 
503     return indexes;
504 }
505 
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)506 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
507 {
508     if (db == nullptr) {
509         return -E_INVALID_ARGS;
510     }
511 
512     int errCode = E_OK;
513     for (const auto &table : tables) {
514         for (const auto &index : indexes) {
515             if (index.first.empty()) {
516                 continue;
517             }
518             std::string realIndexName = table + "_" + index.first;
519             std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
520             errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
521             if (errCode != E_OK) {
522                 LOGE("Drop index failed. %d", errCode);
523                 return errCode;
524             }
525 
526             if (index.second.empty()) { // empty means drop index only
527                 continue;
528             }
529 
530             auto it = index.second.begin();
531             std::string indexDefine = *it++;
532             while (it != index.second.end()) {
533                 indexDefine += ", " + *it++;
534             }
535             std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
536                 "(" + indexDefine + ");";
537             errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
538             if (errCode != E_OK) {
539                 LOGE("Create index failed. %d", errCode);
540                 break;
541             }
542         }
543     }
544     return errCode;
545 }
546 }
547 
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)548 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
549     const TableInfo &newTableInfo)
550 {
551     std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
552     IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
553     std::vector<std::string> deviceTables;
554     int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
555     if (errCode != E_OK) {
556         LOGE("Get device table name for alter table failed. %d", errCode);
557         return errCode;
558     }
559 
560     LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
561         upgradeIndexes.size(), deviceTables.size());
562     errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
563     if (errCode != E_OK) {
564         LOGE("upgrade fields failed. %d", errCode);
565         return errCode;
566     }
567 
568     errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
569     if (errCode != E_OK) {
570         LOGE("upgrade indexes failed. %d", errCode);
571     }
572 
573     return errCode;
574 }
575 
StartTransaction(TransactType type)576 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
577 {
578     if (dbHandle_ == nullptr) {
579         LOGE("Begin transaction failed, dbHandle is null.");
580         return -E_INVALID_DB;
581     }
582     int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
583     if (errCode != E_OK) {
584         LOGE("Begin transaction failed, errCode = %d", errCode);
585     }
586     return errCode;
587 }
588 
Commit()589 int SQLiteSingleVerRelationalStorageExecutor::Commit()
590 {
591     if (dbHandle_ == nullptr) {
592         return -E_INVALID_DB;
593     }
594 
595     return SQLiteUtils::CommitTransaction(dbHandle_);
596 }
597 
Rollback()598 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
599 {
600     if (dbHandle_ == nullptr) {
601         return -E_INVALID_DB;
602     }
603     int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
604     if (errCode != E_OK) {
605         LOGE("sqlite single ver relation storage executor rollback fail! errCode = [%d]", errCode);
606     }
607     return errCode;
608 }
609 
SetTableInfo(const TableInfo & tableInfo)610 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
611 {
612     table_ = tableInfo;
613 }
614 
615 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize,int64_t & revisedTime,int64_t & invalidTime)616 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize,
617     int64_t &revisedTime, int64_t &invalidTime)
618 {
619     int64_t modifyTime = static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX));
620     uint64_t curTime = 0;
621     if (TimeHelper::GetSysCurrentRawTime(curTime) == E_OK) {
622         if (modifyTime > static_cast<int64_t>(curTime)) {
623             invalidTime = modifyTime;
624             modifyTime = static_cast<int64_t>(curTime);
625             revisedTime = modifyTime;
626         }
627     } else {
628         LOGW("[Relational] get raw sys time failed.");
629     }
630     logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
631     logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
632         static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
633     totalSize += sizeof(int64_t) + sizeof(int64_t);
634     if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
635         std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
636             sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
637         if (!cloudGid.empty()) {
638             logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
639             totalSize += cloudGid.size();
640         }
641     }
642     if (revisedTime != 0) {
643         std::string cloudGid;
644         if (logInfo.count(CloudDbConstant::GID_FIELD) != 0) {
645             cloudGid = std::get<std::string>(logInfo[CloudDbConstant::GID_FIELD]);
646         }
647         LOGW("[Relational] Found invalid mod time: %lld, curTime: %lld, gid: %s", invalidTime, revisedTime,
648             DBCommon::StringMiddleMasking(cloudGid).c_str());
649     }
650     std::string version;
651     SQLiteUtils::GetColumnTextValue(logStatement, VERSION_INDEX, version);
652     logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
653     totalSize += version.size();
654 }
655 
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)656 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
657 {
658     flags.insert_or_assign(DBConstant::ROWID,
659         static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
660     flags.insert_or_assign(CloudDbConstant::TIMESTAMP,
661         static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
662     flags.insert_or_assign(CloudDbConstant::FLAG,
663         static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
664     Bytes hashKey;
665     (void)SQLiteUtils::GetColumnBlobValue(logStatement, HASH_KEY_INDEX, hashKey);
666     flags.insert_or_assign(CloudDbConstant::HASH_KEY, hashKey);
667     flags.insert_or_assign(CloudDbConstant::STATUS,
668         static_cast<int64_t>(sqlite3_column_int(logStatement, STATUS_INDEX)));
669 }
670 
GetCloudGid(sqlite3_stmt * logStatement,std::vector<std::string> & cloudGid)671 void GetCloudGid(sqlite3_stmt *logStatement, std::vector<std::string> &cloudGid)
672 {
673     if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) == nullptr) {
674         return;
675     }
676     std::string gid = reinterpret_cast<const std::string::value_type *>(
677         sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
678     if (gid.empty()) {
679         LOGW("[Relational] Get cloud gid is null.");
680         return;
681     }
682     cloudGid.emplace_back(gid);
683 }
684 }
685 
GetDataItemSerialSize(DataItem & item,size_t appendLen)686 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
687 {
688     // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
689     // the size would not be very large.
690     static const size_t maxOrigDevLength = 40;
691     size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
692     size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
693         Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
694     return dataSize;
695 }
696 
GetKvData(const Key & key,Value & value) const697 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
698 {
699     return SQLiteRelationalUtils::GetKvData(dbHandle_, isMemDb_, key, value);
700 }
701 
GetKvDataByPrefixKey(const Key & keyPrefix,std::map<Key,Value> & data) const702 int SQLiteSingleVerRelationalStorageExecutor::GetKvDataByPrefixKey(const Key &keyPrefix,
703     std::map<Key, Value> &data) const
704 {
705     std::string metaTableName = std::string(DBConstant::RELATIONAL_PREFIX) + "metadata";
706     return SqliteMetaExecutor::GetMetaDataByPrefixKey(dbHandle_, isMemDb_, metaTableName, keyPrefix, data);
707 }
708 
PutKvData(const Key & key,const Value & value) const709 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
710 {
711     return SQLiteRelationalUtils::PutKvData(dbHandle_, isMemDb_, key, value);
712 }
713 
DeleteMetaData(const std::vector<Key> & keys) const714 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
715 {
716     static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
717         "metadata WHERE key=?;";
718     sqlite3_stmt *statement = nullptr;
719     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
720     if (errCode != E_OK) {
721         return errCode;
722     }
723 
724     int ret = E_OK;
725     for (const auto &key : keys) {
726         errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
727         if (errCode != E_OK) {
728             break;
729         }
730 
731         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
732         if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
733             break;
734         }
735         errCode = E_OK;
736         SQLiteUtils::ResetStatement(statement, false, ret);
737     }
738     SQLiteUtils::ResetStatement(statement, true, ret);
739     return CheckCorruptedStatus(errCode);
740 }
741 
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const742 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
743 {
744     static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " +
745         std::string(DBConstant::RELATIONAL_PREFIX) + "metadata WHERE key>=? AND key<=?;";
746     sqlite3_stmt *statement = nullptr;
747     int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
748     if (errCode != E_OK) {
749         return errCode;
750     }
751 
752     errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
753     if (errCode == E_OK) {
754         errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
755         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
756             errCode = E_OK;
757         }
758     }
759     int ret = E_OK;
760     SQLiteUtils::ResetStatement(statement, true, ret);
761     return CheckCorruptedStatus(errCode);
762 }
763 
GetAllMetaKeys(std::vector<Key> & keys) const764 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
765 {
766     static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + std::string(DBConstant::RELATIONAL_PREFIX) +
767         "metadata;";
768     sqlite3_stmt *statement = nullptr;
769     int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
770     if (errCode != E_OK) {
771         LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
772         return errCode;
773     }
774     errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
775     int ret = E_OK;
776     SQLiteUtils::ResetStatement(statement, true, ret);
777     return errCode != E_OK ? errCode : ret;
778 }
779 
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)780 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
781     RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
782 {
783     if (stmt == nullptr) {
784         int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
785         if (errCode != E_OK) {
786             LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
787             return errCode;
788         }
789     }
790 
791     int errCode = inserter.BindHashKeyAndDev(dataItem, stmt, 1); // 1 is hashKey begin index
792     if (errCode != E_OK) {
793         return errCode;
794     }
795     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
796     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
797         errCode = E_OK;
798     }
799     int ret = E_OK;
800     SQLiteUtils::ResetStatement(stmt, false, ret);  // Finalize outside.
801     return errCode != E_OK ? errCode : ret;
802 }
803 
SaveSyncDataItem(const DataItem & dataItem,bool isUpdate,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,std::map<std::string,Type> & saveVals)804 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, bool isUpdate,
805     SaveSyncDataStmt &saveStmt, RelationalSyncDataInserter &inserter, std::map<std::string, Type> &saveVals)
806 {
807     if (std::get_if<int64_t>(&saveVals[DBConstant::ROWID]) == nullptr) {
808         LOGE("[SaveSyncDataItem] Invalid args because of no rowid!");
809         return -E_INVALID_ARGS;
810     }
811     if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
812         int errCode = inserter.GetObserverDataByRowId(dbHandle_, std::get<int64_t>(saveVals[DBConstant::ROWID]),
813             ChangeType::OP_DELETE);
814         if (errCode != E_OK) {
815             LOGE("[SaveSyncDataItem] Failed to get primary data before deletion, errCode: %d", errCode);
816             return errCode;
817         }
818         saveVals[DBConstant::ROWID] = static_cast<int64_t>(-1);
819         return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
820     }
821     // we don't know the rowid if user drop device table
822     // SPLIT_BY_DEVICE use insert or replace to update data
823     // no pk table should delete by hash key(rowid) first
824     if ((mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().IsNoPkTable())) {
825         int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
826         if (errCode != E_OK) {
827             LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
828             return errCode;
829         }
830     }
831     int errCode = inserter.SaveData(isUpdate, dataItem, saveStmt, saveVals);
832     if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
833         return errCode;
834     }
835     if (!isUpdate) {
836         saveVals[DBConstant::ROWID] = SQLiteUtils::GetLastRowId(dbHandle_);
837     }
838     errCode = inserter.GetObserverDataByRowId(dbHandle_, std::get<int64_t>(saveVals[DBConstant::ROWID]),
839         isUpdate ? ChangeType::OP_UPDATE : ChangeType::OP_INSERT);
840     if (errCode != E_OK) {
841         LOGE("Get observer data by rowid failed, errCode=%d.", errCode);
842     }
843     return errCode;
844 }
845 
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)846 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
847     RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
848 {
849     if (stmt == nullptr) {
850         int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
851         if (errCode != E_OK) {
852             LOGE("[DeleteSyncLog] Get statement fail!");
853             return errCode;
854         }
855     }
856 
857     int errCode = inserter.BindHashKeyAndDev(dataItem, stmt, 1); // 1 is hashKey begin index
858     if (errCode != E_OK) {
859         return errCode;
860     }
861     errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
862     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
863         errCode = E_OK;
864     }
865     int ret = E_OK;
866     SQLiteUtils::ResetStatement(stmt, false, ret);  // Finalize outside.
867     return errCode != E_OK ? errCode : ret;
868 }
869 
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)870 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
871     RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
872 {
873     int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
874     if (errCode != E_OK) {
875         return errCode;
876     }
877     return DeleteSyncLog(item, inserter, rmLogStmt);
878 }
879 
CheckDataConflictDefeated(const DataItem & dataItem,const RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DeviceSyncSaveDataInfo & saveDataInfo)880 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
881     const RelationalSyncDataInserter &inserter, SaveSyncDataStmt &saveStmt, DeviceSyncSaveDataInfo &saveDataInfo)
882 {
883     auto &logInfoGet = saveDataInfo.localLogInfo;
884     int errCode = SQLiteRelationalUtils::GetLocalLogInfo(inserter, dataItem, mode_, logInfoGet, saveStmt);
885     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
886         LOGE("Failed to get raw data. %d", errCode);
887         return errCode;
888     }
889 
890     auto &rowId = saveDataInfo.rowId;
891     auto &isExist = saveDataInfo.isExist;
892     auto &isDefeated = saveDataInfo.isDefeated;
893     rowId = logInfoGet.dataKey;
894     isExist = (errCode != -E_NOT_FOUND) && ((logInfoGet.flag & static_cast<uint32_t>(LogInfoFlag::FLAG_DELETE)) == 0);
895     if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
896         mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
897         isDefeated = false; // no need to solve conflict except miss query data
898         return E_OK;
899     }
900     // defeated if item timestamp is earlier than raw data
901     isDefeated = (dataItem.timestamp < logInfoGet.timestamp);
902     return E_OK;
903 }
904 
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)905 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
906     SaveSyncDataStmt &saveStmt, DataItem &item)
907 {
908     DeviceSyncSaveDataInfo deviceSyncSaveDataInfo;
909     int errCode = CheckDataConflictDefeated(item, inserter, saveStmt, deviceSyncSaveDataInfo);
910     if (errCode != E_OK) {
911         LOGE("check data conflict failed. %d", errCode);
912         return errCode;
913     }
914     bool &isDefeated = deviceSyncSaveDataInfo.isDefeated;
915     bool &isExist = deviceSyncSaveDataInfo.isExist;
916     int64_t &rowid = deviceSyncSaveDataInfo.rowId;
917 
918     if (isDefeated) {
919         LOGD("Data was defeated.");
920         return E_OK;
921     }
922     if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
923         return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
924     }
925     if (!isExist && ((item.flag & DataItem::DELETE_FLAG) != 0)) {
926         LOGI("[RelationalStorageExecutor][SaveSyncDataItem] Delete non-exist data. Nothing to save.");
927         return E_OK;
928     }
929     bool isUpdate = isExist && mode_ == DistributedTableMode::COLLABORATION;
930     Key sourceHash = item.hashKey;
931     if (isUpdate) {
932         item.hashKey = deviceSyncSaveDataInfo.localLogInfo.hashKey;
933     }
934     std::map<std::string, Type> saveVals;
935     saveVals[DBConstant::ROWID] = rowid;
936     errCode = SaveSyncDataItem(item, isUpdate, saveStmt, inserter, saveVals);
937     if (errCode == E_OK) {
938         item.hashKey = sourceHash;
939         errCode = inserter.SaveSyncLog(dbHandle_, item, deviceSyncSaveDataInfo, saveVals, saveStmt);
940     }
941     return errCode;
942 }
943 
SaveSyncDataItems(RelationalSyncDataInserter & inserter)944 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
945 {
946     SaveSyncDataStmt saveStmt;
947     int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
948     if (errCode != E_OK) {
949         LOGE("Prepare insert sync data statement failed.");
950         return errCode;
951     }
952 
953     errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
954         if (item.neglect) { // Do not save this record if it is neglected
955             return E_OK;
956         }
957         int errCode = SaveSyncDataItem(inserter, saveStmt, item);
958         if (errCode != E_OK) {
959             LOGE("save sync data item failed. err=%d", errCode);
960             return errCode;
961         }
962         // Need not reset rmDataStmt and rmLogStmt here.
963         return saveStmt.ResetStatements(false);
964     });
965 
966     int ret = saveStmt.ResetStatements(true);
967     return errCode != E_OK ? errCode : ret;
968 }
969 
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)970 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
971 {
972     if (useTrans) {
973         int errCode = StartTransaction(TransactType::IMMEDIATE);
974         if (errCode != E_OK) {
975             return errCode;
976         }
977     }
978 
979     int errCode = SetLogTriggerStatus(false);
980     if (errCode != E_OK) {
981         goto END;
982     }
983     errCode = SaveSyncDataItems(inserter);
984     if (errCode != E_OK) {
985         LOGE("Save sync data items failed. errCode=%d", errCode);
986         goto END;
987     }
988     errCode = SetLogTriggerStatus(true);
989 END:
990     if (useTrans) {
991         if (errCode == E_OK) {
992             errCode = Commit();
993         } else {
994             (void)Rollback(); // Keep the error code of the first scene
995         }
996     }
997     return errCode;
998 }
999 
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const1000 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
1001     bool isGettingDeletedData) const
1002 {
1003     RowDataWithLog data;
1004     int errCode = SQLiteRelationalUtils::GetLogData(stmt, data.logInfo);
1005     if (errCode != E_OK) {
1006         LOGE("relational data value transfer to kv fail");
1007         return errCode;
1008     }
1009     std::vector<FieldInfo> serializeFields;
1010     if (!isGettingDeletedData) {
1011         auto fields = table_.GetFieldInfos();
1012         for (size_t cid = 0; cid < fields.size(); ++cid) {
1013             if (localSchema_.IsNeedSkipSyncField(fields[cid], table_.GetTableName())) {
1014                 continue;
1015             }
1016             DataValue value;
1017             errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
1018                 value);
1019             if (errCode != E_OK) {
1020                 return errCode;
1021             }
1022             data.rowData.push_back(std::move(value)); // sorted by cid
1023             serializeFields.push_back(fields[cid]);
1024         }
1025     }
1026 
1027     errCode = DataTransformer::SerializeDataItem(data,
1028         isGettingDeletedData ? std::vector<FieldInfo>() : serializeFields, dataItem);
1029     if (errCode != E_OK) {
1030         LOGE("relational data value transfer to kv fail");
1031     }
1032     return errCode;
1033 }
1034 
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1035 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1036 {
1037     int errCode = GetDataItemForSync(fullStmt, item, false);
1038     if (errCode != E_OK) {
1039         return errCode;
1040     }
1041     item.value = {};
1042     item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1043     return E_OK;
1044 }
1045 
1046 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1047 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp &timestamp)
1048 {
1049     if (stmt == nullptr) {
1050         return -E_INVALID_ARGS;
1051     }
1052     int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1053     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1054         timestamp = INT64_MAX;
1055         errCode = E_OK;
1056     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1057         timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3));  // 3 means timestamp index
1058         errCode = E_OK;
1059     }
1060     return errCode;
1061 }
1062 
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1063 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1064     std::vector<DataItem> &dataItems, DataItem &&item)
1065 {
1066     // If one record is over 4M, ignore it.
1067     if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1068         overLongSize++;
1069     } else {
1070         // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1071         dataTotalSize += GetDataItemSerialSize(item, appendLength);
1072         if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1073             return -E_UNFINISHED;
1074         } else {
1075             dataItems.push_back(item);
1076         }
1077     }
1078     return E_OK;
1079 }
1080 }
1081 
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1082 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1083     sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1084 {
1085     if (!isFirstTime) { // For the first time, never step before, can get nothing
1086         int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1087         if (errCode != E_OK) {
1088             return errCode;
1089         }
1090     }
1091     return StepNext(isMemDb_, queryStmt, queryTime);
1092 }
1093 
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1094 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1095     Timestamp &missQueryTime)
1096 {
1097     int errCode = GetMissQueryData(fullStmt, item);
1098     if (errCode != E_OK) {
1099         return errCode;
1100     }
1101     return StepNext(isMemDb_, fullStmt, missQueryTime);
1102 }
1103 
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1104 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1105     const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1106     const TableInfo &tableInfo)
1107 {
1108     baseTblName_ = tableInfo.GetTableName();
1109     SetTableInfo(tableInfo);
1110     sqlite3_stmt *queryStmt = nullptr;
1111     sqlite3_stmt *fullStmt = nullptr;
1112     bool isGettingDeletedData = false;
1113     int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1114     if (errCode != E_OK) {
1115         return errCode;
1116     }
1117 
1118     Timestamp queryTime = 0;
1119     Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1120 
1121     bool isFirstTime = true;
1122     size_t dataTotalSize = 0;
1123     size_t overLongSize = 0;
1124     do {
1125         DataItem item;
1126         if (queryTime < missQueryTime) {
1127             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1128         } else if (queryTime == missQueryTime) {
1129             errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1130             if (errCode != E_OK) {
1131                 break;
1132             }
1133             errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1134         } else {
1135             errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1136         }
1137 
1138         if (errCode == E_OK && !isFirstTime) {
1139             errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1140         }
1141 
1142         if (errCode != E_OK) {
1143             break;
1144         }
1145 
1146         isFirstTime = false;
1147         if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1148             errCode = -E_FINISHED;
1149             break;
1150         }
1151     } while (true);
1152     LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1153         errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1154     int ret = E_OK;
1155     SQLiteUtils::ResetStatement(queryStmt, true, ret);
1156     SQLiteUtils::ResetStatement(fullStmt, true, ret);
1157     return errCode != E_OK ? errCode : ret;
1158 }
1159 
CheckDBModeForRelational()1160 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1161 {
1162     std::string journalMode;
1163     int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1164 
1165     for (auto &c : journalMode) { // convert to lowercase
1166         c = static_cast<char>(std::tolower(c));
1167     }
1168 
1169     if (errCode == E_OK && journalMode != "wal") {
1170         LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1171         return -E_NOT_SUPPORT;
1172     }
1173     return errCode;
1174 }
1175 
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1176 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1177     const std::string &tableName)
1178 {
1179     std::vector<std::string> deviceTables;
1180     int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1181     if (errCode != E_OK) {
1182         LOGE("Get device table name for alter table failed. %d", errCode);
1183         return errCode;
1184     }
1185 
1186     LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1187     for (const auto &table : deviceTables) {
1188         std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1189         errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1190         if (errCode != E_OK) {
1191             LOGE("Delete device data failed. %d", errCode);
1192             break;
1193         }
1194     }
1195     return errCode;
1196 }
1197 
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1198 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1199 {
1200     std::string deleteLogSql =
1201         "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1202         "_log WHERE flag&0x02=0 AND (cloud_gid = '' OR cloud_gid IS NULL)";
1203     return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1204 }
1205 
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1206 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1207     const std::string &tableName)
1208 {
1209     std::string deleteLogSql = "DELETE FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1210         "_log WHERE device = ?";
1211     sqlite3_stmt *deleteLogStmt = nullptr;
1212     int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1213     if (errCode != E_OK) {
1214         LOGE("Get delete device data log statement failed. %d", errCode);
1215         return errCode;
1216     }
1217 
1218     int ret = E_OK;
1219     errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1220     if (errCode != E_OK) {
1221         LOGE("Bind device to delete data log statement failed. %d", errCode);
1222         SQLiteUtils::ResetStatement(deleteLogStmt, true, ret);
1223         return errCode;
1224     }
1225 
1226     errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1227     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1228         errCode = E_OK;
1229     } else {
1230         LOGE("Delete data log failed. %d", errCode);
1231     }
1232 
1233     SQLiteUtils::ResetStatement(deleteLogStmt, true, ret);
1234     return errCode != E_OK ? errCode : ret;
1235 }
1236 
DeleteDistributedLogTable(const std::string & tableName)1237 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1238 {
1239     if (tableName.empty()) {
1240         return -E_INVALID_ARGS;
1241     }
1242     std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1243     std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1244     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1245     if (errCode != E_OK) {
1246         LOGE("Delete distributed log table failed. %d", errCode);
1247     }
1248     return errCode;
1249 }
1250 
IsTableOnceDropped(const std::string & tableName,bool & onceDropped)1251 int SQLiteSingleVerRelationalStorageExecutor::IsTableOnceDropped(const std::string &tableName, bool &onceDropped)
1252 {
1253     std::string keyStr = DBConstant::TABLE_IS_DROPPED + tableName;
1254     Key key;
1255     DBCommon::StringToVector(keyStr, key);
1256     Value value;
1257 
1258     int errCode = GetKvData(key, value);
1259     if (errCode == E_OK) {
1260         onceDropped = true;
1261         LOGW("[RDBExecutor] Table %s once dropped!", DBCommon::StringMiddleMasking(tableName).c_str());
1262         return E_OK;
1263     } else if (errCode == -E_NOT_FOUND) {
1264         onceDropped = false;
1265         return E_OK;
1266     }
1267     return errCode;
1268 }
1269 
CleanResourceForDroppedTable(const std::string & tableName)1270 int SQLiteSingleVerRelationalStorageExecutor::CleanResourceForDroppedTable(const std::string &tableName)
1271 {
1272     int errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1273     if (errCode != E_OK) {
1274         LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1275         return errCode;
1276     }
1277     errCode = DeleteDistributedLogTable(tableName);
1278     if (errCode != E_OK) {
1279         LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1280         return errCode;
1281     }
1282     errCode = DeleteTableTrigger(tableName);
1283     if (errCode != E_OK) {
1284         LOGE("Delete trigger for missing distributed table failed. %d", errCode);
1285     }
1286     return errCode;
1287 }
1288 
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1289 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1290     std::vector<std::string> &missingTables)
1291 {
1292     if (tableNames.empty()) {
1293         return E_OK;
1294     }
1295     const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1296     sqlite3_stmt *stmt = nullptr;
1297     int ret = E_OK;
1298     int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1299     if (errCode != E_OK) {
1300         SQLiteUtils::ResetStatement(stmt, true, ret);
1301         return errCode;
1302     }
1303     for (const auto &tableName : tableNames) {
1304         errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1305         if (errCode != E_OK) {
1306             LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1307             break;
1308         }
1309 
1310         errCode = SQLiteUtils::StepWithRetry(stmt, false);
1311         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1312             errCode = CleanResourceForDroppedTable(tableName);
1313             if (errCode != E_OK) {
1314                 LOGE("Clean resource for dropped table failed. %d", errCode);
1315                 break;
1316             }
1317             missingTables.emplace_back(tableName);
1318         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1319             LOGE("[CheckAndCleanDisTable]Check distributed table failed. %d", errCode);
1320             break;
1321         }
1322         errCode = E_OK; // Check result ok for distributed table is still exists
1323         SQLiteUtils::ResetStatement(stmt, false, ret);
1324     }
1325     SQLiteUtils::ResetStatement(stmt, true, ret);
1326     return CheckCorruptedStatus(errCode);
1327 }
1328 
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1329 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1330     const TableInfo &baseTbl, const StoreInfo &info)
1331 {
1332     if (dbHandle_ == nullptr) {
1333         return -E_INVALID_DB;
1334     }
1335 
1336     if (device.empty() || !baseTbl.IsValid()) {
1337         return -E_INVALID_ARGS;
1338     }
1339 
1340     std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1341     int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1342     if (errCode != E_OK) {
1343         LOGE("Create device table failed. %d", errCode);
1344         return errCode;
1345     }
1346 
1347     errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1348     if (errCode != E_OK) {
1349         LOGE("Copy index to device table failed. %d", errCode);
1350     }
1351     return errCode;
1352 }
1353 
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1354 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1355     const std::string &schemaVersion)
1356 {
1357     if (dbHandle_ == nullptr) {
1358         return -E_INVALID_DB;
1359     }
1360 
1361     TableInfo newTable;
1362     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1363     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1364         LOGE("Check new schema failed. %d", errCode);
1365         return errCode;
1366     } else {
1367         errCode = table.CompareWithTable(newTable, schemaVersion);
1368         if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1369             LOGE("Check schema failed, schema was changed. %d", errCode);
1370             return -E_DISTRIBUTED_SCHEMA_CHANGED;
1371         } else {
1372             errCode = E_OK;
1373         }
1374     }
1375 
1376     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1377     if (errCode != E_OK) {
1378         LOGE("Get query helper for check query failed. %d", errCode);
1379         return errCode;
1380     }
1381 
1382     if (!query.IsQueryForRelationalDB()) {
1383         LOGE("Not support for this query type.");
1384         return -E_NOT_SUPPORT;
1385     }
1386 
1387     SyncTimeRange defaultTimeRange;
1388     sqlite3_stmt *stmt = nullptr;
1389     errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1390         stmt);
1391     if (errCode != E_OK) {
1392         LOGE("Get query statement for check query failed. %d", errCode);
1393     }
1394 
1395     int ret = E_OK;
1396     SQLiteUtils::ResetStatement(stmt, true, ret);
1397     return errCode != E_OK ? errCode : ret;
1398 }
1399 
CheckQueryObjectLegal(const QuerySyncObject & query)1400 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const QuerySyncObject &query)
1401 {
1402     if (dbHandle_ == nullptr) {
1403         return -E_INVALID_DB;
1404     }
1405     TableInfo newTable;
1406     int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, query.GetTableName(), newTable);
1407     if (errCode != E_OK) {
1408         LOGE("Check new schema failed. %d", errCode);
1409     }
1410     return errCode;
1411 }
1412 
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1413 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1414     Timestamp &maxTimestamp) const
1415 {
1416     maxTimestamp = 0;
1417     int ret = E_OK;
1418     for (const auto &tableName : tableNames) {
1419         const std::string sql = "SELECT MAX(timestamp) FROM " + std::string(DBConstant::RELATIONAL_PREFIX) + tableName +
1420             "_log;";
1421         sqlite3_stmt *stmt = nullptr;
1422         int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1423         if (errCode != E_OK) {
1424             return errCode;
1425         }
1426         errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1427         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1428             maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1429             errCode = E_OK;
1430         }
1431         SQLiteUtils::ResetStatement(stmt, true, ret);
1432         if (errCode != E_OK) {
1433             maxTimestamp = 0;
1434             return errCode;
1435         }
1436     }
1437     return E_OK;
1438 }
1439 
SetLogTriggerStatus(bool status)1440 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1441 {
1442     return SQLiteRelationalUtils::SetLogTriggerStatus(dbHandle_, status);
1443 }
1444 
1445 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1446 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1447     std::vector<RelationalRowData *> &data)
1448 {
1449     size_t totalLength = 0;
1450     do {
1451         int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1452         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1453             return E_OK;
1454         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1455             LOGE("Get data by bind sql failed:%d", errCode);
1456             return errCode;
1457         }
1458 
1459         if (colNames.empty()) {
1460             SQLiteUtils::GetSelectCols(stmt, colNames);  // Get column names.
1461         }
1462         auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1463         if (relaRowData == nullptr) {
1464             LOGE("ExecuteQueryBySqlStmt OOM");
1465             return -E_OUT_OF_MEMORY;
1466         }
1467 
1468         auto dataSz = relaRowData->CalcLength();
1469         if (dataSz == 0) {  // invalid data
1470             delete relaRowData;
1471             relaRowData = nullptr;
1472             continue;
1473         }
1474 
1475         totalLength += static_cast<size_t>(dataSz);
1476         if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) {  // the set has been full
1477             delete relaRowData;
1478             relaRowData = nullptr;
1479             LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1480             return -E_REMOTE_OVER_SIZE;
1481         }
1482         data.push_back(relaRowData);
1483     } while (true);
1484     return E_OK;
1485 }
1486 }
1487 
1488 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1489 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1490     const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1491     std::vector<RelationalRowData *> &data)
1492 {
1493     int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1494     if (errCode != E_OK) {
1495         return errCode;
1496     }
1497 
1498     sqlite3_stmt *stmt = nullptr;
1499     errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1500     if (errCode != E_OK) {
1501         (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1502         return errCode;
1503     }
1504     ResFinalizer finalizer([this, &stmt, &errCode] {
1505         (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1506         SQLiteUtils::ResetStatement(stmt, true, errCode);
1507     });
1508     for (size_t i = 0; i < bindArgs.size(); ++i) {
1509         errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1510         if (errCode != E_OK) {
1511             return errCode;
1512         }
1513     }
1514     return GetRowDatas(stmt, isMemDb_, colNames, data);
1515 }
1516 
CheckEncryptedOrCorrupted() const1517 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1518 {
1519     if (dbHandle_ == nullptr) {
1520         return -E_INVALID_DB;
1521     }
1522 
1523     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1524     if (errCode != E_OK) {
1525         LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1526     }
1527     return errCode;
1528 }
1529 
GetExistsDeviceList(std::set<std::string> & devices) const1530 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1531 {
1532     return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1533         isMemDb_, devices);
1534 }
1535 
GetSyncCloudGid(QuerySyncObject & query,const SyncTimeRange & syncTimeRange,bool isCloudForcePushStrategy,bool isCompensatedTask,std::vector<std::string> & cloudGid)1536 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudGid(QuerySyncObject &query,
1537     const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy,
1538     bool isCompensatedTask, std::vector<std::string> &cloudGid)
1539 {
1540     sqlite3_stmt *queryStmt = nullptr;
1541     int errCode = E_OK;
1542     SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1543     if (errCode != E_OK) {
1544         return errCode;
1545     }
1546     std::string sql = helper.GetGidRelationalCloudQuerySql(tableSchema_.fields, isCloudForcePushStrategy,
1547         isCompensatedTask);
1548     errCode = helper.GetCloudQueryStatement(false, dbHandle_, sql, queryStmt);
1549     if (errCode != E_OK) {
1550         return errCode;
1551     }
1552     do {
1553         errCode = SQLiteUtils::StepNext(queryStmt, isMemDb_);
1554         if (errCode != E_OK) {
1555             errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1556             break;
1557         }
1558         GetCloudGid(queryStmt, cloudGid);
1559     } while (errCode == E_OK);
1560     int resetStatementErrCode = E_OK;
1561     SQLiteUtils::ResetStatement(queryStmt, true, resetStatementErrCode);
1562     queryStmt = nullptr;
1563     return (errCode == E_OK ? resetStatementErrCode : errCode);
1564 }
1565 
GetCloudDataForSync(const CloudUploadRecorder & uploadRecorder,sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t & stepNum,uint32_t & totalSize)1566 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(const CloudUploadRecorder &uploadRecorder,
1567     sqlite3_stmt *statement, CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize)
1568 {
1569     VBucket log;
1570     VBucket extraLog;
1571     uint32_t preSize = totalSize;
1572     int64_t revisedTime = 0;
1573     int64_t invalidTime = 0;
1574     GetCloudLog(statement, log, totalSize, revisedTime, invalidTime);
1575     GetCloudExtraLog(statement, extraLog);
1576     if (revisedTime != 0) {
1577         Bytes hashKey = std::get<Bytes>(extraLog[CloudDbConstant::HASH_KEY]);
1578         cloudDataResult.revisedData.push_back({hashKey, revisedTime, invalidTime});
1579     }
1580 
1581     VBucket data;
1582     int64_t flag = 0;
1583     int errCode = CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::FLAG, extraLog, flag);
1584     if (errCode != E_OK) {
1585         return errCode;
1586     }
1587     if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1588         for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1589             Type cloudValue;
1590             errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1591                 tableSchema_.fields[cid].type, cid + STATUS_INDEX + 1, cloudValue);
1592             if (errCode != E_OK) {
1593                 return errCode;
1594             }
1595             SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1596             errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1597             if (errCode != E_OK) {
1598                 return errCode;
1599             }
1600         }
1601     }
1602 
1603     if (CloudStorageUtils::IsGetCloudDataContinue(stepNum, totalSize, maxUploadSize_, maxUploadCount_)) {
1604         errCode = CloudStorageUtils::IdentifyCloudType(uploadRecorder, cloudDataResult, data, log, extraLog);
1605     } else {
1606         errCode = -E_UNFINISHED;
1607     }
1608     if (errCode == -E_IGNORE_DATA) {
1609         errCode = E_OK;
1610         totalSize = preSize;
1611         stepNum--;
1612     }
1613     return errCode;
1614 }
1615 
SetLocalSchema(const RelationalSchemaObject & localSchema)1616 void SQLiteSingleVerRelationalStorageExecutor::SetLocalSchema(const RelationalSchemaObject &localSchema)
1617 {
1618     localSchema_ = localSchema;
1619 }
1620 
CleanCloudDataOnLogTable(const std::string & logTableName,ClearMode mode)1621 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName, ClearMode mode)
1622 {
1623     std::string setFlag;
1624     if (mode == FLAG_ONLY && isLogicDelete_) {
1625         setFlag = SET_FLAG_CLEAN_WAIT_COMPENSATED_SYNC;
1626     } else {
1627         setFlag = SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC;
1628     }
1629     std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " + setFlag +
1630         ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1631         SHARING_RESOURCE + " = '' " + "WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR " +
1632         CLOUD_GID_FIELD + " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '';";
1633     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1634     if (errCode != E_OK) {
1635         LOGE("clean cloud log failed, %d", errCode);
1636         return errCode;
1637     }
1638     cleanLogSql = "DELETE FROM " + logTableName + " WHERE " + FLAG_IS_CLOUD + " AND " + DATA_IS_DELETE + ";";
1639     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1640     if (errCode != E_OK) {
1641         LOGE("delete cloud log failed, %d", errCode);
1642         return errCode;
1643     }
1644     // set all flag logout and data upload is not finished.
1645     cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG;
1646     if (mode == FLAG_ONLY) {
1647         cleanLogSql += " = flag | 0x800 & ~0x400;";
1648     } else {
1649         cleanLogSql += " = flag & ~0x400;";
1650     }
1651     return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1652 }
1653 
ClearVersionOnLogTable(const std::string & logTableName)1654 int SQLiteSingleVerRelationalStorageExecutor::ClearVersionOnLogTable(const std::string &logTableName)
1655 {
1656     std::string cleanLogSql = "UPDATE " + logTableName + " SET " + VERSION + " = '';";
1657     return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1658 }
1659 
CleanUploadFinishedFlag(const std::string & tableName)1660 int SQLiteSingleVerRelationalStorageExecutor::CleanUploadFinishedFlag(const std::string &tableName)
1661 {
1662     // unset upload finished flag
1663     std::string cleanUploadFinishedSql = "UPDATE " + DBCommon::GetLogTableName(tableName) + " SET " +
1664         CloudDbConstant::FLAG + " = flag & ~0x400;";
1665     return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanUploadFinishedSql);
1666 }
1667 
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName,const RelationalSchemaObject & localSchema)1668 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1669     const std::string &logTableName, const RelationalSchemaObject &localSchema)
1670 {
1671     std::string sql = "DELETE FROM '" + tableName + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
1672         " IN (SELECT " + DATAKEY + " FROM '" + logTableName + "' WHERE (" + FLAG_IS_LOGIC_DELETE +
1673         ") OR CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD + " OR " + FLAG_IS_CLOUD_CONSISTENCY +
1674         "));";
1675     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1676     if (errCode != E_OK) {
1677         LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1678         return errCode;
1679     }
1680     std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + FLAG_IS_CLOUD + " OR " +
1681         FLAG_IS_CLOUD_CONSISTENCY + ";";
1682     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1683     if (errCode != E_OK) {
1684         LOGE("Failed to delete cloud data on log table, %d.", errCode);
1685         return errCode;
1686     }
1687     errCode = DoCleanAssetId(tableName, localSchema);
1688     if (errCode != E_OK) {
1689         LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
1690         return errCode;
1691     }
1692     errCode = CleanCloudDataOnLogTable(logTableName, FLAG_AND_DATA);
1693     if (errCode != E_OK) {
1694         LOGE("Failed to clean gid on log table, %d.", errCode);
1695     }
1696     return errCode;
1697 }
1698 
GetFlagIsLocalCount(const std::string & logTableName,int32_t & count)1699 int SQLiteSingleVerRelationalStorageExecutor::GetFlagIsLocalCount(const std::string &logTableName, int32_t &count)
1700 {
1701     std::string sql = "SELECT count(*) from " + logTableName + " WHERE FLAG&0x02 = 0x02;";
1702     int errCode = SQLiteUtils::GetCountBySql(dbHandle_, sql, count);
1703     if (errCode != E_OK) {
1704         LOGW("[RDBExecutor] get mark local count from log table failed: %d", errCode);
1705     }
1706     return errCode;
1707 }
1708 
ChangeCloudDataFlagOnLogTable(const std::string & logTableName)1709 int SQLiteSingleVerRelationalStorageExecutor::ChangeCloudDataFlagOnLogTable(const std::string &logTableName)
1710 {
1711     std::string cleanLogSql = "UPDATE " + logTableName + " SET " + CloudDbConstant::FLAG + " = " +
1712         SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1713         CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '' " + "WHERE NOT " + FLAG_IS_CLOUD_CONSISTENCY + ";";
1714     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1715     if (errCode != E_OK) {
1716         LOGE("change cloud log flag failed, %d", errCode);
1717     }
1718     return errCode;
1719 }
1720 
SetDataOnUserTableWithLogicDelete(const std::string & tableName,const std::string & logTableName)1721 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnUserTableWithLogicDelete(const std::string &tableName,
1722     const std::string &logTableName)
1723 {
1724     UpdateCursorContext context;
1725     int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, context.cursor);
1726     LOGI("removeData on userTable:%s length:%zu start and cursor is %llu.",
1727         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1728     errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1729     if (errCode != E_OK) {
1730         LOGE("Failed to create updateCursor func on userTable errCode=%d.", errCode);
1731         return errCode;
1732     }
1733     // data from cloud and not modified by local or consistency with cloud to flag logout
1734     std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " + SET_FLAG_WHEN_LOGOUT +
1735                       ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '', " +
1736                       SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1737                       " WHERE (CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND (" + FLAG_IS_CLOUD_CONSISTENCY + " OR " +
1738                       FLAG_IS_CLOUD + ") AND NOT (" + DATA_IS_DELETE + ") " + " AND NOT (" + FLAG_IS_LOGIC_DELETE
1739                       + "));";
1740     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1741     // here just clear updateCursor func, fail will not influence other function
1742     (void)CreateFuncUpdateCursor(context, nullptr);
1743     if (errCode != E_OK) {
1744         LOGE("Failed to change cloud data flag on userTable, %d.", errCode);
1745         return errCode;
1746     }
1747     // clear some column when data is logicDelete or physical delete
1748     sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1749           " = '', " + SHARING_RESOURCE + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR (" + DATA_IS_DELETE + ");";
1750     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1751     if (errCode != E_OK) {
1752         LOGE("Failed to deal logic delete data flag on userTable, %d.", errCode);
1753         return errCode;
1754     }
1755     LOGI("removeData on userTable:%s length:%zu finish and cursor is %llu.",
1756         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1757     errCode = SetCursor(tableName, context.cursor);
1758     if (errCode != E_OK) {
1759         LOGE("set new cursor after removeData error %d.", errCode);
1760         return errCode;
1761     }
1762     return ChangeCloudDataFlagOnLogTable(logTableName);
1763 }
1764 
SetDataOnShareTableWithLogicDelete(const std::string & tableName,const std::string & logTableName)1765 int SQLiteSingleVerRelationalStorageExecutor::SetDataOnShareTableWithLogicDelete(const std::string &tableName,
1766     const std::string &logTableName)
1767 {
1768     UpdateCursorContext context;
1769     int errCode = SQLiteRelationalUtils::GetCursor(dbHandle_, tableName, context.cursor);
1770     LOGI("removeData on shareTable:%s length:%zu start and cursor is %llu.",
1771         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1772     errCode = CreateFuncUpdateCursor(context, &UpdateCursor);
1773     if (errCode != E_OK) {
1774         LOGE("Failed to create updateCursor func on shareTable errCode=%d.", errCode);
1775         return errCode;
1776     }
1777     std::string sql = "UPDATE '" + logTableName + "' SET " + CloudDbConstant::FLAG + " = " +
1778                       SET_FLAG_WHEN_LOGOUT_FOR_SHARE_TABLE + ", " + VERSION + " = '', " + DEVICE_FIELD + " = '', " +
1779                       CLOUD_GID_FIELD + " = '', " + SHARING_RESOURCE + " = '', " + UPDATE_CURSOR_SQL +
1780                       " WHERE (NOT (" + DATA_IS_DELETE + ") " + " AND NOT (" + FLAG_IS_LOGIC_DELETE + "));";
1781     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1782     // here just clear updateCursor func, fail will not influence other function
1783     (void)CreateFuncUpdateCursor(context, nullptr);
1784     if (errCode != E_OK) {
1785         LOGE("Failed to change cloud data flag on shareTable, %d.", errCode);
1786         return errCode;
1787     }
1788     // clear some column when data is logicDelete or physical delete
1789     sql = "UPDATE '" + logTableName + "' SET " + VERSION + " = '', " + DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD +
1790           " = '', " + SHARING_RESOURCE + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR (" + DATA_IS_DELETE + ");";
1791     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1792     if (errCode != E_OK) {
1793         LOGE("Failed to deal logic delete data flag on shareTable, %d.", errCode);
1794         return errCode;
1795     }
1796     LOGI("removeData on shareTable:%s length:%zu finish and cursor is %llu.",
1797         DBCommon::StringMiddleMasking(tableName).c_str(), tableName.size(), context.cursor);
1798     return SetCursor(tableName, context.cursor);
1799 }
1800 
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys,bool distinguishCloudFlag)1801 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
1802     std::vector<int64_t> &dataKeys, bool distinguishCloudFlag)
1803 {
1804     sqlite3_stmt *selectStmt = nullptr;
1805     std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
1806         " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '' AND data_key != '-1'";
1807     if (distinguishCloudFlag) {
1808         sql += " AND (";
1809         sql += FLAG_IS_CLOUD;
1810         sql += " OR ";
1811         sql += FLAG_IS_CLOUD_CONSISTENCY;
1812         sql += " )";
1813     }
1814     sql += ";";
1815     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
1816     if (errCode != E_OK) {
1817         LOGE("Get select data_key statement failed, %d", errCode);
1818         return errCode;
1819     }
1820     do {
1821         errCode = SQLiteUtils::StepWithRetry(selectStmt);
1822         if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1823             dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
1824         } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1825             LOGE("SQLite step failed when query log's data_key : %d", errCode);
1826             break;
1827         }
1828     } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
1829     SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1830     return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
1831 }
1832 
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)1833 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
1834     const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
1835 {
1836     std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
1837     if (opType == OpType::ONLY_UPDATE_GID) {
1838         updateLogSql += "cloud_gid = ?";
1839         updateColName.push_back(CloudDbConstant::GID_FIELD);
1840         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1841     } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1842         updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
1843         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1844     } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
1845         updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
1846         CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1847     }  else if (opType == OpType::UPDATE_TIMESTAMP) {
1848         updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
1849             ", timestamp = ?, cloud_gid = '', version = '', sharing_resource = ''";
1850         updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1851     } else if (opType == OpType::CLEAR_GID) {
1852         updateLogSql += "cloud_gid = '', version = '', sharing_resource = '', flag = flag & " +
1853             std::to_string(SET_FLAG_ZERO_MASK);
1854     } else if (opType == OpType::LOCKED_NOT_HANDLE) {
1855         updateLogSql += std::string(CloudDbConstant::TO_LOCAL_CHANGE) + ", cloud_gid = ?";
1856         updateColName.push_back(CloudDbConstant::GID_FIELD);
1857         updateLogSql += ", version = ?";
1858         updateColName.push_back(CloudDbConstant::VERSION_FIELD);
1859     } else {
1860         updateLogSql += " device = 'cloud', timestamp = ?,";
1861         updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1862         if (opType == OpType::DELETE) {
1863             int errCode = GetCloudDeleteSql(tableSchema.name, updateLogSql);
1864             if (errCode != E_OK) {
1865                 return errCode;
1866             }
1867         } else {
1868             updateLogSql += GetUpdateDataFlagSql(vBucket) + ", cloud_gid = ?";
1869             updateColName.push_back(CloudDbConstant::GID_FIELD);
1870             CloudStorageUtils::AddUpdateColForShare(tableSchema, updateLogSql, updateColName);
1871         }
1872     }
1873 
1874     int errCode = AppendUpdateLogRecordWhereSqlCondition(tableSchema, vBucket, updateLogSql);
1875     if (errCode != E_OK) {
1876         return errCode;
1877     }
1878 
1879     errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
1880     if (errCode != E_OK) {
1881         LOGE("Get update log statement failed when update cloud data, %d", errCode);
1882     }
1883     return errCode;
1884 }
1885 
GetLockStatusByGid(const std::string & tableName,const std::string & gid,LockStatus & status)1886 int SQLiteSingleVerRelationalStorageExecutor::GetLockStatusByGid(const std::string &tableName, const std::string &gid,
1887     LockStatus &status)
1888 {
1889     std::string sql = "select status from " + DBCommon::GetLogTableName(tableName) + " where cloud_gid = '" +
1890         gid + "';";
1891     sqlite3_stmt *stmt = nullptr;
1892     int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1893     if (errCode != E_OK) {
1894         LOGE("Get stmt failed when get lock status: %d", errCode);
1895         return errCode;
1896     }
1897     errCode = SQLiteUtils::StepWithRetry(stmt);
1898     if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1899         status = static_cast<LockStatus>(sqlite3_column_int(stmt, 0));
1900         errCode = E_OK;
1901     } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1902         LOGE("Not found lock status.");
1903         errCode = -E_NOT_FOUND;
1904     } else {
1905         LOGE("Get lock status failed: %d", errCode);
1906     }
1907     int resetRet;
1908     SQLiteUtils::ResetStatement(stmt, true, resetRet);
1909     return errCode;
1910 }
1911 
UpdateHashKey(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType,const std::string & localIdentity)1912 int SQLiteSingleVerRelationalStorageExecutor::UpdateHashKey(DistributedTableMode mode, const TableInfo &tableInfo,
1913     TableSyncType syncType, const std::string &localIdentity)
1914 {
1915     if (tableInfo.GetIdentifyKey().size() == 1u && tableInfo.GetIdentifyKey().at(0) == "rowid") {
1916         return UpdateHashKeyWithOutPk(mode, tableInfo, syncType, localIdentity);
1917     }
1918     auto tableManager = LogTableManagerFactory::GetTableManager(tableInfo, mode, syncType);
1919     auto logName = DBCommon::GetLogTableName(tableInfo.GetTableName());
1920     std::string sql = "UPDATE OR REPLACE " + logName + " SET hash_key = hash_key || '_old' where data_key in " +
1921         "(select _rowid_ from '" + tableInfo.GetTableName() + "') AND device = '';";
1922     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1923     if (errCode != E_OK) {
1924         return errCode;
1925     }
1926     sql = "UPDATE OR REPLACE " + logName + " SET hash_key = data.hash_value FROM (SELECT _rowid_, " +
1927         tableManager->CalcPrimaryKeyHash("dataTable.", tableInfo, "") + " AS hash_value " +
1928         "FROM '" + tableInfo.GetTableName() + "' AS dataTable) AS data WHERE data._rowid_ = " + logName +
1929         ".data_key AND device = '';";
1930     return SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1931 }
1932 
UpdateHashKeyWithOutPk(DistributedTableMode mode,const TableInfo & tableInfo,TableSyncType syncType,const std::string & localIdentity)1933 int SQLiteSingleVerRelationalStorageExecutor::UpdateHashKeyWithOutPk(
1934     DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, const std::string &localIdentity)
1935 {
1936     auto tableManager = LogTableManagerFactory::GetTableManager(tableInfo, mode, syncType);
1937     auto logName = DBCommon::GetLogTableName(tableInfo.GetTableName());
1938     std::string extendStr = "_tmp";
1939     std::string tmpTable = logName + extendStr;
1940     std::string sql = tableManager->GetCreateRelationalLogTableSql(tableInfo, extendStr);
1941     int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1942     if (errCode != E_OK) {
1943         return errCode;
1944     }
1945     sql = "INSERT OR REPLACE INTO " + tmpTable + " SELECT * FROM " + logName + " WHERE data_key != -1;";
1946     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1947     if (errCode != E_OK) {
1948         return errCode;
1949     }
1950 
1951     sql = "UPDATE " + logName + " SET flag = flag | 0x01, data_key = -1 WHERE data_key IN " + "(SELECT _rowid_ FROM '" +
1952           tableInfo.GetTableName() + "');";
1953     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1954     if (errCode != E_OK) {
1955         return errCode;
1956     }
1957 
1958     sql = "UPDATE " + tmpTable + " SET hash_key = data.hash_value FROM (SELECT _rowid_, " +
1959           tableManager->CalcPrimaryKeyHash("dataTable.", tableInfo, localIdentity) + " AS hash_value " + "FROM '" +
1960           tableInfo.GetTableName() + "' AS dataTable) AS data WHERE data._rowid_ = " + tmpTable +
1961           ".data_key AND device = '';";
1962     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1963     if (errCode != E_OK) {
1964         return errCode;
1965     }
1966 
1967     sql = "INSERT OR REPLACE INTO " + logName + " SELECT * FROM " + tmpTable + ";";
1968     errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1969     if (errCode != E_OK) {
1970         return errCode;
1971     }
1972 
1973     sql = "DROP TABLE " + tmpTable + ";";
1974     return SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1975 }
1976 
SetTableMode(DistributedTableMode mode)1977 void SQLiteSingleVerRelationalStorageExecutor::SetTableMode(DistributedTableMode mode)
1978 {
1979     mode_ = mode;
1980 }
1981 } // namespace DistributedDB
1982 #endif
1983