1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "sqlite_single_ver_relational_storage_executor.h"
17
18 #include <algorithm>
19 #include <optional>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "data_transformer.h"
24 #include "db_common.h"
25 #include "log_table_manager_factory.h"
26 #include "relational_row_data_impl.h"
27 #include "res_finalizer.h"
28 #include "runtime_context.h"
29 #include "sqlite_meta_executor.h"
30 #include "sqlite_relational_utils.h"
31 #include "value_hash_calc.h"
32
33 namespace DistributedDB {
34 namespace {
// String keys. ROWID, TIMESTAMP, FLAG and HASH_KEY are used as VBucket keys by
// GetCloudExtraLog / IdentifyCloudType in this file.
constexpr const char *ROWID = "ROWID";
constexpr const char *TIMESTAMP = "TIMESTAMP";
constexpr const char *FLAG = "FLAG";
constexpr const char *DATAKEY = "DATA_KEY";
constexpr const char *DEVICE_FIELD = "DEVICE";
constexpr const char *CLOUD_GID_FIELD = "CLOUD_GID";
constexpr const char *HASH_KEY = "HASH_KEY";

// SQL fragment: true when the 0x02 bit of the flag is clear, i.e. the record is marked as cloud data.
constexpr const char *FLAG_IS_CLOUD = "FLAG & 0x02 = 0";
// SQL fragment: sets the 0x02 bit (local) and clears the 0x10 bit (wait compensated sync) of the flag.
constexpr const char *SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC = "(FLAG | 0x02) & (~0x10)";
// SQL fragment: true when the 0x08 bit (logic delete) of the flag is set.
constexpr const char *FLAG_IS_LOGIC_DELETE = "FLAG & 0x08 != 0";
// SQL fragment: true when the data is deleted (data_key is -1 and the logic delete bit is clear).
constexpr const char *DATA_IS_DELETE = "data_key = -1 AND FLAG & 0X08 = 0";

constexpr int SET_FLAG_ZERO_MASK = 0x0B; // binary 1011: clears the 0x04 bit of the flag, use with &
constexpr int SET_FLAG_ONE_MASK = 0x04; // binary 0100: sets the 0x04 bit of the flag, use with |
constexpr int SET_CLOUD_FLAG = 0x0D; // binary 1101: clears the 0x02 bit of the flag, use with &

// Column positions read from a log statement via sqlite3_column_* calls.
constexpr int DATA_KEY_INDEX = 0;
constexpr int TIMESTAMP_INDEX = 3;
constexpr int W_TIMESTAMP_INDEX = 4;
constexpr int FLAG_INDEX = 5;
constexpr int HASH_KEY_INDEX = 6;
constexpr int CLOUD_GID_INDEX = 7;
constexpr int VERSION_INDEX = 8;
57
PermitSelect(void * a,int b,const char * c,const char * d,const char * e,const char * f)58 int PermitSelect(void *a, int b, const char *c, const char *d, const char *e, const char *f)
59 {
60 if (b != SQLITE_SELECT && b != SQLITE_READ && b != SQLITE_FUNCTION) {
61 return SQLITE_DENY;
62 }
63 return SQLITE_OK;
64 }
65 }
SQLiteSingleVerRelationalStorageExecutor(sqlite3 * dbHandle,bool writable,DistributedTableMode mode)66 SQLiteSingleVerRelationalStorageExecutor::SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable,
67 DistributedTableMode mode)
68 : SQLiteStorageExecutor(dbHandle, writable, false), mode_(mode), isLogicDelete_(false),
69 assetLoader_(nullptr), putDataMode_(PutDataMode::SYNC), markFlagOption_(MarkFlagOption::DEFAULT)
70 {
71 bindCloudFieldFuncMap_[TYPE_INDEX<int64_t>] = &CloudStorageUtils::BindInt64;
72 bindCloudFieldFuncMap_[TYPE_INDEX<bool>] = &CloudStorageUtils::BindBool;
73 bindCloudFieldFuncMap_[TYPE_INDEX<double>] = &CloudStorageUtils::BindDouble;
74 bindCloudFieldFuncMap_[TYPE_INDEX<std::string>] = &CloudStorageUtils::BindText;
75 bindCloudFieldFuncMap_[TYPE_INDEX<Bytes>] = &CloudStorageUtils::BindBlob;
76 bindCloudFieldFuncMap_[TYPE_INDEX<Asset>] = &CloudStorageUtils::BindAsset;
77 bindCloudFieldFuncMap_[TYPE_INDEX<Assets>] = &CloudStorageUtils::BindAsset;
78 }
79
CheckTableConstraint(const TableInfo & table,DistributedTableMode mode,TableSyncType syncType)80 int CheckTableConstraint(const TableInfo &table, DistributedTableMode mode, TableSyncType syncType)
81 {
82 std::string trimedSql = DBCommon::TrimSpace(table.GetCreateTableSql());
83 if (DBCommon::HasConstraint(trimedSql, "WITHOUT ROWID", " ),", " ,;")) {
84 LOGE("[CreateDistributedTable] Not support create distributed table without rowid.");
85 return -E_NOT_SUPPORT;
86 }
87 std::vector<FieldInfo> fieldInfos = table.GetFieldInfos();
88 for (const auto &field : fieldInfos) {
89 if (DBCommon::CaseInsensitiveCompare(field.GetFieldName(), std::string(DBConstant::SQLITE_INNER_ROWID))) {
90 LOGE("[CreateDistributedTable] Not support create distributed table with _rowid_ column.");
91 return -E_NOT_SUPPORT;
92 }
93 }
94
95 if (mode == DistributedTableMode::COLLABORATION || syncType == CLOUD_COOPERATION) {
96 if (DBCommon::HasConstraint(trimedSql, "CHECK", " ,", " (")) {
97 LOGE("[CreateDistributedTable] Not support create distributed table with 'CHECK' constraint.");
98 return -E_NOT_SUPPORT;
99 }
100
101 if (DBCommon::HasConstraint(trimedSql, "ON CONFLICT", " )", " ")) {
102 LOGE("[CreateDistributedTable] Not support create distributed table with 'ON CONFLICT' constraint.");
103 return -E_NOT_SUPPORT;
104 }
105
106 if (mode == DistributedTableMode::COLLABORATION) {
107 if (DBCommon::HasConstraint(trimedSql, "REFERENCES", " )", " ")) {
108 LOGE("[CreateDistributedTable] Not support create distributed table with 'FOREIGN KEY' constraint.");
109 return -E_NOT_SUPPORT;
110 }
111 }
112
113 if (syncType == CLOUD_COOPERATION) {
114 int errCode = CloudStorageUtils::ConstraintsCheckForCloud(table, trimedSql);
115 if (errCode != E_OK) {
116 LOGE("ConstraintsCheckForCloud failed, errCode = %d", errCode);
117 return errCode;
118 }
119 }
120 }
121
122 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && syncType == DEVICE_COOPERATION) {
123 if (table.GetPrimaryKey().size() > 1) {
124 LOGE("[CreateDistributedTable] Not support create distributed table with composite primary keys.");
125 return -E_NOT_SUPPORT;
126 }
127 }
128
129 return E_OK;
130 }
131
132 namespace {
GetExistedDataTimeOffset(sqlite3 * db,const std::string & tableName,bool isMem,int64_t & timeOffset)133 int GetExistedDataTimeOffset(sqlite3 *db, const std::string &tableName, bool isMem, int64_t &timeOffset)
134 {
135 std::string sql = "SELECT get_sys_time(0) - max(" + std::string(DBConstant::SQLITE_INNER_ROWID) + ") - 1 FROM '" +
136 tableName + "';";
137 sqlite3_stmt *stmt = nullptr;
138 int errCode = SQLiteUtils::GetStatement(db, sql, stmt);
139 if (errCode != E_OK) {
140 return errCode;
141 }
142 errCode = SQLiteUtils::StepWithRetry(stmt, isMem);
143 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
144 timeOffset = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
145 errCode = E_OK;
146 }
147 SQLiteUtils::ResetStatement(stmt, true, errCode);
148 return errCode;
149 }
150 }
151
GeneLogInfoForExistedData(sqlite3 * db,const std::string & tableName,const std::string & calPrimaryKeyHash,TableInfo & tableInfo)152 int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 *db, const std::string &tableName,
153 const std::string &calPrimaryKeyHash, TableInfo &tableInfo)
154 {
155 int64_t timeOffset = 0;
156 int errCode = GetExistedDataTimeOffset(db, tableName, isMemDb_, timeOffset);
157 if (errCode != E_OK) {
158 return errCode;
159 }
160 std::string timeOffsetStr = std::to_string(timeOffset);
161 std::string logTable = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
162 std::string sql = "INSERT OR REPLACE INTO " + logTable + " SELECT " + std::string(DBConstant::SQLITE_INNER_ROWID) +
163 ", '', '', " + timeOffsetStr + " + " + std::string(DBConstant::SQLITE_INNER_ROWID) + ", " +
164 timeOffsetStr + " + " + std::string(DBConstant::SQLITE_INNER_ROWID) + ", 0x2, " +
165 calPrimaryKeyHash + ", '', ";
166 if (tableInfo.GetTableSyncType() == TableSyncType::DEVICE_COOPERATION) {
167 sql += "'', ''";
168 } else {
169 sql += tableInfo.GetTrackerTable().GetExtendName().empty() ? "''" : tableInfo.GetTrackerTable().GetExtendName();
170 sql += ", case when (SELECT count(1)<>0 FROM " + logTable + ")" +
171 " then ((SELECT CASE WHEN MAX(cursor) IS NULL THEN 0 ELSE MAX(cursor) END FROM " + logTable + ") + " +
172 std::string(DBConstant::SQLITE_INNER_ROWID) +
173 ") ELSE " + std::string(DBConstant::SQLITE_INNER_ROWID) + " end";
174 }
175 sql += ", ''";
176 sql += " FROM '" + tableName + "' AS a WHERE 1=1;";
177 return SQLiteUtils::ExecuteRawSQL(db, sql);
178 }
179
CreateDistributedTable(DistributedTableMode mode,bool isUpgraded,const std::string & identity,TableInfo & table,TableSyncType syncType)180 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedTable(DistributedTableMode mode, bool isUpgraded,
181 const std::string &identity, TableInfo &table, TableSyncType syncType)
182 {
183 if (dbHandle_ == nullptr) {
184 return -E_INVALID_DB;
185 }
186
187 const std::string tableName = table.GetTableName();
188 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, table);
189 if (errCode != E_OK) {
190 LOGE("[CreateDistributedTable] analysis table schema failed. %d", errCode);
191 return errCode;
192 }
193
194 if (mode == DistributedTableMode::SPLIT_BY_DEVICE && !isUpgraded) {
195 bool isEmpty = false;
196 errCode = SQLiteUtils::CheckTableEmpty(dbHandle_, tableName, isEmpty);
197 if (errCode != E_OK) {
198 LOGE("[CreateDistributedTable] check table empty failed. error=%d", errCode);
199 return -E_NOT_SUPPORT;
200 }
201 if (!isEmpty) {
202 LOGW("[CreateDistributedTable] generate %.3s log for existed data, table type %d",
203 DBCommon::TransferStringToHex(DBCommon::TransferHashString(tableName)).c_str(),
204 static_cast<int>(syncType));
205 }
206 }
207
208 errCode = CheckTableConstraint(table, mode, syncType);
209 if (errCode != E_OK) {
210 LOGE("[CreateDistributedTable] check table constraint failed.");
211 return errCode;
212 }
213
214 // create log table
215 auto tableManager = LogTableManagerFactory::GetTableManager(mode, syncType);
216 errCode = tableManager->CreateRelationalLogTable(dbHandle_, table);
217 if (errCode != E_OK) {
218 LOGE("[CreateDistributedTable] create log table failed");
219 return errCode;
220 }
221
222 if (!isUpgraded) {
223 std::string calPrimaryKeyHash = tableManager->CalcPrimaryKeyHash("a.", table, identity);
224 errCode = GeneLogInfoForExistedData(dbHandle_, tableName, calPrimaryKeyHash, table);
225 }
226 if (errCode != E_OK) {
227 return errCode;
228 }
229
230 // add trigger
231 errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity);
232 if (errCode != E_OK) {
233 LOGE("[CreateDistributedTable] Add relational log table trigger failed.");
234 return errCode;
235 }
236 return SetLogTriggerStatus(true);
237 }
238
UpgradeDistributedTable(const std::string & tableName,DistributedTableMode mode,bool & schemaChanged,RelationalSchemaObject & schema,TableSyncType syncType)239 int SQLiteSingleVerRelationalStorageExecutor::UpgradeDistributedTable(const std::string &tableName,
240 DistributedTableMode mode, bool &schemaChanged, RelationalSchemaObject &schema, TableSyncType syncType)
241 {
242 if (dbHandle_ == nullptr) {
243 return -E_INVALID_DB;
244 }
245 TableInfo newTableInfo;
246 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, tableName, newTableInfo);
247 if (errCode != E_OK) {
248 LOGE("[UpgradeDistributedTable] analysis table schema failed. %d", errCode);
249 return errCode;
250 }
251
252 if (CheckTableConstraint(newTableInfo, mode, syncType)) {
253 LOGE("[UpgradeDistributedTable] Not support create distributed table when violate constraints.");
254 return -E_NOT_SUPPORT;
255 }
256
257 // new table should has same or compatible upgrade
258 TableInfo tableInfo = schema.GetTable(tableName);
259 errCode = tableInfo.CompareWithTable(newTableInfo, schema.GetSchemaVersion());
260 if (errCode == -E_RELATIONAL_TABLE_INCOMPATIBLE) {
261 LOGE("[UpgradeDistributedTable] Not support with incompatible upgrade.");
262 return -E_SCHEMA_MISMATCH;
263 } else if (errCode == -E_RELATIONAL_TABLE_EQUAL) {
264 LOGD("[UpgradeDistributedTable] schema has not changed.");
265 // update table if tableName changed
266 schema.RemoveRelationalTable(tableName);
267 tableInfo.SetTableName(tableName);
268 schema.AddRelationalTable(tableInfo);
269 return E_OK;
270 }
271
272 schemaChanged = true;
273 errCode = AlterAuxTableForUpgrade(tableInfo, newTableInfo);
274 if (errCode != E_OK) {
275 LOGE("[UpgradeDistributedTable] Alter aux table for upgrade failed. %d", errCode);
276 }
277
278 schema.AddRelationalTable(newTableInfo);
279 return errCode;
280 }
281
282 namespace {
GetDeviceTableName(sqlite3 * handle,const std::string & tableName,const std::string & device,std::vector<std::string> & deviceTables)283 int GetDeviceTableName(sqlite3 *handle, const std::string &tableName, const std::string &device,
284 std::vector<std::string> &deviceTables)
285 {
286 if (device.empty() && tableName.empty()) { // device and table name should not both be empty
287 return -E_INVALID_ARGS;
288 }
289 std::string devicePattern = device.empty() ? "%" : device;
290 std::string tablePattern = tableName.empty() ? "%" : tableName;
291 std::string deviceTableName = DBConstant::RELATIONAL_PREFIX + tablePattern + "_" + devicePattern;
292
293 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE '" +
294 deviceTableName + "';";
295 sqlite3_stmt *stmt = nullptr;
296 int errCode = SQLiteUtils::GetStatement(handle, checkSql, stmt);
297 if (errCode != E_OK) {
298 return errCode;
299 }
300
301 do {
302 errCode = SQLiteUtils::StepWithRetry(stmt, false);
303 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
304 errCode = E_OK;
305 break;
306 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
307 LOGE("Get table name failed. %d", errCode);
308 break;
309 }
310 std::string realTableName;
311 errCode = SQLiteUtils::GetColumnTextValue(stmt, 0, realTableName); // 0: table name result column index
312 if (errCode != E_OK || realTableName.empty()) { // sqlite might return a row with NULL
313 continue;
314 }
315 if (realTableName.rfind("_log") == (realTableName.length() - 4)) { // 4:suffix length of "_log"
316 continue;
317 }
318 deviceTables.emplace_back(realTableName);
319 } while (true);
320
321 SQLiteUtils::ResetStatement(stmt, true, errCode);
322 return errCode;
323 }
324
GetUpgradeFields(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)325 std::vector<FieldInfo> GetUpgradeFields(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
326 {
327 std::vector<FieldInfo> fields;
328 auto itOld = oldTableInfo.GetFields().begin();
329 auto itNew = newTableInfo.GetFields().begin();
330 for (; itNew != newTableInfo.GetFields().end(); itNew++) {
331 if (itOld == oldTableInfo.GetFields().end() || itOld->first != itNew->first) {
332 fields.emplace_back(itNew->second);
333 continue;
334 }
335 itOld++;
336 }
337 return fields;
338 }
339
UpgradeFields(sqlite3 * db,const std::vector<std::string> & tables,std::vector<FieldInfo> & fields)340 int UpgradeFields(sqlite3 *db, const std::vector<std::string> &tables, std::vector<FieldInfo> &fields)
341 {
342 if (db == nullptr) {
343 return -E_INVALID_ARGS;
344 }
345
346 std::sort(fields.begin(), fields.end(), [] (const FieldInfo &a, const FieldInfo &b) {
347 return a.GetColumnId()< b.GetColumnId();
348 });
349 int errCode = E_OK;
350 for (const auto &table : tables) {
351 for (const auto &field : fields) {
352 std::string alterSql = "ALTER TABLE " + table + " ADD '" + field.GetFieldName() + "' ";
353 alterSql += "'" + field.GetDataType() + "'";
354 alterSql += field.IsNotNull() ? " NOT NULL" : "";
355 alterSql += field.HasDefaultValue() ? " DEFAULT " + field.GetDefaultValue() : "";
356 alterSql += ";";
357 errCode = SQLiteUtils::ExecuteRawSQL(db, alterSql);
358 if (errCode != E_OK) {
359 LOGE("Alter table failed. %d", errCode);
360 break;
361 }
362 }
363 }
364 return errCode;
365 }
366
GetChangedIndexes(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)367 IndexInfoMap GetChangedIndexes(const TableInfo &oldTableInfo, const TableInfo &newTableInfo)
368 {
369 IndexInfoMap indexes;
370 auto itOld = oldTableInfo.GetIndexDefine().begin();
371 auto itNew = newTableInfo.GetIndexDefine().begin();
372 auto itOldEnd = oldTableInfo.GetIndexDefine().end();
373 auto itNewEnd = newTableInfo.GetIndexDefine().end();
374
375 while (itOld != itOldEnd && itNew != itNewEnd) {
376 if (itOld->first == itNew->first) {
377 if (itOld->second != itNew->second) {
378 indexes.insert({itNew->first, itNew->second});
379 }
380 itOld++;
381 itNew++;
382 } else if (itOld->first < itNew->first) {
383 indexes.insert({itOld->first, {}});
384 itOld++;
385 } else {
386 indexes.insert({itNew->first, itNew->second});
387 itNew++;
388 }
389 }
390
391 while (itOld != itOldEnd) {
392 indexes.insert({itOld->first, {}});
393 itOld++;
394 }
395
396 while (itNew != itNewEnd) {
397 indexes.insert({itNew->first, itNew->second});
398 itNew++;
399 }
400
401 return indexes;
402 }
403
UpgradeIndexes(sqlite3 * db,const std::vector<std::string> & tables,const IndexInfoMap & indexes)404 int UpgradeIndexes(sqlite3 *db, const std::vector<std::string> &tables, const IndexInfoMap &indexes)
405 {
406 if (db == nullptr) {
407 return -E_INVALID_ARGS;
408 }
409
410 int errCode = E_OK;
411 for (const auto &table : tables) {
412 for (const auto &index : indexes) {
413 if (index.first.empty()) {
414 continue;
415 }
416 std::string realIndexName = table + "_" + index.first;
417 std::string deleteIndexSql = "DROP INDEX IF EXISTS " + realIndexName;
418 errCode = SQLiteUtils::ExecuteRawSQL(db, deleteIndexSql);
419 if (errCode != E_OK) {
420 LOGE("Drop index failed. %d", errCode);
421 return errCode;
422 }
423
424 if (index.second.empty()) { // empty means drop index only
425 continue;
426 }
427
428 auto it = index.second.begin();
429 std::string indexDefine = *it++;
430 while (it != index.second.end()) {
431 indexDefine += ", " + *it++;
432 }
433 std::string createIndexSql = "CREATE INDEX IF NOT EXISTS " + realIndexName + " ON " + table +
434 "(" + indexDefine + ");";
435 errCode = SQLiteUtils::ExecuteRawSQL(db, createIndexSql);
436 if (errCode != E_OK) {
437 LOGE("Create index failed. %d", errCode);
438 break;
439 }
440 }
441 }
442 return errCode;
443 }
444 }
445
AlterAuxTableForUpgrade(const TableInfo & oldTableInfo,const TableInfo & newTableInfo)446 int SQLiteSingleVerRelationalStorageExecutor::AlterAuxTableForUpgrade(const TableInfo &oldTableInfo,
447 const TableInfo &newTableInfo)
448 {
449 std::vector<FieldInfo> upgradeFields = GetUpgradeFields(oldTableInfo, newTableInfo);
450 IndexInfoMap upgradeIndexes = GetChangedIndexes(oldTableInfo, newTableInfo);
451 std::vector<std::string> deviceTables;
452 int errCode = GetDeviceTableName(dbHandle_, oldTableInfo.GetTableName(), {}, deviceTables);
453 if (errCode != E_OK) {
454 LOGE("Get device table name for alter table failed. %d", errCode);
455 return errCode;
456 }
457
458 LOGD("Begin to alter table: upgrade fields[%zu], indexes[%zu], deviceTable[%zu]", upgradeFields.size(),
459 upgradeIndexes.size(), deviceTables.size());
460 errCode = UpgradeFields(dbHandle_, deviceTables, upgradeFields);
461 if (errCode != E_OK) {
462 LOGE("upgrade fields failed. %d", errCode);
463 return errCode;
464 }
465
466 errCode = UpgradeIndexes(dbHandle_, deviceTables, upgradeIndexes);
467 if (errCode != E_OK) {
468 LOGE("upgrade indexes failed. %d", errCode);
469 }
470
471 return errCode;
472 }
473
StartTransaction(TransactType type)474 int SQLiteSingleVerRelationalStorageExecutor::StartTransaction(TransactType type)
475 {
476 if (dbHandle_ == nullptr) {
477 LOGE("Begin transaction failed, dbHandle is null.");
478 return -E_INVALID_DB;
479 }
480 int errCode = SQLiteUtils::BeginTransaction(dbHandle_, type);
481 if (errCode != E_OK) {
482 LOGE("Begin transaction failed, errCode = %d", errCode);
483 }
484 return errCode;
485 }
486
Commit()487 int SQLiteSingleVerRelationalStorageExecutor::Commit()
488 {
489 if (dbHandle_ == nullptr) {
490 return -E_INVALID_DB;
491 }
492
493 return SQLiteUtils::CommitTransaction(dbHandle_);
494 }
495
Rollback()496 int SQLiteSingleVerRelationalStorageExecutor::Rollback()
497 {
498 if (dbHandle_ == nullptr) {
499 return -E_INVALID_DB;
500 }
501 int errCode = SQLiteUtils::RollbackTransaction(dbHandle_);
502 if (errCode != E_OK) {
503 LOGE("sqlite single ver storage executor rollback fail! errCode = [%d]", errCode);
504 }
505 return errCode;
506 }
507
SetTableInfo(const TableInfo & tableInfo)508 void SQLiteSingleVerRelationalStorageExecutor::SetTableInfo(const TableInfo &tableInfo)
509 {
510 table_ = tableInfo;
511 }
512
GetLogData(sqlite3_stmt * logStatement,LogInfo & logInfo)513 static int GetLogData(sqlite3_stmt *logStatement, LogInfo &logInfo)
514 {
515 logInfo.dataKey = sqlite3_column_int64(logStatement, 0); // 0 means dataKey index
516
517 std::vector<uint8_t> dev;
518 int errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 1, dev); // 1 means dev index
519 if (errCode != E_OK) {
520 return errCode;
521 }
522 logInfo.device = std::string(dev.begin(), dev.end());
523
524 std::vector<uint8_t> oriDev;
525 errCode = SQLiteUtils::GetColumnBlobValue(logStatement, 2, oriDev); // 2 means ori_dev index
526 if (errCode != E_OK) {
527 return errCode;
528 }
529 logInfo.originDev = std::string(oriDev.begin(), oriDev.end());
530 logInfo.timestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 3)); // 3 means timestamp index
531 logInfo.wTimestamp = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 4)); // 4 means w_timestamp index
532 logInfo.flag = static_cast<uint64_t>(sqlite3_column_int64(logStatement, 5)); // 5 means flag index
533 logInfo.flag &= (~DataItem::LOCAL_FLAG);
534 logInfo.flag &= (~DataItem::UPDATE_FLAG);
535 return SQLiteUtils::GetColumnBlobValue(logStatement, 6, logInfo.hashKey); // 6 means hashKey index
536 }
537
538 namespace {
GetCloudLog(sqlite3_stmt * logStatement,VBucket & logInfo,uint32_t & totalSize,bool isShared)539 void GetCloudLog(sqlite3_stmt *logStatement, VBucket &logInfo, uint32_t &totalSize, bool isShared)
540 {
541 logInfo.insert_or_assign(CloudDbConstant::MODIFY_FIELD,
542 static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
543 logInfo.insert_or_assign(CloudDbConstant::CREATE_FIELD,
544 static_cast<int64_t>(sqlite3_column_int64(logStatement, W_TIMESTAMP_INDEX)));
545 totalSize += sizeof(int64_t) + sizeof(int64_t);
546 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) != nullptr) {
547 std::string cloudGid = reinterpret_cast<const std::string::value_type *>(
548 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
549 if (!cloudGid.empty()) {
550 logInfo.insert_or_assign(CloudDbConstant::GID_FIELD, cloudGid);
551 totalSize += cloudGid.size();
552 }
553 }
554 if (isShared) {
555 std::string version;
556 SQLiteUtils::GetColumnTextValue(logStatement, VERSION_INDEX, version);
557 logInfo.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
558 totalSize += version.size();
559 }
560 }
561
GetCloudExtraLog(sqlite3_stmt * logStatement,VBucket & flags)562 void GetCloudExtraLog(sqlite3_stmt *logStatement, VBucket &flags)
563 {
564 flags.insert_or_assign(ROWID,
565 static_cast<int64_t>(sqlite3_column_int64(logStatement, DATA_KEY_INDEX)));
566 flags.insert_or_assign(TIMESTAMP,
567 static_cast<int64_t>(sqlite3_column_int64(logStatement, TIMESTAMP_INDEX)));
568 flags.insert_or_assign(FLAG,
569 static_cast<int64_t>(sqlite3_column_int64(logStatement, FLAG_INDEX)));
570 Bytes hashKey;
571 (void)SQLiteUtils::GetColumnBlobValue(logStatement, HASH_KEY_INDEX, hashKey);
572 flags.insert_or_assign(HASH_KEY, hashKey);
573 }
574
IsAbnormalData(const VBucket & data)575 bool IsAbnormalData(const VBucket &data)
576 {
577 for (const auto &item : data) {
578 const Asset *asset = std::get_if<TYPE_INDEX<Asset>>(&item.second);
579 if (asset != nullptr) {
580 if (asset->status == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
581 (asset->status & static_cast<uint32_t>(AssetStatus::DOWNLOAD_WITH_NULL)) != 0) {
582 return true;
583 }
584 continue;
585 }
586 const Assets *assets = std::get_if<TYPE_INDEX<Assets>>(&item.second);
587 if (assets == nullptr) {
588 continue;
589 }
590 for (const auto &oneAsset : *assets) {
591 if (oneAsset.status == static_cast<uint32_t>(AssetStatus::ABNORMAL) ||
592 (oneAsset.status & static_cast<uint32_t>(AssetStatus::DOWNLOAD_WITH_NULL)) != 0) {
593 return true;
594 }
595 }
596 }
597 return false;
598 }
599
IdentifyCloudType(CloudSyncData & cloudSyncData,VBucket & data,VBucket & log,VBucket & flags)600 int IdentifyCloudType(CloudSyncData &cloudSyncData, VBucket &data, VBucket &log, VBucket &flags)
601 {
602 int64_t *rowid = std::get_if<int64_t>(&flags[ROWID]);
603 int64_t *flag = std::get_if<int64_t>(&flags[FLAG]);
604 int64_t *timeStamp = std::get_if<int64_t>(&flags[TIMESTAMP]);
605 Bytes *hashKey = std::get_if<Bytes>(&flags[HASH_KEY]);
606 if (rowid == nullptr || flag == nullptr || timeStamp == nullptr || hashKey == nullptr) {
607 return -E_INVALID_DATA;
608 }
609 if ((static_cast<uint64_t>(*flag) & DataItem::DELETE_FLAG) != 0) {
610 cloudSyncData.delData.record.push_back(data);
611 cloudSyncData.delData.extend.push_back(log);
612 cloudSyncData.delData.hashKey.push_back(*hashKey);
613 } else {
614 bool isInsert = log.find(CloudDbConstant::GID_FIELD) == log.end();
615 if (data.empty()) {
616 LOGE("The cloud data is empty, isInsert:%d", isInsert);
617 return -E_INVALID_DATA;
618 }
619 if (IsAbnormalData(data)) {
620 LOGW("This data is abnormal, ignore it when upload, isInsert:%d", isInsert);
621 cloudSyncData.ignoredCount++;
622 return -E_IGNORE_DATA;
623 }
624 CloudSyncBatch &opData = isInsert ? cloudSyncData.insData : cloudSyncData.updData;
625 opData.record.push_back(data);
626 opData.rowid.push_back(*rowid);
627 VBucket asset;
628 CloudStorageUtils::ObtainAssetFromVBucket(data, asset);
629 opData.timestamp.push_back(*timeStamp);
630 opData.assets.push_back(asset);
631 opData.extend.push_back(log);
632 opData.hashKey.push_back(*hashKey);
633 }
634 return E_OK;
635 }
636
GetCloudGid(sqlite3_stmt * logStatement,std::vector<std::string> & cloudGid)637 void GetCloudGid(sqlite3_stmt *logStatement, std::vector<std::string> &cloudGid)
638 {
639 if (sqlite3_column_text(logStatement, CLOUD_GID_INDEX) == nullptr) {
640 return;
641 }
642 std::string gid = reinterpret_cast<const std::string::value_type *>(
643 sqlite3_column_text(logStatement, CLOUD_GID_INDEX));
644 if (gid.empty()) {
645 LOGW("[Relational] Get cloud gid is null.");
646 return;
647 }
648 cloudGid.emplace_back(gid);
649 }
650 }
651
GetDataItemSerialSize(DataItem & item,size_t appendLen)652 static size_t GetDataItemSerialSize(DataItem &item, size_t appendLen)
653 {
654 // timestamp and local flag: 3 * uint64_t, version(uint32_t), key, value, origin dev and the padding size.
655 // the size would not be very large.
656 static const size_t maxOrigDevLength = 40;
657 size_t devLength = std::max(maxOrigDevLength, item.origDev.size());
658 size_t dataSize = (Parcel::GetUInt64Len() * 3 + Parcel::GetUInt32Len() + Parcel::GetVectorCharLen(item.key) +
659 Parcel::GetVectorCharLen(item.value) + devLength + appendLen);
660 return dataSize;
661 }
662
GetKvData(const Key & key,Value & value) const663 int SQLiteSingleVerRelationalStorageExecutor::GetKvData(const Key &key, Value &value) const
664 {
665 static const std::string SELECT_META_VALUE_SQL = "SELECT value FROM " + DBConstant::RELATIONAL_PREFIX +
666 "metadata WHERE key=?;";
667 sqlite3_stmt *statement = nullptr;
668 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_META_VALUE_SQL, statement);
669 if (errCode != E_OK) {
670 goto END;
671 }
672
673 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
674 if (errCode != E_OK) {
675 goto END;
676 }
677
678 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
679 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
680 errCode = -E_NOT_FOUND;
681 goto END;
682 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
683 goto END;
684 }
685
686 errCode = SQLiteUtils::GetColumnBlobValue(statement, 0, value); // only one result.
687 END:
688 SQLiteUtils::ResetStatement(statement, true, errCode);
689 return errCode;
690 }
691
PutKvData(const Key & key,const Value & value) const692 int SQLiteSingleVerRelationalStorageExecutor::PutKvData(const Key &key, const Value &value) const
693 {
694 static const std::string INSERT_META_SQL = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX +
695 "metadata VALUES(?,?);";
696 sqlite3_stmt *statement = nullptr;
697 int errCode = SQLiteUtils::GetStatement(dbHandle_, INSERT_META_SQL, statement);
698 if (errCode != E_OK) {
699 return errCode;
700 }
701
702 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // 1 means key index
703 if (errCode != E_OK) {
704 LOGE("[SingleVerExe][BindPutKv]Bind key error:%d", errCode);
705 goto ERROR;
706 }
707
708 errCode = SQLiteUtils::BindBlobToStatement(statement, 2, value, true); // 2 means value index
709 if (errCode != E_OK) {
710 LOGE("[SingleVerExe][BindPutKv]Bind value error:%d", errCode);
711 goto ERROR;
712 }
713 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
714 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
715 errCode = E_OK;
716 }
717 ERROR:
718 SQLiteUtils::ResetStatement(statement, true, errCode);
719 return errCode;
720 }
721
DeleteMetaData(const std::vector<Key> & keys) const722 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaData(const std::vector<Key> &keys) const
723 {
724 static const std::string REMOVE_META_VALUE_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
725 "metadata WHERE key=?;";
726 sqlite3_stmt *statement = nullptr;
727 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_SQL, statement);
728 if (errCode != E_OK) {
729 return errCode;
730 }
731
732 for (const auto &key : keys) {
733 errCode = SQLiteUtils::BindBlobToStatement(statement, 1, key, false); // first arg.
734 if (errCode != E_OK) {
735 break;
736 }
737
738 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
739 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
740 break;
741 }
742 errCode = E_OK;
743 SQLiteUtils::ResetStatement(statement, false, errCode);
744 }
745 SQLiteUtils::ResetStatement(statement, true, errCode);
746 return CheckCorruptedStatus(errCode);
747 }
748
DeleteMetaDataByPrefixKey(const Key & keyPrefix) const749 int SQLiteSingleVerRelationalStorageExecutor::DeleteMetaDataByPrefixKey(const Key &keyPrefix) const
750 {
751 static const std::string REMOVE_META_VALUE_BY_KEY_PREFIX_SQL = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX +
752 "metadata WHERE key>=? AND key<=?;";
753 sqlite3_stmt *statement = nullptr;
754 int errCode = SQLiteUtils::GetStatement(dbHandle_, REMOVE_META_VALUE_BY_KEY_PREFIX_SQL, statement);
755 if (errCode != E_OK) {
756 return errCode;
757 }
758
759 errCode = SQLiteUtils::BindPrefixKey(statement, 1, keyPrefix); // 1 is first arg.
760 if (errCode == E_OK) {
761 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
762 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
763 errCode = E_OK;
764 }
765 }
766 SQLiteUtils::ResetStatement(statement, true, errCode);
767 return CheckCorruptedStatus(errCode);
768 }
769
GetAllMetaKeys(std::vector<Key> & keys) const770 int SQLiteSingleVerRelationalStorageExecutor::GetAllMetaKeys(std::vector<Key> &keys) const
771 {
772 static const std::string SELECT_ALL_META_KEYS = "SELECT key FROM " + DBConstant::RELATIONAL_PREFIX + "metadata;";
773 sqlite3_stmt *statement = nullptr;
774 int errCode = SQLiteUtils::GetStatement(dbHandle_, SELECT_ALL_META_KEYS, statement);
775 if (errCode != E_OK) {
776 LOGE("[Relational][GetAllKey] Get statement failed:%d", errCode);
777 return errCode;
778 }
779 errCode = SqliteMetaExecutor::GetAllKeys(statement, isMemDb_, keys);
780 SQLiteUtils::ResetStatement(statement, true, errCode);
781 return errCode;
782 }
783
GetLogInfoPre(sqlite3_stmt * queryStmt,const DataItem & dataItem,LogInfo & logInfoGet)784 int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoPre(sqlite3_stmt *queryStmt, const DataItem &dataItem,
785 LogInfo &logInfoGet)
786 {
787 if (queryStmt == nullptr) {
788 return -E_INVALID_ARGS;
789 }
790 int errCode = SQLiteUtils::BindBlobToStatement(queryStmt, 1, dataItem.hashKey); // 1 means hashkey index.
791 if (errCode != E_OK) {
792 return errCode;
793 }
794 if (mode_ != DistributedTableMode::COLLABORATION) {
795 errCode = SQLiteUtils::BindTextToStatement(queryStmt, 2, dataItem.dev); // 2 means device index.
796 if (errCode != E_OK) {
797 return errCode;
798 }
799 }
800
801 errCode = SQLiteUtils::StepWithRetry(queryStmt, isMemDb_);
802 if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
803 errCode = -E_NOT_FOUND;
804 } else {
805 errCode = GetLogData(queryStmt, logInfoGet);
806 }
807 return errCode;
808 }
809
SaveSyncLog(sqlite3_stmt * statement,sqlite3_stmt * queryStmt,const DataItem & dataItem,int64_t rowid)810 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncLog(sqlite3_stmt *statement, sqlite3_stmt *queryStmt,
811 const DataItem &dataItem, int64_t rowid)
812 {
813 LogInfo logInfoGet;
814 int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
815 LogInfo logInfoBind;
816 logInfoBind.hashKey = dataItem.hashKey;
817 logInfoBind.device = dataItem.dev;
818 logInfoBind.timestamp = dataItem.timestamp;
819 logInfoBind.flag = dataItem.flag;
820
821 if (errCode == -E_NOT_FOUND) { // insert
822 logInfoBind.wTimestamp = dataItem.writeTimestamp;
823 logInfoBind.originDev = dataItem.dev;
824 } else if (errCode == E_OK) { // update
825 logInfoBind.wTimestamp = logInfoGet.wTimestamp;
826 logInfoBind.originDev = logInfoGet.originDev;
827 } else {
828 return errCode;
829 }
830
831 // bind
832 SQLiteUtils::BindInt64ToStatement(statement, 1, rowid); // 1 means dataKey index
833 std::vector<uint8_t> originDev(logInfoBind.originDev.begin(), logInfoBind.originDev.end());
834 SQLiteUtils::BindBlobToStatement(statement, 2, originDev); // 2 means ori_dev index
835 SQLiteUtils::BindInt64ToStatement(statement, 3, logInfoBind.timestamp); // 3 means timestamp index
836 SQLiteUtils::BindInt64ToStatement(statement, 4, logInfoBind.wTimestamp); // 4 means w_timestamp index
837 SQLiteUtils::BindInt64ToStatement(statement, 5, logInfoBind.flag); // 5 means flag index
838 SQLiteUtils::BindBlobToStatement(statement, 6, logInfoBind.hashKey); // 6 means hashKey index
839 errCode = SQLiteUtils::StepWithRetry(statement, isMemDb_);
840 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
841 return E_OK;
842 }
843 return errCode;
844 }
845
DeleteSyncDataItem(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)846 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncDataItem(const DataItem &dataItem,
847 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
848 {
849 if (stmt == nullptr) {
850 int errCode = inserter.GetDeleteSyncDataStmt(dbHandle_, stmt);
851 if (errCode != E_OK) {
852 LOGE("[DeleteSyncDataItem] Get statement fail!, errCode:%d", errCode);
853 return errCode;
854 }
855 }
856
857 int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hash_key index
858 if (errCode != E_OK) {
859 SQLiteUtils::ResetStatement(stmt, true, errCode);
860 return errCode;
861 }
862 if (mode_ != DistributedTableMode::COLLABORATION) {
863 errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
864 if (errCode != E_OK) {
865 SQLiteUtils::ResetStatement(stmt, true, errCode);
866 return errCode;
867 }
868 }
869 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
870 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
871 errCode = E_OK;
872 }
873 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
874 return errCode;
875 }
876
SaveSyncDataItem(const DataItem & dataItem,SaveSyncDataStmt & saveStmt,RelationalSyncDataInserter & inserter,int64_t & rowid)877 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(const DataItem &dataItem, SaveSyncDataStmt &saveStmt,
878 RelationalSyncDataInserter &inserter, int64_t &rowid)
879 {
880 if ((dataItem.flag & DataItem::DELETE_FLAG) != 0) {
881 return DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
882 }
883 if ((mode_ == DistributedTableMode::COLLABORATION && inserter.GetLocalTable().GetIdentifyKey().size() == 1u &&
884 inserter.GetLocalTable().GetIdentifyKey().at(0) == "rowid") ||
885 (mode_ == DistributedTableMode::SPLIT_BY_DEVICE && inserter.GetLocalTable().GetPrimaryKey().size() == 1u &&
886 inserter.GetLocalTable().GetPrimaryKey().at(0) == "rowid") ||
887 inserter.GetLocalTable().GetAutoIncrement()) { // No primary key of auto increment
888 int errCode = DeleteSyncDataItem(dataItem, inserter, saveStmt.rmDataStmt);
889 if (errCode != E_OK) {
890 LOGE("Delete no pk data before insert failed, errCode=%d.", errCode);
891 return errCode;
892 }
893 }
894
895 int errCode = inserter.BindInsertStatement(saveStmt.saveDataStmt, dataItem);
896 if (errCode != E_OK) {
897 LOGE("Bind data failed, errCode=%d.", errCode);
898 return errCode;
899 }
900 errCode = SQLiteUtils::StepWithRetry(saveStmt.saveDataStmt, isMemDb_);
901 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
902 rowid = SQLiteUtils::GetLastRowId(dbHandle_);
903 errCode = E_OK;
904 }
905 return errCode;
906 }
907
DeleteSyncLog(const DataItem & dataItem,RelationalSyncDataInserter & inserter,sqlite3_stmt * & stmt)908 int SQLiteSingleVerRelationalStorageExecutor::DeleteSyncLog(const DataItem &dataItem,
909 RelationalSyncDataInserter &inserter, sqlite3_stmt *&stmt)
910 {
911 if (stmt == nullptr) {
912 int errCode = inserter.GetDeleteLogStmt(dbHandle_, stmt);
913 if (errCode != E_OK) {
914 LOGE("[DeleteSyncLog] Get statement fail!");
915 return errCode;
916 }
917 }
918
919 int errCode = SQLiteUtils::BindBlobToStatement(stmt, 1, dataItem.hashKey); // 1 means hashkey index
920 if (errCode != E_OK) {
921 SQLiteUtils::ResetStatement(stmt, true, errCode);
922 return errCode;
923 }
924 if (mode_ != DistributedTableMode::COLLABORATION) {
925 errCode = SQLiteUtils::BindTextToStatement(stmt, 2, dataItem.dev); // 2 means device index
926 if (errCode != E_OK) {
927 SQLiteUtils::ResetStatement(stmt, true, errCode);
928 return errCode;
929 }
930 }
931 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
932 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
933 errCode = E_OK;
934 }
935 SQLiteUtils::ResetStatement(stmt, false, errCode); // Finalize outside.
936 return errCode;
937 }
938
ProcessMissQueryData(const DataItem & item,RelationalSyncDataInserter & inserter,sqlite3_stmt * & rmDataStmt,sqlite3_stmt * & rmLogStmt)939 int SQLiteSingleVerRelationalStorageExecutor::ProcessMissQueryData(const DataItem &item,
940 RelationalSyncDataInserter &inserter, sqlite3_stmt *&rmDataStmt, sqlite3_stmt *&rmLogStmt)
941 {
942 int errCode = DeleteSyncDataItem(item, inserter, rmDataStmt);
943 if (errCode != E_OK) {
944 return errCode;
945 }
946 return DeleteSyncLog(item, inserter, rmLogStmt);
947 }
948
GetSyncDataPre(const DataItem & dataItem,sqlite3_stmt * queryStmt,DataItem & itemGet)949 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataPre(const DataItem &dataItem, sqlite3_stmt *queryStmt,
950 DataItem &itemGet)
951 {
952 LogInfo logInfoGet;
953 int errCode = GetLogInfoPre(queryStmt, dataItem, logInfoGet);
954 itemGet.timestamp = logInfoGet.timestamp;
955 SQLiteUtils::ResetStatement(queryStmt, false, errCode);
956 return errCode;
957 }
958
CheckDataConflictDefeated(const DataItem & dataItem,sqlite3_stmt * queryStmt,bool & isDefeated)959 int SQLiteSingleVerRelationalStorageExecutor::CheckDataConflictDefeated(const DataItem &dataItem,
960 sqlite3_stmt *queryStmt, bool &isDefeated)
961 {
962 if ((dataItem.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != DataItem::REMOTE_DEVICE_DATA_MISS_QUERY &&
963 mode_ == DistributedTableMode::SPLIT_BY_DEVICE) {
964 isDefeated = false; // no need to solve conflict except miss query data
965 return E_OK;
966 }
967
968 DataItem itemGet;
969 int errCode = GetSyncDataPre(dataItem, queryStmt, itemGet);
970 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
971 LOGE("Failed to get raw data. %d", errCode);
972 return errCode;
973 }
974 isDefeated = (dataItem.timestamp <= itemGet.timestamp); // defeated if item timestamp is earlier then raw data
975 return E_OK;
976 }
977
SaveSyncDataItem(RelationalSyncDataInserter & inserter,SaveSyncDataStmt & saveStmt,DataItem & item)978 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItem(RelationalSyncDataInserter &inserter,
979 SaveSyncDataStmt &saveStmt, DataItem &item)
980 {
981 bool isDefeated = false;
982 int errCode = CheckDataConflictDefeated(item, saveStmt.queryStmt, isDefeated);
983 if (errCode != E_OK) {
984 LOGE("check data conflict failed. %d", errCode);
985 return errCode;
986 }
987
988 if (isDefeated) {
989 LOGD("Data was defeated.");
990 return E_OK;
991 }
992 if ((item.flag & DataItem::REMOTE_DEVICE_DATA_MISS_QUERY) != 0) {
993 return ProcessMissQueryData(item, inserter, saveStmt.rmDataStmt, saveStmt.rmLogStmt);
994 }
995 int64_t rowid = -1;
996 errCode = SaveSyncDataItem(item, saveStmt, inserter, rowid);
997 if (errCode == E_OK || errCode == -E_NOT_FOUND) {
998 errCode = SaveSyncLog(saveStmt.saveLogStmt, saveStmt.queryStmt, item, rowid);
999 }
1000 return errCode;
1001 }
1002
SaveSyncDataItems(RelationalSyncDataInserter & inserter)1003 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncDataItems(RelationalSyncDataInserter &inserter)
1004 {
1005 SaveSyncDataStmt saveStmt;
1006 int errCode = inserter.PrepareStatement(dbHandle_, saveStmt);
1007 if (errCode != E_OK) {
1008 LOGE("Prepare insert sync data statement failed.");
1009 return errCode;
1010 }
1011
1012 errCode = inserter.Iterate([this, &saveStmt, &inserter] (DataItem &item) -> int {
1013 if (item.neglect) { // Do not save this record if it is neglected
1014 return E_OK;
1015 }
1016 int errCode = SaveSyncDataItem(inserter, saveStmt, item);
1017 if (errCode != E_OK) {
1018 LOGE("save sync data item failed. err=%d", errCode);
1019 return errCode;
1020 }
1021 // Need not reset rmDataStmt and rmLogStmt here.
1022 return saveStmt.ResetStatements(false);
1023 });
1024
1025 int ret = saveStmt.ResetStatements(true);
1026 return errCode != E_OK ? errCode : ret;
1027 }
1028
SaveSyncItems(RelationalSyncDataInserter & inserter,bool useTrans)1029 int SQLiteSingleVerRelationalStorageExecutor::SaveSyncItems(RelationalSyncDataInserter &inserter, bool useTrans)
1030 {
1031 if (useTrans) {
1032 int errCode = StartTransaction(TransactType::IMMEDIATE);
1033 if (errCode != E_OK) {
1034 return errCode;
1035 }
1036 }
1037
1038 int errCode = SetLogTriggerStatus(false);
1039 if (errCode != E_OK) {
1040 goto END;
1041 }
1042
1043 errCode = SaveSyncDataItems(inserter);
1044 if (errCode != E_OK) {
1045 LOGE("Save sync data items failed. errCode=%d", errCode);
1046 goto END;
1047 }
1048
1049 errCode = SetLogTriggerStatus(true);
1050 END:
1051 if (useTrans) {
1052 if (errCode == E_OK) {
1053 errCode = Commit();
1054 } else {
1055 (void)Rollback(); // Keep the error code of the first scene
1056 }
1057 }
1058 return errCode;
1059 }
1060
GetDataItemForSync(sqlite3_stmt * stmt,DataItem & dataItem,bool isGettingDeletedData) const1061 int SQLiteSingleVerRelationalStorageExecutor::GetDataItemForSync(sqlite3_stmt *stmt, DataItem &dataItem,
1062 bool isGettingDeletedData) const
1063 {
1064 RowDataWithLog data;
1065 int errCode = GetLogData(stmt, data.logInfo);
1066 if (errCode != E_OK) {
1067 LOGE("relational data value transfer to kv fail");
1068 return errCode;
1069 }
1070
1071 if (!isGettingDeletedData) {
1072 for (size_t cid = 0; cid < table_.GetFields().size(); ++cid) {
1073 DataValue value;
1074 errCode = SQLiteRelationalUtils::GetDataValueByType(stmt, cid + DBConstant::RELATIONAL_LOG_TABLE_FIELD_NUM,
1075 value);
1076 if (errCode != E_OK) {
1077 return errCode;
1078 }
1079 data.rowData.push_back(std::move(value)); // sorted by cid
1080 }
1081 }
1082
1083 errCode = DataTransformer::SerializeDataItem(data,
1084 isGettingDeletedData ? std::vector<FieldInfo>() : table_.GetFieldInfos(), dataItem);
1085 if (errCode != E_OK) {
1086 LOGE("relational data value transfer to kv fail");
1087 }
1088 return errCode;
1089 }
1090
GetMissQueryData(sqlite3_stmt * fullStmt,DataItem & item)1091 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryData(sqlite3_stmt *fullStmt, DataItem &item)
1092 {
1093 int errCode = GetDataItemForSync(fullStmt, item, false);
1094 if (errCode != E_OK) {
1095 return errCode;
1096 }
1097 item.value = {};
1098 item.flag |= DataItem::REMOTE_DEVICE_DATA_MISS_QUERY;
1099 return E_OK;
1100 }
1101
1102 namespace {
StepNext(bool isMemDB,sqlite3_stmt * stmt,Timestamp & timestamp)1103 int StepNext(bool isMemDB, sqlite3_stmt *stmt, Timestamp ×tamp)
1104 {
1105 if (stmt == nullptr) {
1106 return -E_INVALID_ARGS;
1107 }
1108 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDB);
1109 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1110 timestamp = INT64_MAX;
1111 errCode = E_OK;
1112 } else if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1113 timestamp = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3)); // 3 means timestamp index
1114 errCode = E_OK;
1115 }
1116 return errCode;
1117 }
1118
AppendData(const DataSizeSpecInfo & sizeInfo,size_t appendLength,size_t & overLongSize,size_t & dataTotalSize,std::vector<DataItem> & dataItems,DataItem && item)1119 int AppendData(const DataSizeSpecInfo &sizeInfo, size_t appendLength, size_t &overLongSize, size_t &dataTotalSize,
1120 std::vector<DataItem> &dataItems, DataItem &&item)
1121 {
1122 // If one record is over 4M, ignore it.
1123 if (item.value.size() > DBConstant::MAX_VALUE_SIZE) {
1124 overLongSize++;
1125 } else {
1126 // If dataTotalSize value is bigger than blockSize value , reserve the surplus data item.
1127 dataTotalSize += GetDataItemSerialSize(item, appendLength);
1128 if ((dataTotalSize > sizeInfo.blockSize && !dataItems.empty()) || dataItems.size() >= sizeInfo.packetSize) {
1129 return -E_UNFINISHED;
1130 } else {
1131 dataItems.push_back(item);
1132 }
1133 }
1134 return E_OK;
1135 }
1136 }
1137
GetQueryDataAndStepNext(bool isFirstTime,bool isGettingDeletedData,sqlite3_stmt * queryStmt,DataItem & item,Timestamp & queryTime)1138 int SQLiteSingleVerRelationalStorageExecutor::GetQueryDataAndStepNext(bool isFirstTime, bool isGettingDeletedData,
1139 sqlite3_stmt *queryStmt, DataItem &item, Timestamp &queryTime)
1140 {
1141 if (!isFirstTime) { // For the first time, never step before, can get nothing
1142 int errCode = GetDataItemForSync(queryStmt, item, isGettingDeletedData);
1143 if (errCode != E_OK) {
1144 return errCode;
1145 }
1146 }
1147 return StepNext(isMemDb_, queryStmt, queryTime);
1148 }
1149
GetMissQueryDataAndStepNext(sqlite3_stmt * fullStmt,DataItem & item,Timestamp & missQueryTime)1150 int SQLiteSingleVerRelationalStorageExecutor::GetMissQueryDataAndStepNext(sqlite3_stmt *fullStmt, DataItem &item,
1151 Timestamp &missQueryTime)
1152 {
1153 int errCode = GetMissQueryData(fullStmt, item);
1154 if (errCode != E_OK) {
1155 return errCode;
1156 }
1157 return StepNext(isMemDb_, fullStmt, missQueryTime);
1158 }
1159
GetSyncDataByQuery(std::vector<DataItem> & dataItems,size_t appendLength,const DataSizeSpecInfo & sizeInfo,std::function<int (sqlite3 *,sqlite3_stmt * &,sqlite3_stmt * &,bool &)> getStmt,const TableInfo & tableInfo)1160 int SQLiteSingleVerRelationalStorageExecutor::GetSyncDataByQuery(std::vector<DataItem> &dataItems, size_t appendLength,
1161 const DataSizeSpecInfo &sizeInfo, std::function<int(sqlite3 *, sqlite3_stmt *&, sqlite3_stmt *&, bool &)> getStmt,
1162 const TableInfo &tableInfo)
1163 {
1164 baseTblName_ = tableInfo.GetTableName();
1165 SetTableInfo(tableInfo);
1166 sqlite3_stmt *queryStmt = nullptr;
1167 sqlite3_stmt *fullStmt = nullptr;
1168 bool isGettingDeletedData = false;
1169 int errCode = getStmt(dbHandle_, queryStmt, fullStmt, isGettingDeletedData);
1170 if (errCode != E_OK) {
1171 return errCode;
1172 }
1173
1174 Timestamp queryTime = 0;
1175 Timestamp missQueryTime = (fullStmt == nullptr ? INT64_MAX : 0);
1176
1177 bool isFirstTime = true;
1178 size_t dataTotalSize = 0;
1179 size_t overLongSize = 0;
1180 do {
1181 DataItem item;
1182 if (queryTime < missQueryTime) {
1183 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1184 } else if (queryTime == missQueryTime) {
1185 errCode = GetQueryDataAndStepNext(isFirstTime, isGettingDeletedData, queryStmt, item, queryTime);
1186 if (errCode != E_OK) {
1187 break;
1188 }
1189 errCode = StepNext(isMemDb_, fullStmt, missQueryTime);
1190 } else {
1191 errCode = GetMissQueryDataAndStepNext(fullStmt, item, missQueryTime);
1192 }
1193
1194 if (errCode == E_OK && !isFirstTime) {
1195 errCode = AppendData(sizeInfo, appendLength, overLongSize, dataTotalSize, dataItems, std::move(item));
1196 }
1197
1198 if (errCode != E_OK) {
1199 break;
1200 }
1201
1202 isFirstTime = false;
1203 if (queryTime == INT64_MAX && missQueryTime == INT64_MAX) {
1204 errCode = -E_FINISHED;
1205 break;
1206 }
1207 } while (true);
1208 LOGI("Get sync data finished, rc:%d, record size:%zu, overlong size:%zu, isDeleted:%d",
1209 errCode, dataItems.size(), overLongSize, isGettingDeletedData);
1210 SQLiteUtils::ResetStatement(queryStmt, true, errCode);
1211 SQLiteUtils::ResetStatement(fullStmt, true, errCode);
1212 return errCode;
1213 }
1214
CheckDBModeForRelational()1215 int SQLiteSingleVerRelationalStorageExecutor::CheckDBModeForRelational()
1216 {
1217 std::string journalMode;
1218 int errCode = SQLiteUtils::GetJournalMode(dbHandle_, journalMode);
1219
1220 for (auto &c : journalMode) { // convert to lowercase
1221 c = static_cast<char>(std::tolower(c));
1222 }
1223
1224 if (errCode == E_OK && journalMode != "wal") {
1225 LOGE("Not support journal mode %s for relational db, expect wal mode.", journalMode.c_str());
1226 return -E_NOT_SUPPORT;
1227 }
1228 return errCode;
1229 }
1230
DeleteDistributedDeviceTable(const std::string & device,const std::string & tableName)1231 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTable(const std::string &device,
1232 const std::string &tableName)
1233 {
1234 std::vector<std::string> deviceTables;
1235 int errCode = GetDeviceTableName(dbHandle_, tableName, device, deviceTables);
1236 if (errCode != E_OK) {
1237 LOGE("Get device table name for alter table failed. %d", errCode);
1238 return errCode;
1239 }
1240
1241 LOGD("Begin to delete device table: deviceTable[%zu]", deviceTables.size());
1242 for (const auto &table : deviceTables) {
1243 std::string deleteSql = "DROP TABLE IF EXISTS " + table + ";"; // drop the found table
1244 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1245 if (errCode != E_OK) {
1246 LOGE("Delete device data failed. %d", errCode);
1247 break;
1248 }
1249 }
1250 return errCode;
1251 }
1252
DeleteDistributedAllDeviceTableLog(const std::string & tableName)1253 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedAllDeviceTableLog(const std::string &tableName)
1254 {
1255 std::string deleteLogSql =
1256 "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName +
1257 "_log WHERE flag&0x02=0 AND (cloud_gid = '' OR cloud_gid IS NULL)";
1258 return SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteLogSql);
1259 }
1260
DeleteDistributedDeviceTableLog(const std::string & device,const std::string & tableName)1261 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedDeviceTableLog(const std::string &device,
1262 const std::string &tableName)
1263 {
1264 std::string deleteLogSql = "DELETE FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log WHERE device = ?";
1265 sqlite3_stmt *deleteLogStmt = nullptr;
1266 int errCode = SQLiteUtils::GetStatement(dbHandle_, deleteLogSql, deleteLogStmt);
1267 if (errCode != E_OK) {
1268 LOGE("Get delete device data log statement failed. %d", errCode);
1269 return errCode;
1270 }
1271
1272 errCode = SQLiteUtils::BindTextToStatement(deleteLogStmt, 1, device);
1273 if (errCode != E_OK) {
1274 LOGE("Bind device to delete data log statement failed. %d", errCode);
1275 SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1276 return errCode;
1277 }
1278
1279 errCode = SQLiteUtils::StepWithRetry(deleteLogStmt);
1280 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1281 errCode = E_OK;
1282 } else {
1283 LOGE("Delete data log failed. %d", errCode);
1284 }
1285
1286 SQLiteUtils::ResetStatement(deleteLogStmt, true, errCode);
1287 return errCode;
1288 }
1289
DeleteDistributedLogTable(const std::string & tableName)1290 int SQLiteSingleVerRelationalStorageExecutor::DeleteDistributedLogTable(const std::string &tableName)
1291 {
1292 if (tableName.empty()) {
1293 return -E_INVALID_ARGS;
1294 }
1295 std::string logTableName = DBConstant::RELATIONAL_PREFIX + tableName + "_log";
1296 std::string deleteSql = "DROP TABLE IF EXISTS " + logTableName + ";";
1297 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, deleteSql);
1298 if (errCode != E_OK) {
1299 LOGE("Delete distributed log table failed. %d", errCode);
1300 }
1301 return errCode;
1302 }
1303
IsTableOnceDropped(const std::string & tableName,int execCode,bool & onceDropped)1304 int SQLiteSingleVerRelationalStorageExecutor::IsTableOnceDropped(const std::string &tableName, int execCode,
1305 bool &onceDropped)
1306 {
1307 if (execCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) { // The table in schema was dropped
1308 onceDropped = true;
1309 return E_OK;
1310 } else if (execCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1311 std::string keyStr = DBConstant::TABLE_IS_DROPPED + tableName;
1312 Key key;
1313 DBCommon::StringToVector(keyStr, key);
1314 Value value;
1315
1316 int errCode = GetKvData(key, value);
1317 if (errCode == E_OK) {
1318 // if user drop table first, then create again(but don't create distributed table), will reach this branch
1319 onceDropped = true;
1320 return E_OK;
1321 } else if (errCode == -E_NOT_FOUND) {
1322 onceDropped = false;
1323 return E_OK;
1324 } else {
1325 LOGE("[IsTableOnceDropped] query is table dropped failed, %d", errCode);
1326 return errCode;
1327 }
1328 } else {
1329 return execCode;
1330 }
1331 }
1332
CleanResourceForDroppedTable(const std::string & tableName)1333 int SQLiteSingleVerRelationalStorageExecutor::CleanResourceForDroppedTable(const std::string &tableName)
1334 {
1335 int errCode = DeleteDistributedDeviceTable({}, tableName); // Clean the auxiliary tables for the dropped table
1336 if (errCode != E_OK) {
1337 LOGE("Delete device tables for missing distributed table failed. %d", errCode);
1338 return errCode;
1339 }
1340 errCode = DeleteDistributedLogTable(tableName);
1341 if (errCode != E_OK) {
1342 LOGE("Delete log tables for missing distributed table failed. %d", errCode);
1343 return errCode;
1344 }
1345 errCode = DeleteTableTrigger(tableName);
1346 if (errCode != E_OK) {
1347 LOGE("Delete trigger for missing distributed table failed. %d", errCode);
1348 }
1349 return errCode;
1350 }
1351
CheckAndCleanDistributedTable(const std::vector<std::string> & tableNames,std::vector<std::string> & missingTables)1352 int SQLiteSingleVerRelationalStorageExecutor::CheckAndCleanDistributedTable(const std::vector<std::string> &tableNames,
1353 std::vector<std::string> &missingTables)
1354 {
1355 if (tableNames.empty()) {
1356 return E_OK;
1357 }
1358 const std::string checkSql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;";
1359 sqlite3_stmt *stmt = nullptr;
1360 int ret = E_OK;
1361 int errCode = SQLiteUtils::GetStatement(dbHandle_, checkSql, stmt);
1362 if (errCode != E_OK) {
1363 SQLiteUtils::ResetStatement(stmt, true, ret);
1364 return errCode;
1365 }
1366 for (const auto &tableName : tableNames) {
1367 errCode = SQLiteUtils::BindTextToStatement(stmt, 1, tableName); // 1: tablename bind index
1368 if (errCode != E_OK) {
1369 LOGE("Bind table name to check distributed table statement failed. %d", errCode);
1370 break;
1371 }
1372
1373 errCode = SQLiteUtils::StepWithRetry(stmt, false);
1374 bool onceDropped = false;
1375 errCode = IsTableOnceDropped(tableName, errCode, onceDropped);
1376 if (errCode != E_OK) {
1377 LOGE("query is table once dropped failed. %d", errCode);
1378 break;
1379 }
1380 SQLiteUtils::ResetStatement(stmt, false, ret);
1381 if (onceDropped) { // The table in schema was once dropped
1382 errCode = CleanResourceForDroppedTable(tableName);
1383 if (errCode != E_OK) {
1384 break;
1385 }
1386 missingTables.emplace_back(tableName);
1387 }
1388 }
1389 SQLiteUtils::ResetStatement(stmt, true, ret);
1390 return CheckCorruptedStatus(errCode);
1391 }
1392
CreateDistributedDeviceTable(const std::string & device,const TableInfo & baseTbl,const StoreInfo & info)1393 int SQLiteSingleVerRelationalStorageExecutor::CreateDistributedDeviceTable(const std::string &device,
1394 const TableInfo &baseTbl, const StoreInfo &info)
1395 {
1396 if (dbHandle_ == nullptr) {
1397 return -E_INVALID_DB;
1398 }
1399
1400 if (device.empty() || !baseTbl.IsValid()) {
1401 return -E_INVALID_ARGS;
1402 }
1403
1404 std::string deviceTableName = DBCommon::GetDistributedTableName(device, baseTbl.GetTableName(), info);
1405 int errCode = SQLiteUtils::CreateSameStuTable(dbHandle_, baseTbl, deviceTableName);
1406 if (errCode != E_OK) {
1407 LOGE("Create device table failed. %d", errCode);
1408 return errCode;
1409 }
1410
1411 errCode = SQLiteUtils::CloneIndexes(dbHandle_, baseTbl.GetTableName(), deviceTableName);
1412 if (errCode != E_OK) {
1413 LOGE("Copy index to device table failed. %d", errCode);
1414 }
1415 return errCode;
1416 }
1417
CheckQueryObjectLegal(const TableInfo & table,QueryObject & query,const std::string & schemaVersion)1418 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const TableInfo &table, QueryObject &query,
1419 const std::string &schemaVersion)
1420 {
1421 if (dbHandle_ == nullptr) {
1422 return -E_INVALID_DB;
1423 }
1424
1425 TableInfo newTable;
1426 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, table.GetTableName(), newTable);
1427 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1428 LOGE("Check new schema failed. %d", errCode);
1429 return errCode;
1430 } else {
1431 errCode = table.CompareWithTable(newTable, schemaVersion);
1432 if (errCode != -E_RELATIONAL_TABLE_EQUAL && errCode != -E_RELATIONAL_TABLE_COMPATIBLE) {
1433 LOGE("Check schema failed, schema was changed. %d", errCode);
1434 return -E_DISTRIBUTED_SCHEMA_CHANGED;
1435 } else {
1436 errCode = E_OK;
1437 }
1438 }
1439
1440 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1441 if (errCode != E_OK) {
1442 LOGE("Get query helper for check query failed. %d", errCode);
1443 return errCode;
1444 }
1445
1446 if (!query.IsQueryForRelationalDB()) {
1447 LOGE("Not support for this query type.");
1448 return -E_NOT_SUPPORT;
1449 }
1450
1451 SyncTimeRange defaultTimeRange;
1452 sqlite3_stmt *stmt = nullptr;
1453 errCode = helper.GetRelationalQueryStatement(dbHandle_, defaultTimeRange.beginTime, defaultTimeRange.endTime, {},
1454 stmt);
1455 if (errCode != E_OK) {
1456 LOGE("Get query statement for check query failed. %d", errCode);
1457 }
1458
1459 SQLiteUtils::ResetStatement(stmt, true, errCode);
1460 return errCode;
1461 }
1462
CheckQueryObjectLegal(const QuerySyncObject & query)1463 int SQLiteSingleVerRelationalStorageExecutor::CheckQueryObjectLegal(const QuerySyncObject &query)
1464 {
1465 if (dbHandle_ == nullptr) {
1466 return -E_INVALID_DB;
1467 }
1468 TableInfo newTable;
1469 int errCode = SQLiteUtils::AnalysisSchema(dbHandle_, query.GetTableName(), newTable);
1470 if (errCode != E_OK) {
1471 LOGE("Check new schema failed. %d", errCode);
1472 }
1473 return errCode;
1474 }
1475
GetMaxTimestamp(const std::vector<std::string> & tableNames,Timestamp & maxTimestamp) const1476 int SQLiteSingleVerRelationalStorageExecutor::GetMaxTimestamp(const std::vector<std::string> &tableNames,
1477 Timestamp &maxTimestamp) const
1478 {
1479 maxTimestamp = 0;
1480 for (const auto &tableName : tableNames) {
1481 const std::string sql = "SELECT MAX(timestamp) FROM " + DBConstant::RELATIONAL_PREFIX + tableName + "_log;";
1482 sqlite3_stmt *stmt = nullptr;
1483 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1484 if (errCode != E_OK) {
1485 return errCode;
1486 }
1487 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1488 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1489 maxTimestamp = std::max(maxTimestamp, static_cast<Timestamp>(sqlite3_column_int64(stmt, 0))); // 0 is index
1490 errCode = E_OK;
1491 }
1492 SQLiteUtils::ResetStatement(stmt, true, errCode);
1493 if (errCode != E_OK) {
1494 maxTimestamp = 0;
1495 return errCode;
1496 }
1497 }
1498 return E_OK;
1499 }
1500
SetLogTriggerStatus(bool status)1501 int SQLiteSingleVerRelationalStorageExecutor::SetLogTriggerStatus(bool status)
1502 {
1503 const std::string key = "log_trigger_switch";
1504 std::string val = status ? "true" : "false";
1505 std::string sql = "INSERT OR REPLACE INTO " + DBConstant::RELATIONAL_PREFIX + "metadata" +
1506 " VALUES ('" + key + "', '" + val + "')";
1507 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1508 if (errCode != E_OK) {
1509 LOGE("Set log trigger to %s failed. errCode=%d", val.c_str(), errCode);
1510 }
1511 return errCode;
1512 }
1513
1514 namespace {
GetRowDatas(sqlite3_stmt * stmt,bool isMemDb,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1515 int GetRowDatas(sqlite3_stmt *stmt, bool isMemDb, std::vector<std::string> &colNames,
1516 std::vector<RelationalRowData *> &data)
1517 {
1518 size_t totalLength = 0;
1519 do {
1520 int errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb);
1521 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1522 return E_OK;
1523 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1524 LOGE("Get data by bind sql failed:%d", errCode);
1525 return errCode;
1526 }
1527
1528 if (colNames.empty()) {
1529 SQLiteUtils::GetSelectCols(stmt, colNames); // Get column names.
1530 }
1531 auto relaRowData = new (std::nothrow) RelationalRowDataImpl(SQLiteRelationalUtils::GetSelectValues(stmt));
1532 if (relaRowData == nullptr) {
1533 LOGE("ExecuteQueryBySqlStmt OOM");
1534 return -E_OUT_OF_MEMORY;
1535 }
1536
1537 auto dataSz = relaRowData->CalcLength();
1538 if (dataSz == 0) { // invalid data
1539 delete relaRowData;
1540 relaRowData = nullptr;
1541 continue;
1542 }
1543
1544 totalLength += static_cast<size_t>(dataSz);
1545 if (totalLength > static_cast<uint32_t>(DBConstant::MAX_REMOTEDATA_SIZE)) { // the set has been full
1546 delete relaRowData;
1547 relaRowData = nullptr;
1548 LOGE("ExecuteQueryBySqlStmt OVERSIZE");
1549 return -E_REMOTE_OVER_SIZE;
1550 }
1551 data.push_back(relaRowData);
1552 } while (true);
1553 return E_OK;
1554 }
1555 }
1556
1557 // sql must not be empty, colNames and data must be empty
ExecuteQueryBySqlStmt(const std::string & sql,const std::vector<std::string> & bindArgs,int packetSize,std::vector<std::string> & colNames,std::vector<RelationalRowData * > & data)1558 int SQLiteSingleVerRelationalStorageExecutor::ExecuteQueryBySqlStmt(const std::string &sql,
1559 const std::vector<std::string> &bindArgs, int packetSize, std::vector<std::string> &colNames,
1560 std::vector<RelationalRowData *> &data)
1561 {
1562 int errCode = SQLiteUtils::SetAuthorizer(dbHandle_, &PermitSelect);
1563 if (errCode != E_OK) {
1564 return errCode;
1565 }
1566
1567 sqlite3_stmt *stmt = nullptr;
1568 errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1569 if (errCode != E_OK) {
1570 (void)SQLiteUtils::SetAuthorizer(dbHandle_, nullptr);
1571 return errCode;
1572 }
1573 ResFinalizer finalizer([this, &stmt, &errCode] {
1574 (void)SQLiteUtils::SetAuthorizer(this->dbHandle_, nullptr);
1575 SQLiteUtils::ResetStatement(stmt, true, errCode);
1576 });
1577 for (size_t i = 0; i < bindArgs.size(); ++i) {
1578 errCode = SQLiteUtils::BindTextToStatement(stmt, i + 1, bindArgs.at(i));
1579 if (errCode != E_OK) {
1580 return errCode;
1581 }
1582 }
1583 return GetRowDatas(stmt, isMemDb_, colNames, data);
1584 }
1585
CheckEncryptedOrCorrupted() const1586 int SQLiteSingleVerRelationalStorageExecutor::CheckEncryptedOrCorrupted() const
1587 {
1588 if (dbHandle_ == nullptr) {
1589 return -E_INVALID_DB;
1590 }
1591
1592 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, "SELECT count(*) FROM sqlite_master;");
1593 if (errCode != E_OK) {
1594 LOGE("[SingVerRelaExec] CheckEncryptedOrCorrupted failed:%d", errCode);
1595 }
1596 return errCode;
1597 }
1598
GetExistsDeviceList(std::set<std::string> & devices) const1599 int SQLiteSingleVerRelationalStorageExecutor::GetExistsDeviceList(std::set<std::string> &devices) const
1600 {
1601 return SqliteMetaExecutor::GetExistsDevicesFromMeta(dbHandle_, SqliteMetaExecutor::MetaMode::RDB,
1602 isMemDb_, devices);
1603 }
1604
GetUploadCount(const Timestamp & timestamp,bool isCloudForcePush,QuerySyncObject & query,int64_t & count)1605 int SQLiteSingleVerRelationalStorageExecutor::GetUploadCount(const Timestamp ×tamp, bool isCloudForcePush,
1606 QuerySyncObject &query, int64_t &count)
1607 {
1608 int errCode;
1609 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1610 if (errCode != E_OK) {
1611 return errCode;
1612 }
1613 std::string tableName = query.GetRelationTableName();
1614 sqlite3_stmt *stmt = nullptr;
1615 errCode = helper.GetCountRelationalCloudQueryStatement(dbHandle_, timestamp, isCloudForcePush, stmt);
1616 if (errCode != E_OK) {
1617 LOGE("failed to get count statement %d", errCode);
1618 return errCode;
1619 }
1620 errCode = SQLiteUtils::StepWithRetry(stmt, isMemDb_);
1621 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1622 count = static_cast<int64_t>(sqlite3_column_int64(stmt, 0));
1623 errCode = E_OK;
1624 } else {
1625 LOGE("Failed to get the count to be uploaded. %d", errCode);
1626 }
1627 SQLiteUtils::ResetStatement(stmt, true, errCode);
1628 LOGD("upload count is %d, isCloudForcePush is %d", count, isCloudForcePush);
1629 return errCode;
1630 }
1631
UpdateCloudLogGid(const CloudSyncData & cloudDataResult,bool ignoreEmptyGid)1632 int SQLiteSingleVerRelationalStorageExecutor::UpdateCloudLogGid(const CloudSyncData &cloudDataResult,
1633 bool ignoreEmptyGid)
1634 {
1635 if (cloudDataResult.insData.extend.empty() || cloudDataResult.insData.rowid.empty() ||
1636 cloudDataResult.insData.extend.size() != cloudDataResult.insData.rowid.size()) {
1637 return -E_INVALID_ARGS;
1638 }
1639 std::string sql = "UPDATE '" + DBCommon::GetLogTableName(cloudDataResult.tableName)
1640 + "' SET cloud_gid = ? WHERE data_key = ? ";
1641 sqlite3_stmt *stmt = nullptr;
1642 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, stmt);
1643 if (errCode != E_OK) {
1644 return errCode;
1645 }
1646 errCode = BindStmtWithCloudGid(cloudDataResult, ignoreEmptyGid, stmt);
1647 int resetCode = E_OK;
1648 SQLiteUtils::ResetStatement(stmt, true, resetCode);
1649 return errCode == E_OK ? resetCode : errCode;
1650 }
1651
GetSyncCloudData(CloudSyncData & cloudDataResult,const uint32_t & maxSize,SQLiteSingleVerRelationalContinueToken & token)1652 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudData(CloudSyncData &cloudDataResult,
1653 const uint32_t &maxSize, SQLiteSingleVerRelationalContinueToken &token)
1654 {
1655 token.GetCloudTableSchema(tableSchema_);
1656 sqlite3_stmt *queryStmt = nullptr;
1657 bool isStepNext = false;
1658 int errCode = token.GetCloudStatement(dbHandle_, cloudDataResult.isCloudForcePushStrategy, queryStmt, isStepNext);
1659 if (errCode != E_OK) {
1660 (void)token.ReleaseCloudStatement();
1661 return errCode;
1662 }
1663 uint32_t totalSize = 0;
1664 uint32_t stepNum = -1;
1665 do {
1666 if (isStepNext) {
1667 errCode = SQLiteRelationalUtils::StepNext(isMemDb_, queryStmt);
1668 if (errCode != E_OK) {
1669 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1670 break;
1671 }
1672 }
1673 isStepNext = true;
1674 errCode = GetCloudDataForSync(queryStmt, cloudDataResult, ++stepNum, totalSize, maxSize);
1675 } while (errCode == E_OK);
1676 LOGD("Get cloud sync data, insData:%u, upData:%u, delLog:%u", cloudDataResult.insData.record.size(),
1677 cloudDataResult.updData.record.size(), cloudDataResult.delData.extend.size());
1678 if (errCode != -E_UNFINISHED) {
1679 (void)token.ReleaseCloudStatement();
1680 }
1681 return errCode;
1682 }
1683
GetSyncCloudGid(QuerySyncObject & query,const SyncTimeRange & syncTimeRange,bool isCloudForcePushStrategy,std::vector<std::string> & cloudGid)1684 int SQLiteSingleVerRelationalStorageExecutor::GetSyncCloudGid(QuerySyncObject &query,
1685 const SyncTimeRange &syncTimeRange, bool isCloudForcePushStrategy, std::vector<std::string> &cloudGid)
1686 {
1687 sqlite3_stmt *queryStmt = nullptr;
1688 int errCode = E_OK;
1689 SqliteQueryHelper helper = query.GetQueryHelper(errCode);
1690 if (errCode != E_OK) {
1691 return errCode;
1692 }
1693 errCode = helper.GetGidRelationalCloudQueryStatement(dbHandle_, syncTimeRange.beginTime, tableSchema_.fields,
1694 isCloudForcePushStrategy, queryStmt);
1695 if (errCode != E_OK) {
1696 return errCode;
1697 }
1698 do {
1699 errCode = SQLiteRelationalUtils::StepNext(isMemDb_, queryStmt);
1700 if (errCode != E_OK) {
1701 errCode = (errCode == -E_FINISHED ? E_OK : errCode);
1702 break;
1703 }
1704 GetCloudGid(queryStmt, cloudGid);
1705 } while (errCode == E_OK);
1706 int resetStatementErrCode = E_OK;
1707 SQLiteUtils::ResetStatement(queryStmt, true, resetStatementErrCode);
1708 queryStmt = nullptr;
1709 return (errCode == E_OK ? resetStatementErrCode : errCode);
1710 }
1711
GetCloudDataForSync(sqlite3_stmt * statement,CloudSyncData & cloudDataResult,uint32_t & stepNum,uint32_t & totalSize,const uint32_t & maxSize)1712 int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(sqlite3_stmt *statement,
1713 CloudSyncData &cloudDataResult, uint32_t &stepNum, uint32_t &totalSize, const uint32_t &maxSize)
1714 {
1715 VBucket log;
1716 VBucket extraLog;
1717 uint32_t preSize = totalSize;
1718 GetCloudLog(statement, log, totalSize, cloudDataResult.isShared);
1719 GetCloudExtraLog(statement, extraLog);
1720
1721 VBucket data;
1722 int64_t flag = 0;
1723 int errCode = CloudStorageUtils::GetValueFromVBucket(FLAG, extraLog, flag);
1724 if (errCode != E_OK) {
1725 return errCode;
1726 }
1727 if ((static_cast<uint64_t>(flag) & DataItem::DELETE_FLAG) == 0) {
1728 for (size_t cid = 0; cid < tableSchema_.fields.size(); ++cid) {
1729 Type cloudValue;
1730 errCode = SQLiteRelationalUtils::GetCloudValueByType(statement,
1731 tableSchema_.fields[cid].type, cid + 9, cloudValue); // 9 is the start index of query cloud data
1732 if (errCode != E_OK) {
1733 return errCode;
1734 }
1735 SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize);
1736 errCode = PutVBucketByType(data, tableSchema_.fields[cid], cloudValue);
1737 if (errCode != E_OK) {
1738 return errCode;
1739 }
1740 }
1741 }
1742
1743 if (IsGetCloudDataContinue(stepNum, totalSize, maxSize)) {
1744 errCode = IdentifyCloudType(cloudDataResult, data, log, extraLog);
1745 } else {
1746 errCode = -E_UNFINISHED;
1747 }
1748 if (errCode == -E_IGNORE_DATA) {
1749 errCode = E_OK;
1750 totalSize = preSize;
1751 stepNum--;
1752 }
1753 return errCode;
1754 }
1755
PutVBucketByType(VBucket & vBucket,const Field & field,Type & cloudValue)1756 int SQLiteSingleVerRelationalStorageExecutor::PutVBucketByType(VBucket &vBucket, const Field &field, Type &cloudValue)
1757 {
1758 if (field.type == TYPE_INDEX<Asset> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1759 Asset asset;
1760 int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get<Bytes>(cloudValue), asset);
1761 if (errCode != E_OK) {
1762 return errCode;
1763 }
1764 if (!CloudStorageUtils::CheckAssetStatus({asset})) {
1765 return -E_CLOUD_INVALID_ASSET;
1766 }
1767 vBucket.insert_or_assign(field.colName, asset);
1768 } else if (field.type == TYPE_INDEX<Assets> && cloudValue.index() == TYPE_INDEX<Bytes>) {
1769 Assets assets;
1770 int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get<Bytes>(cloudValue), assets);
1771 if (errCode != E_OK) {
1772 return errCode;
1773 }
1774 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(assets)) {
1775 return -E_CLOUD_ERROR;
1776 }
1777 if (!CloudStorageUtils::CheckAssetStatus(assets)) {
1778 return -E_CLOUD_INVALID_ASSET;
1779 }
1780 vBucket.insert_or_assign(field.colName, assets);
1781 } else {
1782 vBucket.insert_or_assign(field.colName, cloudValue);
1783 }
1784 return E_OK;
1785 }
1786
SetLocalSchema(const RelationalSchemaObject & localSchema)1787 void SQLiteSingleVerRelationalStorageExecutor::SetLocalSchema(const RelationalSchemaObject &localSchema)
1788 {
1789 localSchema_ = localSchema;
1790 }
1791
CleanCloudDataOnLogTable(const std::string & logTableName)1792 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataOnLogTable(const std::string &logTableName)
1793 {
1794 std::string cleanLogSql = "UPDATE " + logTableName + " SET " + FLAG + " = " +
1795 SET_FLAG_LOCAL_AND_CLEAN_WAIT_COMPENSATED_SYNC + ", " +
1796 DEVICE_FIELD + " = '', " + CLOUD_GID_FIELD + " = '' WHERE (" + FLAG_IS_LOGIC_DELETE + ") OR " +
1797 CLOUD_GID_FIELD + " IS NOT NULL AND " + CLOUD_GID_FIELD + " != '';";
1798 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1799 if (errCode != E_OK) {
1800 LOGE("clean cloud log failed, %d", errCode);
1801 return errCode;
1802 }
1803 cleanLogSql = "DELETE FROM " + logTableName + " WHERE " + FLAG_IS_CLOUD + " AND " + DATA_IS_DELETE + ";";
1804 return SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1805 }
1806
CleanCloudDataAndLogOnUserTable(const std::string & tableName,const std::string & logTableName,const RelationalSchemaObject & localSchema)1807 int SQLiteSingleVerRelationalStorageExecutor::CleanCloudDataAndLogOnUserTable(const std::string &tableName,
1808 const std::string &logTableName, const RelationalSchemaObject &localSchema)
1809 {
1810 std::string sql = "DELETE FROM '" + tableName + "' WHERE " + std::string(DBConstant::SQLITE_INNER_ROWID) +
1811 " IN (SELECT " + DATAKEY + " FROM '" + logTableName + "' WHERE (" + FLAG_IS_LOGIC_DELETE +
1812 ") OR CLOUD_GID IS NOT NULL AND CLOUD_GID != '' AND " + FLAG_IS_CLOUD + ");";
1813 int errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, sql);
1814 if (errCode != E_OK) {
1815 LOGE("Failed to delete cloud data on usertable, %d.", errCode);
1816 return errCode;
1817 }
1818 std::string cleanLogSql = "DELETE FROM '" + logTableName + "' WHERE " + FLAG_IS_CLOUD + ";";
1819 errCode = SQLiteUtils::ExecuteRawSQL(dbHandle_, cleanLogSql);
1820 if (errCode != E_OK) {
1821 LOGE("Failed to delete cloud data on log table, %d.", errCode);
1822 return errCode;
1823 }
1824 errCode = DoCleanAssetId(tableName, localSchema);
1825 if (errCode != E_OK) {
1826 LOGE("[Storage Executor] failed to clean asset id when clean cloud data, %d", errCode);
1827 return errCode;
1828 }
1829 errCode = CleanCloudDataOnLogTable(logTableName);
1830 if (errCode != E_OK) {
1831 LOGE("Failed to clean gid on log table, %d.", errCode);
1832 }
1833 return errCode;
1834 }
1835
GetCleanCloudDataKeys(const std::string & logTableName,std::vector<int64_t> & dataKeys,bool distinguishCloudFlag)1836 int SQLiteSingleVerRelationalStorageExecutor::GetCleanCloudDataKeys(const std::string &logTableName,
1837 std::vector<int64_t> &dataKeys, bool distinguishCloudFlag)
1838 {
1839 sqlite3_stmt *selectStmt = nullptr;
1840 std::string sql = "SELECT DATA_KEY FROM '" + logTableName + "' WHERE " + CLOUD_GID_FIELD +
1841 " IS NOT NULL AND " + CLOUD_GID_FIELD + " != ''";
1842 if (distinguishCloudFlag) {
1843 sql += " AND ";
1844 sql += FLAG_IS_CLOUD;
1845 }
1846 sql += ";";
1847 int errCode = SQLiteUtils::GetStatement(dbHandle_, sql, selectStmt);
1848 if (errCode != E_OK) {
1849 LOGE("Get select data_key statement failed, %d", errCode);
1850 return errCode;
1851 }
1852 do {
1853 errCode = SQLiteUtils::StepWithRetry(selectStmt);
1854 if (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW)) {
1855 dataKeys.push_back(sqlite3_column_int64(selectStmt, 0));
1856 } else if (errCode != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
1857 LOGE("SQLite step failed when query log's data_key : %d", errCode);
1858 break;
1859 }
1860 } while (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_ROW));
1861 SQLiteUtils::ResetStatement(selectStmt, true, errCode);
1862 return (errCode == SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) ? E_OK : errCode;
1863 }
1864
GetUpdateLogRecordStatement(const TableSchema & tableSchema,const VBucket & vBucket,OpType opType,std::vector<std::string> & updateColName,sqlite3_stmt * & updateLogStmt)1865 int SQLiteSingleVerRelationalStorageExecutor::GetUpdateLogRecordStatement(const TableSchema &tableSchema,
1866 const VBucket &vBucket, OpType opType, std::vector<std::string> &updateColName, sqlite3_stmt *&updateLogStmt)
1867 {
1868 std::string updateLogSql = "update " + DBCommon::GetLogTableName(tableSchema.name) + " set ";
1869 if (opType == OpType::ONLY_UPDATE_GID) {
1870 updateLogSql += "cloud_gid = ?";
1871 updateColName.push_back(CloudDbConstant::GID_FIELD);
1872 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO) {
1873 updateLogSql += "flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK); // clear 2th bit of flag
1874 } else if (opType == OpType::SET_CLOUD_FORCE_PUSH_FLAG_ONE) {
1875 updateLogSql += "flag = flag | " + std::to_string(SET_FLAG_ONE_MASK); // set 2th bit of flag
1876 } else if (opType == OpType::UPDATE_TIMESTAMP) {
1877 updateLogSql += "device = 'cloud', flag = flag & " + std::to_string(SET_CLOUD_FLAG) +
1878 ", timestamp = ?, cloud_gid = '', version = ''";
1879 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1880 } else if (opType == OpType::CLEAR_GID) {
1881 updateLogSql += "cloud_gid = '', version = '', flag = flag & " + std::to_string(SET_FLAG_ZERO_MASK);
1882 } else {
1883 if (opType == OpType::DELETE) {
1884 updateLogSql += GetCloudDeleteSql(DBCommon::GetLogTableName(tableSchema.name));
1885 } else {
1886 updateLogSql += GetUpdateDataFlagSql() + ", cloud_gid = ?, ";
1887 updateColName.push_back(CloudDbConstant::GID_FIELD);
1888 }
1889 updateLogSql += "device = 'cloud', timestamp = ?";
1890 updateColName.push_back(CloudDbConstant::MODIFY_FIELD);
1891 }
1892 // only share table need to set version
1893 if (CloudStorageUtils::IsSharedTable(tableSchema) && opType != OpType::DELETE &&
1894 opType != OpType::CLEAR_GID && opType != OpType::UPDATE_TIMESTAMP) {
1895 updateLogSql += ", version = ?";
1896 updateColName.push_back(CloudDbConstant::VERSION_FIELD);
1897 }
1898
1899 int errCode = AppendUpdateLogRecordWhereSqlCondition(tableSchema, vBucket, updateLogSql);
1900 if (errCode != E_OK) {
1901 return errCode;
1902 }
1903
1904 errCode = SQLiteUtils::GetStatement(dbHandle_, updateLogSql, updateLogStmt);
1905 if (errCode != E_OK) {
1906 LOGE("Get update log statement failed when update cloud data, %d", errCode);
1907 }
1908 return errCode;
1909 }
1910 } // namespace DistributedDB
1911 #endif
1912